Message Bus: How PocketPaw Routes Messages

The message bus is the backbone of PocketPaw’s architecture. All communication between channels, the agent loop, and the web dashboard flows through it.

Overview

The message bus implements a simple publish/subscribe pattern. Publishers emit events, and subscribers receive them asynchronously. This decouples components so they don’t need to know about each other.

from pocketpaw.bus.message_bus import MessageBus
from pocketpaw.bus.events import InboundMessage, OutboundMessage

bus = MessageBus()

# Subscribe to events (handler_function is your own callback, defined elsewhere)
bus.subscribe(InboundMessage, handler_function)

# Publish events (await must run inside a coroutine)
await bus.publish(InboundMessage(
    content="Hello!",
    channel="telegram",
    session_id="user_123",
    metadata={"chat_id": 123456},
))
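
To make the decoupling concrete, here is a minimal sketch of how a type-keyed publish/subscribe bus could work. MiniBus and Ping are hypothetical names used only for illustration; this is not PocketPaw's actual MessageBus implementation, and it assumes handlers are async callables that take the event as their only argument.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Ping:
    """Hypothetical event type used only for this sketch."""
    content: str


class MiniBus:
    """Illustrative bus: maps an event class to its list of async handlers."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    async def publish(self, event):
        # Look up handlers by the event's exact class and run them concurrently.
        handlers = self._subscribers.get(type(event), [])
        await asyncio.gather(*(handler(event) for handler in handlers))


received = []


async def record(event):
    # The subscriber only sees the event, never the publisher.
    received.append(event.content)


async def main():
    bus = MiniBus()
    bus.subscribe(Ping, record)
    await bus.publish(Ping(content="Hello!"))


asyncio.run(main())
print(received)  # ['Hello!']
```

The publisher and the subscriber share only the event class, which is the property that lets channels, the agent loop, and the dashboard evolve independently.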

Event Types

InboundMessage

Represents user input from any channel:

| Field | Type | Description |
| --- | --- | --- |
| content | str | The message text |
| channel | str | Source channel (telegram, discord, slack, etc.) |
| session_id | str | Unique session identifier |
| metadata | dict | Channel-specific metadata (chat_id, thread_ts, etc.) |
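
As a rough sketch of the shape described in the table, the fields could be modeled as a dataclass. InboundMessageSketch is a hypothetical stand-in, not the class exported by pocketpaw.bus.events, and the empty-dict default for metadata is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class InboundMessageSketch:
    """Hypothetical mirror of the InboundMessage fields listed above."""
    content: str
    channel: str
    session_id: str
    # Assumption: metadata is optional and defaults to an empty dict.
    metadata: dict[str, Any] = field(default_factory=dict)


msg = InboundMessageSketch(
    content="Hello!",
    channel="telegram",
    session_id="user_123",
    metadata={"chat_id": 123456},
)
print(msg.channel, msg.metadata["chat_id"])  # telegram 123456
```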

OutboundMessage

Agent responses sent back to channels:

| Field | Type | Description |
| --- | --- | --- |
| content | str | Response text |
| channel | str | Target channel |
| session_id | str | Session identifier |
| is_stream_chunk | bool | Whether this is a streaming chunk |
| is_stream_end | bool | Whether this is the final chunk |
| metadata | dict | Channel-specific metadata |

SystemEvent

Internal events consumed by the web dashboard:

| Field | Type | Description |
| --- | --- | --- |
| event_type | str | Event type (tool_start, tool_result, thinking, error, inbox_update) |
| data | dict | Event-specific data |
| session_id | str | Session identifier |

Streaming Protocol

PocketPaw supports real-time streaming of agent responses:

  1. The agent backend yields response chunks
  2. Each chunk is published as an OutboundMessage with is_stream_chunk=True
  3. The final message includes is_stream_end=True
  4. Channel adapters handle streaming differently per platform:
    • WebSocket — Sends each chunk immediately
    • Discord — Buffers chunks and edits messages (1.5s rate limit)
    • Slack — Buffers and updates thread messages
    • WhatsApp/Signal — Accumulates all chunks, sends on stream end
    • Telegram — Edit-in-place streaming
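
The steps above can be sketched end to end. OutboundChunk, to_stream, and AccumulatingAdapter are hypothetical names for illustration only; the adapter mimics the accumulate-then-send behavior described for WhatsApp/Signal. The sketch assumes the end-of-stream message carries empty content, which this document does not specify.

```python
from dataclasses import dataclass


@dataclass
class OutboundChunk:
    """Hypothetical stand-in carrying only the fields needed for streaming."""
    content: str
    session_id: str
    is_stream_chunk: bool = False
    is_stream_end: bool = False


def to_stream(chunks, session_id):
    """Wrap backend chunks as messages, then emit an end-of-stream marker."""
    for text in chunks:
        yield OutboundChunk(text, session_id, is_stream_chunk=True)
    # Assumption: the final message has empty content and only signals the end.
    yield OutboundChunk("", session_id, is_stream_end=True)


class AccumulatingAdapter:
    """Buffers streamed content per session and sends once the stream ends."""

    def __init__(self):
        self._buffers = {}
        self.sent = []  # stands in for the platform's real send call

    def handle(self, msg):
        # This sketch only handles streamed messages.
        buffer = self._buffers.setdefault(msg.session_id, [])
        if msg.content:
            buffer.append(msg.content)
        if msg.is_stream_end:
            self.sent.append("".join(self._buffers.pop(msg.session_id)))


adapter = AccumulatingAdapter()
for message in to_stream(["Hel", "lo", "!"], session_id="user_123"):
    adapter.handle(message)

print(adapter.sent)  # ['Hello!']
```

An adapter for a platform that supports message edits would flush the buffer on a timer instead of waiting for is_stream_end, but the per-session buffering idea stays the same.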

Tool Events

When the agent uses a tool, SystemEvent events are emitted when the tool starts and when it returns a result:

SystemEvent(event_type="tool_start", data={"tool": "web_search", "input": {...}})
SystemEvent(event_type="tool_result", data={"tool": "web_search", "result": "..."})

These events power the Activity panel in the web dashboard, giving users visibility into what tools are being used.
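
As an illustration of how a consumer such as the Activity panel might use these events, the sketch below turns tool events into log lines. SystemEventSketch and ActivityLog are hypothetical names, the payload values are placeholders, and the dashboard's real implementation is not shown in this document.

```python
from dataclasses import dataclass, field


@dataclass
class SystemEventSketch:
    """Hypothetical mirror of the SystemEvent fields."""
    event_type: str
    data: dict = field(default_factory=dict)
    session_id: str = ""


class ActivityLog:
    """Collects human-readable lines from tool events."""

    def __init__(self):
        self.lines = []

    def handle(self, event):
        tool = event.data.get("tool", "unknown")
        if event.event_type == "tool_start":
            self.lines.append(f"{tool}: started")
        elif event.event_type == "tool_result":
            self.lines.append(f"{tool}: finished")
        # Other event types (thinking, error, inbox_update) are ignored here.


log = ActivityLog()
# Placeholder payloads; real events carry tool-specific input and result data.
log.handle(SystemEventSketch("tool_start", {"tool": "web_search", "input": {}}))
log.handle(SystemEventSketch("tool_result", {"tool": "web_search", "result": ""}))
log.handle(SystemEventSketch("thinking"))
print(log.lines)  # ['web_search: started', 'web_search: finished']
```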