nanobot 消息总线与渠道架构深度分析
概述
nanobot 采用异步消息总线架构实现聊天渠道与 AI 代理核心的解耦。该架构的核心设计目标是:
- 松耦合: 渠道层与代理核心通过消息总线异步通信
- 可扩展: 轻松添加新的聊天渠道(Telegram、WhatsApp、Discord 等)
- 高可靠: 异步队列防止消息丢失,自动重连机制
- 统一抽象: 所有渠道实现统一接口,消息格式标准化
系统核心文件约 500 行代码(不含具体渠道实现),展现了极简设计哲学。
消息总线设计
架构图
MessageBus 类详解
文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/queue.py:11-81
核心数据结构
class MessageBus:
def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
self._outbound_subscribers: dict[str, list[Callable[[OutboundMessage], Awaitable[None]]]] = {}
self._running = False
| 字段 | 类型 | 作用 |
|---|---|---|
inbound | asyncio.Queue[InboundMessage] | 存储来自渠道的入站消息 |
outbound | asyncio.Queue[OutboundMessage] | 存储待发送到渠道的出站消息 |
_outbound_subscribers | dict[str, list[Callable]] | 按渠道名分类的订阅者回调 |
_running | bool | 分发器运行状态标志 |
核心方法分析
发布入站消息 (queue.py:25-27)
async def publish_inbound(self, msg: InboundMessage) -> None:
"""Publish a message from a channel to the agent."""
await self.inbound.put(msg)
消费入站消息 (queue.py:29-31)
async def consume_inbound(self) -> InboundMessage:
"""Consume the next inbound message (blocks until available)."""
return await self.inbound.get()
订阅出站消息 (queue.py:41-49)
def subscribe_outbound(
self,
channel: str,
callback: Callable[[OutboundMessage], Awaitable[None]]
) -> None:
"""Subscribe to outbound messages for a specific channel."""
if channel not in self._outbound_subscribers:
self._outbound_subscribers[channel] = []
self._outbound_subscribers[channel].append(callback)
分发出站消息 (queue.py:51-67)
async def dispatch_outbound(self) -> None:
"""
Dispatch outbound messages to subscribed channels.
Run this as a background task.
"""
self._running = True
while self._running:
try:
msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
subscribers = self._outbound_subscribers.get(msg.channel, [])
for callback in subscribers:
try:
await callback(msg)
except Exception as e:
logger.error(f"Error dispatching to {msg.channel}: {e}")
except asyncio.TimeoutError:
continue
消息流转时序图
关键设计模式
- 生产者-消费者模式: 渠道作为生产者,Agent 作为消费者
- 观察者模式:
subscribe_outbound允许渠道订阅出站消息 - 异步 I/O: 使用
asyncio.Queue避免阻塞 - 超时保护:
asyncio.wait_for(timeout=1.0)防止无限阻塞
事件类型模型
InboundMessage
文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/events.py:8-23
@dataclass
class InboundMessage:
"""Message received from a chat channel."""
channel: str # telegram, discord, slack, whatsapp
sender_id: str # User identifier
chat_id: str # Chat/channel identifier
content: str # Message text
timestamp: datetime = field(default_factory=datetime.now)
media: list[str] = field(default_factory=list) # Media URLs
metadata: dict[str, Any] = field(default_factory=dict) # Channel-specific data
@property
def session_key(self) -> str:
"""Unique key for session identification."""
return f"{self.channel}:{self.chat_id}"
OutboundMessage
文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/events.py:26-35
@dataclass
class OutboundMessage:
"""Message to send to a chat channel."""
channel: str
chat_id: str
content: str
reply_to: str | None = None
media: list[str] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
字段详解
核心标识字段
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
channel | str | 渠道标识 | "telegram", "whatsapp" |
sender_id | str | 发送者标识 | "123456", "123456|username" |
chat_id | str | 会话标识 | "123456", "123456@s.whatsapp.net" |
session_key 生成逻辑
代码位置: events.py:20-23
@property
def session_key(self) -> str:
"""Unique key for session identification."""
return f"{self.channel}:{self.chat_id}"
作用:
- 在 SessionManager 中作为唯一键存储会话历史
- 支持多渠道独立会话管理
- 格式:
"{channel}:{chat_id}",如"telegram:123456"
metadata 字段使用场景
Telegram 元数据 (telegram.py:282-288)
metadata={
"message_id": message.message_id,
"user_id": user.id,
"username": user.username,
"first_name": user.first_name,
"is_group": message.chat.type != "private"
}
WhatsApp 元数据 (whatsapp.py:119-123)
metadata={
"message_id": data.get("id"),
"timestamp": data.get("timestamp"),
"is_group": data.get("isGroup", False)
}
media 字段处理
Telegram 媒体下载 (telegram.py:238-270)
# Download media if present
if media_file and self._app:
try:
file = await self._app.bot.get_file(media_file.file_id)
ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None))
# Save to workspace/media/
from pathlib import Path
media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / f"{media_file.file_id[:16]}{ext}"
await file.download_to_drive(str(file_path))
media_paths.append(str(file_path))
支持的媒体类型:
- 图片 (image):
.jpg,.png,.gif - 语音 (voice):
.ogg - 音频 (audio):
.mp3,.m4a - 文档 (file): 原始扩展名
渠道管理器
组件架构图
ChannelManager 类详解
文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/manager.py:14-139
初始化流程
代码位置: manager.py:24-30
def __init__(self, config: Config, bus: MessageBus):
self.config = config
self.bus = bus
self.channels: dict[str, BaseChannel] = {}
self._dispatch_task: asyncio.Task | None = None
self._init_channels()
渠道初始化
代码位置: manager.py:32-57
def _init_channels(self) -> None:
"""Initialize channels based on config."""
# Telegram channel
if self.config.channels.telegram.enabled:
try:
from nanobot.channels.telegram import TelegramChannel
self.channels["telegram"] = TelegramChannel(
self.config.channels.telegram,
self.bus,
groq_api_key=self.config.providers.groq.api_key,
)
logger.info("Telegram channel enabled")
except ImportError as e:
logger.warning(f"Telegram channel not available: {e}")
# WhatsApp channel
if self.config.channels.whatsapp.enabled:
try:
from nanobot.channels.whatsapp import WhatsAppChannel
self.channels["whatsapp"] = WhatsAppChannel(
self.config.channels.whatsapp, self.bus
)
logger.info("WhatsApp channel enabled")
except ImportError as e:
logger.warning(f"WhatsApp channel not available: {e}")
设计特点:
- 延迟导入: 只在启用时导入渠道模块
- 优雅降级: 导入失败时记录警告但不中断系统
- 统一配置: 从 Config 对象读取渠道配置
启动流程
代码位置: manager.py:59-75
async def start_all(self) -> None:
"""Start WhatsApp channel and the outbound dispatcher."""
if not self.channels:
logger.warning("No channels enabled")
return
# Start outbound dispatcher
self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
# Start WhatsApp channel
tasks = []
for name, channel in self.channels.items():
logger.info(f"Starting {name} channel...")
tasks.append(asyncio.create_task(channel.start()))
# Wait for all to complete (they should run forever)
await asyncio.gather(*tasks, return_exceptions=True)
时序说明:
- 创建分发器任务
- 并行启动所有渠道
asyncio.gather等待所有任务完成(实际是无限运行)
出站消息分发
代码位置: manager.py:97-120
async def _dispatch_outbound(self) -> None:
"""Dispatch outbound messages to the appropriate channel."""
logger.info("Outbound dispatcher started")
while True:
try:
msg = await asyncio.wait_for(
self.bus.consume_outbound(),
timeout=1.0
)
channel = self.channels.get(msg.channel)
if channel:
try:
await channel.send(msg)
except Exception as e:
logger.error(f"Error sending to {msg.channel}: {e}")
else:
logger.warning(f"Unknown channel: {msg.channel}")
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
路由逻辑:
- 从消息总线消费出站消息
- 根据
msg.channel查找对应渠道实例 - 调用渠道的
send()方法发送消息 - 异常处理和未知渠道警告
停止流程
代码位置: manager.py:77-95
async def stop_all(self) -> None:
"""Stop all channels and the dispatcher."""
logger.info("Stopping all channels...")
# Stop dispatcher
if self._dispatch_task:
self._dispatch_task.cancel()
try:
await self._dispatch_task
except asyncio.CancelledError:
pass
# Stop all channels
for name, channel in self.channels.items():
try:
await channel.stop()
logger.info(f"Stopped {name} channel")
except Exception as e:
logger.error(f"Error stopping {name}: {e}")
渠道抽象与实现
类图
BaseChannel 接口设计
文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/base.py:10-121
抽象方法
@abstractmethod
async def start(self) -> None:
"""
Start the channel and begin listening for messages.
This should be a long-running async task that:
1. Connects to the chat platform
2. Listens for incoming messages
3. Forwards messages to the bus via _handle_message()
"""
pass
@abstractmethod
async def stop(self) -> None:
"""Stop the channel and clean up resources."""
pass
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""
Send a message through this channel.
Args:
msg: The message to send.
"""
pass
权限检查
代码位置: base.py:59-82
def is_allowed(self, sender_id: str) -> bool:
"""
Check if a sender is allowed to use this bot.
Args:
sender_id: The sender's identifier.
Returns:
True if allowed, False otherwise.
"""
allow_list = getattr(self.config, "allow_from", [])
# If no allow list, allow everyone
if not allow_list:
return True
sender_str = str(sender_id)
if sender_str in allow_list:
return True
if "|" in sender_str:
for part in sender_str.split("|"):
if part and part in allow_list:
return True
return False
支持格式:
- 纯 ID:
"123456" - 组合格式:
"123456|username"(Telegram 使用)
统一消息处理
代码位置: base.py:84-116
async def _handle_message(
self,
sender_id: str,
chat_id: str,
content: str,
media: list[str] | None = None,
metadata: dict[str, Any] | None = None
) -> None:
"""
Handle an incoming message from the chat platform.
This method checks permissions and forwards to the bus.
"""
if not self.is_allowed(sender_id):
return
msg = InboundMessage(
channel=self.name,
sender_id=str(sender_id),
chat_id=str(chat_id),
content=content,
media=media or [],
metadata=metadata or {}
)
await self.bus.publish_inbound(msg)
Telegram 渠道实现
文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py:79-302
核心特性
- 长轮询模式: 无需公网 IP 和 webhook 配置
- Markdown 转 HTML: 自定义格式转换函数
- 多类型消息支持: 文本、图片、语音、音频、文档
- 语音转录: 集成 GroqTranscriptionProvider
Markdown 转换逻辑
代码位置: telegram.py:16-76
def _markdown_to_telegram_html(text: str) -> str:
"""
Convert markdown to Telegram-safe HTML.
"""
if not text:
return ""
# 1. Extract and protect code blocks (preserve content from other processing)
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
code_blocks.append(m.group(1))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
# 2. Extract and protect inline code
inline_codes: list[str] = []
def save_inline_code(m: re.Match) -> str:
inline_codes.append(m.group(1))
return f"\x00IC{len(inline_codes) - 1}\x00"
text = re.sub(r'`([^`]+)`', save_inline_code, text)
# 3. Headers # Title -> just the title text
text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE)
# 4. Blockquotes > text -> just the text (before HTML escaping)
text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE)
# 5. Escape HTML special characters
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
# 6. Links [text](url) - must be before bold/italic to handle nested cases
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', text)
# 7. Bold **text** or __text__
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
# 8. Italic _text_ (avoid matching inside words like some_var_name)
text = re.sub(r'(?<![a-zA-Z0-9])_([^_]+)_(?![a-zA-Z0-9])', r'<i>\1</i>', text)
# 9. Strikethrough ~~text~~
text = re.sub(r'~~(.+?)~~', r'<s>\1</s>', text)
# 10. Bullet lists - item -> • item
text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE)
# 11. Restore inline code with HTML tags
for i, code in enumerate(inline_codes):
# Escape HTML in code content
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00IC{i}\x00", f"<code>{escaped}</code>")
# 12. Restore code blocks with HTML tags
for i, code in enumerate(code_blocks):
# Escape HTML in code content
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{escaped}</code></pre>")
return text
支持的 Markdown 语法:
- 代码块:
language ... - 行内代码:
code - 标题: #, ##, ### 等
- 链接: text
- 粗体: text 或 text
- 斜体: text
- 删除线:
text - 列表: - item
语音转录
代码位置: telegram.py:254-263
# Handle voice transcription
if media_type == "voice" or media_type == "audio":
from nanobot.providers.transcription import GroqTranscriptionProvider
transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
transcription = await transcriber.transcribe(file_path)
if transcription:
logger.info(f"Transcribed {media_type}: {transcription[:50]}...")
content_parts.append(f"[transcription: {transcription}]")
else:
content_parts.append(f"[{media_type}: {file_path}]")
GroqTranscriptionProvider 实现: /home/sujie/dev/github/nanobot/nanobot/providers/transcription.py:11-65
class GroqTranscriptionProvider:
"""
Voice transcription provider using Groq's Whisper API.
Groq offers extremely fast transcription with a generous free tier.
"""
def __init__(self, api_key: str | None = None):
self.api_key = api_key or os.environ.get("GROQ_API_KEY")
self.api_url = "https://api.groq.com/openai/v1/audio/transcriptions"
async def transcribe(self, file_path: str | Path) -> str:
"""
Transcribe an audio file using Groq.
"""
if not self.api_key:
logger.warning("Groq API key not configured for transcription")
return ""
path = Path(file_path)
if not path.exists():
logger.error(f"Audio file not found: {file_path}")
return ""
try:
async with httpx.AsyncClient() as client:
with open(path, "rb") as f:
files = {
"file": (path.name, f),
"model": (None, "whisper-large-v3"),
}
headers = {
"Authorization": f"Bearer {self.api_key}",
}
response = await client.post(
self.api_url,
headers=headers,
files=files,
timeout=60.0
)
response.raise_for_status()
data = response.json()
return data.get("text", "")
except Exception as e:
logger.error(f"Groq transcription error: {e}")
return ""
WhatsApp 渠道实现
文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py:15-141
架构特点
- Node.js 桥接: 通过 WebSocket 连接到 Node.js 桥接服务
- WhatsApp Web 协议: 使用 @whiskeysockets/baileys 库
- 自动重连: 连接断开后自动重新连接
- QR 码认证: 通过桥接终端扫描 QR 码
连接管理
代码位置: whatsapp.py:31-64
async def start(self) -> None:
"""Start the WhatsApp channel by connecting to the bridge."""
import websockets
bridge_url = self.config.bridge_url
logger.info(f"Connecting to WhatsApp bridge at {bridge_url}...")
self._running = True
while self._running:
try:
async with websockets.connect(bridge_url) as ws:
self._ws = ws
self._connected = True
logger.info("Connected to WhatsApp bridge")
# Listen for messages
async for message in ws:
try:
await self._handle_bridge_message(message)
except Exception as e:
logger.error(f"Error handling bridge message: {e}")
except asyncio.CancelledError:
break
except Exception as e:
self._connected = False
self._ws = None
logger.warning(f"WhatsApp bridge connection error: {e}")
if self._running:
logger.info("Reconnecting in 5 seconds...")
await asyncio.sleep(5)
桥接消息处理
代码位置: whatsapp.py:91-141
async def _handle_bridge_message(self, raw: str) -> None:
"""Handle a message from the bridge."""
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON from bridge: {raw[:100]}")
return
msg_type = data.get("type")
if msg_type == "message":
# Incoming message from WhatsApp
sender = data.get("sender", "")
content = data.get("content", "")
# sender is typically: <phone>@s.whatsapp.net
# Extract just the phone number as chat_id
chat_id = sender.split("@")[0] if "@" in sender else sender
# Handle voice transcription if it's a voice message
if content == "[Voice Message]":
logger.info(f"Voice message received from {chat_id}, but direct download from bridge is not yet supported.")
content = "[Voice Message: Transcription not available for WhatsApp yet]"
await self._handle_message(
sender_id=chat_id,
chat_id=sender, # Use full JID for replies
content=content,
metadata={
"message_id": data.get("id"),
"timestamp": data.get("timestamp"),
"is_group": data.get("isGroup", False)
}
)
elif msg_type == "status":
# Connection status update
status = data.get("status")
logger.info(f"WhatsApp status: {status}")
if status == "connected":
self._connected = True
elif status == "disconnected":
self._connected = False
elif msg_type == "qr":
# QR code for authentication
logger.info("Scan QR code in the bridge terminal to connect WhatsApp")
elif msg_type == "error":
logger.error(f"WhatsApp bridge error: {data.get('error')}")
消息路由机制
系统消息路由
文件位置: /home/sujie/dev/github/nanobot/nanobot/agent/loop.py:218-308
路由设计
系统消息 (channel="system") 用于子代理结果回传等内部通信:
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
logger.info(f"Processing system message from {msg.sender_id}")
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
else:
# Fallback
origin_channel = "cli"
origin_chat_id = msg.chat_id
# Use the origin session for context
session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key)
# ... 处理逻辑 ...
return OutboundMessage(
channel=origin_channel,
chat_id=origin_chat_id,
content=final_content
)
完整路由流程图
子代理结果回传机制
场景: 主代理调用 spawn 工具创建子代理,子代理处理完成后需要将结果回传给原始渠道。
流程:
- 子代理将结果作为系统消息发送到消息总线
chat_id设置为"original_channel:original_chat_id"- AgentLoop 识别为系统消息
- 解析原始渠道和会话 ID
- 将响应路由回原始渠道
CLI Gateway 模式
文件位置: /home/sujie/dev/github/nanobot/nanobot/cli/commands.py:155-267
启动流程
@app.command()
def gateway(
port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
"""Start the nanobot gateway."""
# ... 配置加载 ...
# Create components
bus = MessageBus()
# Create provider
provider = LiteLLMProvider(
api_key=api_key,
api_base=api_base,
default_model=config.agents.defaults.model
)
# Create agent
agent = AgentLoop(
bus=bus,
provider=provider,
workspace=config.workspace_path,
model=config.agents.defaults.model,
max_iterations=config.agents.defaults.max_tool_iterations,
brave_api_key=config.tools.web.search.api_key or None
)
# Create cron service
# Create heartbeat service
# Create channel manager
channels = ChannelManager(config, bus)
async def run():
try:
await cron.start()
await heartbeat.start()
await asyncio.gather(
agent.run(),
channels.start_all(),
)
except KeyboardInterrupt:
console.print("\nShutting down...")
heartbeat.stop()
cron.stop()
agent.stop()
await channels.stop_all()
asyncio.run(run())
Gateway 组件关系图
深挖价值点
1. 异步队列性能优化
位置: /home/sujie/dev/github/nanobot/nanobot/bus/queue.py:59
msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
价值:
- 使用 1 秒超时避免永久阻塞
- 允许定期检查
_running状态 - 优雅的停止机制
深挖方向:
- 背压控制: 当队列过大时如何降级处理
- 优先级队列: 为紧急消息提供更高优先级
- 批量处理: 合并多个消息以提高吞吐量
2. Markdown 转 HTML 的保护机制
位置: /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py:23-37
# Extract and protect code blocks
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
code_blocks.append(m.group(1))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
价值:
- 使用空字符 (
\x00) 保护代码块内容 - 防止其他格式转换破坏代码
- 最后统一恢复并转义 HTML
深挖方向:
- 支持更多 Markdown 语法(表格、任务列表等)
- 自定义渲染器的可插拔设计
- 性能优化(预编译正则表达式)
3. 系统消息路由的巧妙设计
位置: /home/sujie/dev/github/nanobot/nanobot/agent/loop.py:228-231
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
价值:
- 复用
chat_id字段存储路由信息 - 无需新增字段保持消息结构简单
- 支持跨渠道的子代理通信
深挖方向:
- 更复杂的路由规则(条件路由、广播)
- 消息追踪和调试功能
- 路由失败的回退机制
4. WhatsApp 桥接架构
位置: /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py:31-64
价值:
- Python 与 Node.js 的混合架构
- 利用成熟的 Node.js WhatsApp 库
- WebSocket 实时双向通信
深挖方向:
- 桥接服务的独立部署和管理
- 多 WhatsApp 账号支持
- 消息持久化和重放机制
5. 权限检查的灵活设计
位置: /home/sujie/dev/github/nanobot/nanobot/channels/base.py:76-82
if "|" in sender_str:
for part in sender_str.split("|"):
if part and part in allow_list:
return True
价值:
- 支持多种 ID 格式
- 兼容用户名和数字 ID
- 优雅降级(无白名单则全部允许)
深挖方向:
- 更细粒度的权限控制(按功能、按时间)
- 动态权限更新(无需重启)
- 权限审计和日志
关键代码位置索引
消息总线核心
| 组件 | 文件路径 | 行号 |
|---|---|---|
InboundMessage | /home/sujie/dev/github/nanobot/nanobot/bus/events.py | 8-23 |
OutboundMessage | /home/sujie/dev/github/nanobot/nanobot/bus/events.py | 26-35 |
MessageBus.__init__ | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 19-23 |
MessageBus.publish_inbound | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 25-27 |
MessageBus.consume_inbound | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 29-31 |
MessageBus.publish_outbound | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 33-35 |
MessageBus.subscribe_outbound | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 41-49 |
MessageBus.dispatch_outbound | /home/sujie/dev/github/nanobot/nanobot/bus/queue.py | 51-67 |
渠道管理器
| 组件 | 文件路径 | 行号 |
|---|---|---|
ChannelManager.__init__ | /home/sujie/dev/github/nanobot/nanobot/channels/manager.py | 24-30 |
ChannelManager._init_channels | /home/sujie/dev/github/nanobot/nanobot/channels/manager.py | 32-57 |
ChannelManager.start_all | /home/sujie/dev/github/nanobot/nanobot/channels/manager.py | 59-75 |
ChannelManager._dispatch_outbound | /home/sujie/dev/github/nanobot/nanobot/channels/manager.py | 97-120 |
ChannelManager.stop_all | /home/sujie/dev/github/nanobot/nanobot/channels/manager.py | 77-95 |
渠道基类
| 组件 | 文件路径 | 行号 |
|---|---|---|
BaseChannel 类 | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 10-121 |
BaseChannel.start | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 32-42 |
BaseChannel.stop | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 44-47 |
BaseChannel.send | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 49-57 |
BaseChannel.is_allowed | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 59-82 |
BaseChannel._handle_message | /home/sujie/dev/github/nanobot/nanobot/channels/base.py | 84-116 |
Telegram 渠道
| 组件 | 文件路径 | 行号 |
|---|---|---|
TelegramChannel 类 | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 79-302 |
_markdown_to_telegram_html | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 16-76 |
TelegramChannel.start | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 95-141 |
TelegramChannel.send | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 154-181 |
TelegramChannel._on_message | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 194-289 |
| 媒体下载逻辑 | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 238-270 |
| 语音转录集成 | /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py | 254-263 |
WhatsApp 渠道
| 组件 | 文件路径 | 行号 |
|---|---|---|
WhatsAppChannel 类 | /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py | 15-141 |
WhatsAppChannel.start | /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py | 31-64 |
WhatsAppChannel.send | /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py | 75-89 |
WhatsAppChannel._handle_bridge_message | /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py | 91-141 |
Agent 循环
| 组件 | 文件路径 | 行号 |
|---|---|---|
AgentLoop.run | /home/sujie/dev/github/nanobot/nanobot/agent/loop.py | 89-120 |
AgentLoop._process_message | /home/sujie/dev/github/nanobot/nanobot/agent/loop.py | 123-216 |
AgentLoop._process_system_message | /home/sujie/dev/github/nanobot/nanobot/agent/loop.py | 218-308 |
| 系统消息路由解析 | /home/sujie/dev/github/nanobot/nanobot/agent/loop.py | 228-231 |
CLI Gateway
| 组件 | 文件路径 | 行号 |
|---|---|---|
gateway 命令 | /home/sujie/dev/github/nanobot/nanobot/cli/commands.py | 155-267 |
| 组件初始化 | /home/sujie/dev/github/nanobot/nanobot/cli/commands.py | 176-239 |
| 运行循环 | /home/sujie/dev/github/nanobot/nanobot/cli/commands.py | 252-266 |
总结
nanobot 的消息总线与渠道架构展现了以下设计精髓:
- 极简主义: 核心代码仅 500 行左右,功能完整
- 异步优先: 全面采用 async/await,高效利用系统资源
- 接口统一:
BaseChannel抽象使得新渠道集成非常简单 - 松耦合: 消息总线作为中间层解耦渠道与代理
- 可扩展: 支持多渠道并行运行和系统消息路由
这种架构设计使得 nanobot 能够作为轻量级个人 AI 助手框架,同时保持足够的扩展性来支持复杂的聊天应用场景。