Message Bus: How PocketPaw Routes Messages
The message bus is the backbone of PocketPaw’s architecture. All communication between channels, the agent loop, and the web dashboard flows through it.
Overview
The message bus implements a simple publish/subscribe pattern. Publishers emit events, and subscribers receive them asynchronously. This decouples components so they don’t need to know about each other.
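To make the pattern concrete, here is a minimal sketch of a type-keyed asynchronous publish/subscribe bus. It is not PocketPaw's implementation: `TinyBus`, `Ping`, and `demo` are hypothetical names, and the choice to run handlers concurrently with `asyncio.gather` is an assumption made for illustration.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class TinyBus:
    """Hypothetical stand-in for a message bus; not PocketPaw's MessageBus."""

    def __init__(self) -> None:
        # Map each event class to the handlers registered for it.
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        # Look up handlers by the event's exact class and run them concurrently.
        handlers = self._handlers.get(type(event), [])
        await asyncio.gather(*(handler(event) for handler in handlers))


@dataclass
class Ping:  # hypothetical event type used only for this demo
    text: str


async def demo() -> list[str]:
    bus = TinyBus()
    seen: list[str] = []

    async def record(event: Ping) -> None:
        seen.append(event.text)

    bus.subscribe(Ping, record)
    await bus.publish(Ping(text="hello"))
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The publisher only knows the event class and the subscriber only knows the handler signature, which is the decoupling described above. The `subscribe` and `publish` names mirror the calls in PocketPaw's usage example; everything inside the class is assumed.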
```python
from pocketpaw.bus.message_bus import MessageBus
from pocketpaw.bus.events import InboundMessage, OutboundMessage

bus = MessageBus()

# Subscribe to events
bus.subscribe(InboundMessage, handler_function)

# Publish events
await bus.publish(InboundMessage(
    content="Hello!",
    channel="telegram",
    session_id="user_123",
    metadata={"chat_id": 123456},
))
```

Event Types
InboundMessage
Represents user input from any channel:
| Field | Type | Description |
|---|---|---|
| `content` | `str` | The message text |
| `channel` | `str` | Source channel (telegram, discord, slack, etc.) |
| `session_id` | `str` | Unique session identifier |
| `metadata` | `dict` | Channel-specific metadata (chat_id, thread_ts, etc.) |
OutboundMessage
Agent responses sent back to channels:
| Field | Type | Description |
|---|---|---|
| `content` | `str` | Response text |
| `channel` | `str` | Target channel |
| `session_id` | `str` | Session identifier |
| `is_stream_chunk` | `bool` | Whether this is a streaming chunk |
| `is_stream_end` | `bool` | Whether this is the final chunk |
| `metadata` | `dict` | Channel-specific metadata |
SystemEvent
Internal events consumed by the web dashboard:
| Field | Type | Description |
|---|---|---|
| `event_type` | `str` | Event type (tool_start, tool_result, thinking, error, inbox_update) |
| `data` | `dict` | Event-specific data |
| `session_id` | `str` | Session identifier |
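The three field tables could be modelled as plain dataclasses. The sketch below is an assumption about shape only: the field names and types come from the tables, while the default values and the `reply_to` helper are hypothetical and may differ from the real classes in `pocketpaw.bus.events`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class InboundMessage:
    content: str
    channel: str
    session_id: str
    # Assumed default: an empty dict when a channel has no extra metadata.
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class OutboundMessage:
    content: str
    channel: str
    session_id: str
    is_stream_chunk: bool = False  # assumed default
    is_stream_end: bool = False    # assumed default
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class SystemEvent:
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)
    session_id: str = ""  # assumed default


def reply_to(inbound: InboundMessage, text: str) -> OutboundMessage:
    """Hypothetical helper: address a reply to the sender's channel and session."""
    return OutboundMessage(
        content=text,
        channel=inbound.channel,
        session_id=inbound.session_id,
        metadata=dict(inbound.metadata),  # copy so the reply owns its metadata
    )
```

Copying `channel`, `session_id`, and `metadata` from the inbound message is one way a reply can find its way back to the right chat without the agent knowing anything about the platform.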
Streaming Protocol
PocketPaw supports real-time streaming of agent responses:
- The agent backend yields response chunks
- Each chunk is published as an `OutboundMessage` with `is_stream_chunk=True`
- The final message includes `is_stream_end=True`
- Channel adapters handle streaming differently per platform:
  - WebSocket: Sends each chunk immediately
  - Discord: Buffers chunks and edits messages (1.5s rate limit)
  - Slack: Buffers and updates thread messages
  - WhatsApp/Signal: Accumulates all chunks, sends on stream end
  - Telegram: Edit-in-place streaming
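The steps above can be sketched end to end for the accumulate-then-send case (the WhatsApp/Signal behaviour). This is a sketch under stated assumptions, not PocketPaw's adapter code: `fake_backend`, `stream_reply`, and `AccumulatingAdapter` are hypothetical, the `OutboundMessage` class is a local stand-in with only the fields used here, and the final `is_stream_end` message is assumed to carry no text of its own.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class OutboundMessage:  # local stand-in, not the real PocketPaw class
    content: str
    channel: str
    session_id: str
    is_stream_chunk: bool = False
    is_stream_end: bool = False


async def fake_backend() -> AsyncIterator[str]:
    """Hypothetical agent backend that yields response chunks."""
    for chunk in ["Hel", "lo, ", "world"]:
        yield chunk


async def stream_reply(channel: str, session_id: str) -> AsyncIterator[OutboundMessage]:
    # One message per chunk, flagged as a streaming chunk.
    async for chunk in fake_backend():
        yield OutboundMessage(chunk, channel, session_id, is_stream_chunk=True)
    # Final marker message (assumed to have empty content).
    yield OutboundMessage("", channel, session_id, is_stream_end=True)


class AccumulatingAdapter:
    """Buffers chunks per session and sends once, when the stream ends."""

    def __init__(self) -> None:
        self._buffers: dict[str, list[str]] = {}
        self.sent: list[str] = []  # stands in for a real platform API call

    async def handle(self, msg: OutboundMessage) -> None:
        if msg.is_stream_chunk:
            self._buffers.setdefault(msg.session_id, []).append(msg.content)
        if msg.is_stream_end:
            parts = self._buffers.pop(msg.session_id, [])
            self.sent.append("".join(parts))


async def demo() -> list[str]:
    adapter = AccumulatingAdapter()
    async for msg in stream_reply("whatsapp", "user_123"):
        await adapter.handle(msg)
    return adapter.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keying the buffer by `session_id` keeps concurrent conversations from interleaving. An adapter that edits messages in place would follow the same shape but flush the buffer on a timer instead of waiting for `is_stream_end`.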
Tool Events
When the agent uses a tool, SystemEvent events are emitted:
```python
SystemEvent(event_type="tool_start", data={"tool": "web_search", "input": {...}})
SystemEvent(event_type="tool_result", data={"tool": "web_search", "result": "..."})
```

These events power the Activity panel in the web dashboard, giving users visibility into what tools are being used.
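As an illustration of how a dashboard-side subscriber might turn these events into activity entries, here is a small sketch. `ActivityLog` and its output format are hypothetical, and `SystemEvent` is a local stand-in with assumed defaults; the real Activity panel may process events quite differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SystemEvent:  # local stand-in, not the real PocketPaw class
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)
    session_id: str = ""


class ActivityLog:
    """Hypothetical consumer that records one line per tool event."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    async def on_system_event(self, event: SystemEvent) -> None:
        tool = event.data.get("tool", "unknown")
        if event.event_type == "tool_start":
            self.lines.append(f"{tool}: started")
        elif event.event_type == "tool_result":
            self.lines.append(f"{tool}: finished")
        # Other event types (thinking, error, inbox_update) are ignored here.


async def demo() -> list[str]:
    log = ActivityLog()
    events = [
        SystemEvent("tool_start", {"tool": "web_search", "input": {"q": "example"}}),
        SystemEvent("thinking", {}),
        SystemEvent("tool_result", {"tool": "web_search", "result": "example result"}),
    ]
    for event in events:
        await log.on_system_event(event)
    return log.lines


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A handler like `on_system_event` would be registered with the bus for `SystemEvent`, so the agent loop never needs to know the dashboard exists.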