Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • nanobot 消息总线与渠道架构深度分析

nanobot 消息总线与渠道架构深度分析

概述

nanobot 采用异步消息总线架构实现聊天渠道与 AI 代理核心的解耦。该架构的核心设计目标是:

  1. 松耦合: 渠道层与代理核心通过消息总线异步通信
  2. 可扩展: 轻松添加新的聊天渠道(Telegram、WhatsApp、Discord 等)
  3. 高可靠: 异步队列防止消息丢失,自动重连机制
  4. 统一抽象: 所有渠道实现统一接口,消息格式标准化

系统核心文件约 500 行代码(不含具体渠道实现),展现了极简设计哲学。


消息总线设计

架构图

MessageBus 类详解

文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/queue.py:11-81

核心数据结构

class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
        self._outbound_subscribers: dict[str, list[Callable[[OutboundMessage], Awaitable[None]]]] = {}
        self._running = False
字段类型作用
inboundasyncio.Queue[InboundMessage]存储来自渠道的入站消息
outboundasyncio.Queue[OutboundMessage]存储待发送到渠道的出站消息
_outbound_subscribersdict[str, list[Callable]]按渠道名分类的订阅者回调
_runningbool分发器运行状态标志

核心方法分析

发布入站消息 (queue.py:25-27)

async def publish_inbound(self, msg: InboundMessage) -> None:
    """Publish a message from a channel to the agent."""
    await self.inbound.put(msg)

消费入站消息 (queue.py:29-31)

async def consume_inbound(self) -> InboundMessage:
    """Consume the next inbound message (blocks until available)."""
    return await self.inbound.get()

订阅出站消息 (queue.py:41-49)

def subscribe_outbound(
    self, 
    channel: str, 
    callback: Callable[[OutboundMessage], Awaitable[None]]
) -> None:
    """Subscribe to outbound messages for a specific channel."""
    if channel not in self._outbound_subscribers:
        self._outbound_subscribers[channel] = []
    self._outbound_subscribers[channel].append(callback)

分发出站消息 (queue.py:51-67)

async def dispatch_outbound(self) -> None:
    """
    Dispatch outbound messages to subscribed channels.
    Run this as a background task.
    """
    self._running = True
    while self._running:
        try:
            msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
            subscribers = self._outbound_subscribers.get(msg.channel, [])
            for callback in subscribers:
                try:
                    await callback(msg)
                except Exception as e:
                    logger.error(f"Error dispatching to {msg.channel}: {e}")
        except asyncio.TimeoutError:
            continue

消息流转时序图

关键设计模式

  1. 生产者-消费者模式: 渠道作为生产者,Agent 作为消费者
  2. 观察者模式: subscribe_outbound 允许渠道订阅出站消息
  3. 异步 I/O: 使用 asyncio.Queue 避免阻塞
  4. 超时保护: asyncio.wait_for(timeout=1.0) 防止无限阻塞

事件类型模型

InboundMessage

文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/events.py:8-23

@dataclass
class InboundMessage:
    """Message received from a chat channel."""
    
    channel: str  # telegram, discord, slack, whatsapp
    sender_id: str  # User identifier
    chat_id: str  # Chat/channel identifier
    content: str  # Message text
    timestamp: datetime = field(default_factory=datetime.now)
    media: list[str] = field(default_factory=list)  # Media URLs
    metadata: dict[str, Any] = field(default_factory=dict)  # Channel-specific data
    
    @property
    def session_key(self) -> str:
        """Unique key for session identification."""
        return f"{self.channel}:{self.chat_id}"

OutboundMessage

文件位置: /home/sujie/dev/github/nanobot/nanobot/bus/events.py:26-35

@dataclass
class OutboundMessage:
    """Message to send to a chat channel."""
    
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

字段详解

核心标识字段

字段类型说明示例
channelstr渠道标识"telegram", "whatsapp"
sender_idstr发送者标识"123456", "123456|username"
chat_idstr会话标识"123456", "123456@s.whatsapp.net"

session_key 生成逻辑

代码位置: events.py:20-23

@property
def session_key(self) -> str:
    """Unique key for session identification."""
    return f"{self.channel}:{self.chat_id}"

作用:

  • 在 SessionManager 中作为唯一键存储会话历史
  • 支持多渠道独立会话管理
  • 格式: "{channel}:{chat_id}",如 "telegram:123456"

metadata 字段使用场景

Telegram 元数据 (telegram.py:282-288)

metadata={
    "message_id": message.message_id,
    "user_id": user.id,
    "username": user.username,
    "first_name": user.first_name,
    "is_group": message.chat.type != "private"
}

WhatsApp 元数据 (whatsapp.py:119-123)

metadata={
    "message_id": data.get("id"),
    "timestamp": data.get("timestamp"),
    "is_group": data.get("isGroup", False)
}

media 字段处理

Telegram 媒体下载 (telegram.py:238-270)

# Download media if present
if media_file and self._app:
    try:
        file = await self._app.bot.get_file(media_file.file_id)
        ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None))
        
        # Save to workspace/media/
        from pathlib import Path
        media_dir = Path.home() / ".nanobot" / "media"
        media_dir.mkdir(parents=True, exist_ok=True)
        
        file_path = media_dir / f"{media_file.file_id[:16]}{ext}"
        await file.download_to_drive(str(file_path))
        
        media_paths.append(str(file_path))

支持的媒体类型:

  • 图片 (image): .jpg, .png, .gif
  • 语音 (voice): .ogg
  • 音频 (audio): .mp3, .m4a
  • 文档 (file): 原始扩展名

渠道管理器

组件架构图

ChannelManager 类详解

文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/manager.py:14-139

初始化流程

代码位置: manager.py:24-30

def __init__(self, config: Config, bus: MessageBus):
    self.config = config
    self.bus = bus
    self.channels: dict[str, BaseChannel] = {}
    self._dispatch_task: asyncio.Task | None = None
    
    self._init_channels()

渠道初始化

代码位置: manager.py:32-57

def _init_channels(self) -> None:
    """Initialize channels based on config."""
    
    # Telegram channel
    if self.config.channels.telegram.enabled:
        try:
            from nanobot.channels.telegram import TelegramChannel
            self.channels["telegram"] = TelegramChannel(
                self.config.channels.telegram,
                self.bus,
                groq_api_key=self.config.providers.groq.api_key,
            )
            logger.info("Telegram channel enabled")
        except ImportError as e:
            logger.warning(f"Telegram channel not available: {e}")
    
    # WhatsApp channel
    if self.config.channels.whatsapp.enabled:
        try:
            from nanobot.channels.whatsapp import WhatsAppChannel
            self.channels["whatsapp"] = WhatsAppChannel(
                self.config.channels.whatsapp, self.bus
            )
            logger.info("WhatsApp channel enabled")
        except ImportError as e:
            logger.warning(f"WhatsApp channel not available: {e}")

设计特点:

  • 延迟导入: 只在启用时导入渠道模块
  • 优雅降级: 导入失败时记录警告但不中断系统
  • 统一配置: 从 Config 对象读取渠道配置

启动流程

代码位置: manager.py:59-75

async def start_all(self) -> None:
    """Start WhatsApp channel and the outbound dispatcher."""
    if not self.channels:
        logger.warning("No channels enabled")
        return
    
    # Start outbound dispatcher
    self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
    
    # Start WhatsApp channel
    tasks = []
    for name, channel in self.channels.items():
        logger.info(f"Starting {name} channel...")
        tasks.append(asyncio.create_task(channel.start()))
    
    # Wait for all to complete (they should run forever)
    await asyncio.gather(*tasks, return_exceptions=True)

时序说明:

  1. 创建分发器任务
  2. 并行启动所有渠道
  3. asyncio.gather 等待所有任务完成(实际是无限运行)

出站消息分发

代码位置: manager.py:97-120

async def _dispatch_outbound(self) -> None:
    """Dispatch outbound messages to the appropriate channel."""
    logger.info("Outbound dispatcher started")
    
    while True:
        try:
            msg = await asyncio.wait_for(
                self.bus.consume_outbound(),
                timeout=1.0
            )
            
            channel = self.channels.get(msg.channel)
            if channel:
                try:
                    await channel.send(msg)
                except Exception as e:
                    logger.error(f"Error sending to {msg.channel}: {e}")
            else:
                logger.warning(f"Unknown channel: {msg.channel}")
                
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            break

路由逻辑:

  1. 从消息总线消费出站消息
  2. 根据 msg.channel 查找对应渠道实例
  3. 调用渠道的 send() 方法发送消息
  4. 异常处理和未知渠道警告

停止流程

代码位置: manager.py:77-95

async def stop_all(self) -> None:
    """Stop all channels and the dispatcher."""
    logger.info("Stopping all channels...")
    
    # Stop dispatcher
    if self._dispatch_task:
        self._dispatch_task.cancel()
        try:
            await self._dispatch_task
        except asyncio.CancelledError:
            pass
    
    # Stop all channels
    for name, channel in self.channels.items():
        try:
            await channel.stop()
            logger.info(f"Stopped {name} channel")
        except Exception as e:
            logger.error(f"Error stopping {name}: {e}")

渠道抽象与实现

类图

BaseChannel 接口设计

文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/base.py:10-121

抽象方法

@abstractmethod
async def start(self) -> None:
    """
    Start the channel and begin listening for messages.
    
    This should be a long-running async task that:
    1. Connects to the chat platform
    2. Listens for incoming messages
    3. Forwards messages to the bus via _handle_message()
    """
    pass

@abstractmethod
async def stop(self) -> None:
    """Stop the channel and clean up resources."""
    pass

@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
    """
    Send a message through this channel.
    
    Args:
        msg: The message to send.
    """
    pass

权限检查

代码位置: base.py:59-82

def is_allowed(self, sender_id: str) -> bool:
    """
    Check if a sender is allowed to use this bot.
    
    Args:
        sender_id: The sender's identifier.
    
    Returns:
        True if allowed, False otherwise.
    """
    allow_list = getattr(self.config, "allow_from", [])
    
    # If no allow list, allow everyone
    if not allow_list:
        return True
    
    sender_str = str(sender_id)
    if sender_str in allow_list:
        return True
    if "|" in sender_str:
        for part in sender_str.split("|"):
            if part and part in allow_list:
                return True
    return False

支持格式:

  • 纯 ID: "123456"
  • 组合格式: "123456|username" (Telegram 使用)

统一消息处理

代码位置: base.py:84-116

async def _handle_message(
    self,
    sender_id: str,
    chat_id: str,
    content: str,
    media: list[str] | None = None,
    metadata: dict[str, Any] | None = None
) -> None:
    """
    Handle an incoming message from the chat platform.
    
    This method checks permissions and forwards to the bus.
    """
    if not self.is_allowed(sender_id):
        return
    
    msg = InboundMessage(
        channel=self.name,
        sender_id=str(sender_id),
        chat_id=str(chat_id),
        content=content,
        media=media or [],
        metadata=metadata or {}
    )
    
    await self.bus.publish_inbound(msg)

Telegram 渠道实现

文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py:79-302

核心特性

  1. 长轮询模式: 无需公网 IP 和 webhook 配置
  2. Markdown 转 HTML: 自定义格式转换函数
  3. 多类型消息支持: 文本、图片、语音、音频、文档
  4. 语音转录: 集成 GroqTranscriptionProvider

Markdown 转换逻辑

代码位置: telegram.py:16-76

def _markdown_to_telegram_html(text: str) -> str:
    """
    Convert markdown to Telegram-safe HTML.
    """
    if not text:
        return ""
    
    # 1. Extract and protect code blocks (preserve content from other processing)
    code_blocks: list[str] = []
    def save_code_block(m: re.Match) -> str:
        code_blocks.append(m.group(1))
        return f"\x00CB{len(code_blocks) - 1}\x00"
    
    text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
    
    # 2. Extract and protect inline code
    inline_codes: list[str] = []
    def save_inline_code(m: re.Match) -> str:
        inline_codes.append(m.group(1))
        return f"\x00IC{len(inline_codes) - 1}\x00"
    
    text = re.sub(r'`([^`]+)`', save_inline_code, text)
    
    # 3. Headers # Title -> just the title text
    text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE)
    
    # 4. Blockquotes > text -> just the text (before HTML escaping)
    text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE)
    
    # 5. Escape HTML special characters
    text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    
    # 6. Links [text](url) - must be before bold/italic to handle nested cases
    text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', text)
    
    # 7. Bold **text** or __text__
    text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
    text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
    
    # 8. Italic _text_ (avoid matching inside words like some_var_name)
    text = re.sub(r'(?<![a-zA-Z0-9])_([^_]+)_(?![a-zA-Z0-9])', r'<i>\1</i>', text)
    
    # 9. Strikethrough ~~text~~
    text = re.sub(r'~~(.+?)~~', r'<s>\1</s>', text)
    
    # 10. Bullet lists - item -> • item
    text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE)
    
    # 11. Restore inline code with HTML tags
    for i, code in enumerate(inline_codes):
        # Escape HTML in code content
        escaped = code.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
        text = text.replace(f"\x00IC{i}\x00", f"<code>{escaped}</code>")
    
    # 12. Restore code blocks with HTML tags
    for i, code in enumerate(code_blocks):
        # Escape HTML in code content
        escaped = code.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
        text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{escaped}</code></pre>")
    
    return text

支持的 Markdown 语法:

  • 代码块: language ...
  • 行内代码: code
  • 标题: #, ##, ### 等
  • 链接: text
  • 粗体: text 或 text
  • 斜体: text
  • 删除线: text
  • 列表: - item

语音转录

代码位置: telegram.py:254-263

# Handle voice transcription
if media_type == "voice" or media_type == "audio":
    from nanobot.providers.transcription import GroqTranscriptionProvider
    transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
    transcription = await transcriber.transcribe(file_path)
    if transcription:
        logger.info(f"Transcribed {media_type}: {transcription[:50]}...")
        content_parts.append(f"[transcription: {transcription}]")
    else:
        content_parts.append(f"[{media_type}: {file_path}]")

GroqTranscriptionProvider 实现: /home/sujie/dev/github/nanobot/nanobot/providers/transcription.py:11-65

class GroqTranscriptionProvider:
    """
    Voice transcription provider using Groq's Whisper API.
    
    Groq offers extremely fast transcription with a generous free tier.
    """
    
    def __init__(self, api_key: str | None = None):
        self.api_key = api_key or os.environ.get("GROQ_API_KEY")
        self.api_url = "https://api.groq.com/openai/v1/audio/transcriptions"
    
    async def transcribe(self, file_path: str | Path) -> str:
        """
        Transcribe an audio file using Groq.
        """
        if not self.api_key:
            logger.warning("Groq API key not configured for transcription")
            return ""
        
        path = Path(file_path)
        if not path.exists():
            logger.error(f"Audio file not found: {file_path}")
            return ""
        
        try:
            async with httpx.AsyncClient() as client:
                with open(path, "rb") as f:
                    files = {
                        "file": (path.name, f),
                        "model": (None, "whisper-large-v3"),
                    }
                    headers = {
                        "Authorization": f"Bearer {self.api_key}",
                    }
                    
                    response = await client.post(
                        self.api_url,
                        headers=headers,
                        files=files,
                        timeout=60.0
                    )
                    
                    response.raise_for_status()
                    data = response.json()
                    return data.get("text", "")
                    
        except Exception as e:
            logger.error(f"Groq transcription error: {e}")
            return ""

WhatsApp 渠道实现

文件位置: /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py:15-141

架构特点

  1. Node.js 桥接: 通过 WebSocket 连接到 Node.js 桥接服务
  2. WhatsApp Web 协议: 使用 @whiskeysockets/baileys 库
  3. 自动重连: 连接断开后自动重新连接
  4. QR 码认证: 通过桥接终端扫描 QR 码

连接管理

代码位置: whatsapp.py:31-64

async def start(self) -> None:
    """Start the WhatsApp channel by connecting to the bridge."""
    import websockets
    
    bridge_url = self.config.bridge_url
    
    logger.info(f"Connecting to WhatsApp bridge at {bridge_url}...")
    
    self._running = True
    
    while self._running:
        try:
            async with websockets.connect(bridge_url) as ws:
                self._ws = ws
                self._connected = True
                logger.info("Connected to WhatsApp bridge")
                
                # Listen for messages
                async for message in ws:
                    try:
                        await self._handle_bridge_message(message)
                    except Exception as e:
                        logger.error(f"Error handling bridge message: {e}")
                
        except asyncio.CancelledError:
            break
        except Exception as e:
            self._connected = False
            self._ws = None
            logger.warning(f"WhatsApp bridge connection error: {e}")
            
            if self._running:
                logger.info("Reconnecting in 5 seconds...")
                await asyncio.sleep(5)

桥接消息处理

代码位置: whatsapp.py:91-141

async def _handle_bridge_message(self, raw: str) -> None:
    """Handle a message from the bridge."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning(f"Invalid JSON from bridge: {raw[:100]}")
        return
    
    msg_type = data.get("type")
    
    if msg_type == "message":
        # Incoming message from WhatsApp
        sender = data.get("sender", "")
        content = data.get("content", "")
        
        # sender is typically: <phone>@s.whatsapp.net
        # Extract just the phone number as chat_id
        chat_id = sender.split("@")[0] if "@" in sender else sender
        
        # Handle voice transcription if it's a voice message
        if content == "[Voice Message]":
            logger.info(f"Voice message received from {chat_id}, but direct download from bridge is not yet supported.")
            content = "[Voice Message: Transcription not available for WhatsApp yet]"
        
        await self._handle_message(
            sender_id=chat_id,
            chat_id=sender,  # Use full JID for replies
            content=content,
            metadata={
                "message_id": data.get("id"),
                "timestamp": data.get("timestamp"),
                "is_group": data.get("isGroup", False)
            }
        )
    
    elif msg_type == "status":
        # Connection status update
        status = data.get("status")
        logger.info(f"WhatsApp status: {status}")
        
        if status == "connected":
            self._connected = True
        elif status == "disconnected":
            self._connected = False
    
    elif msg_type == "qr":
        # QR code for authentication
        logger.info("Scan QR code in the bridge terminal to connect WhatsApp")
    
    elif msg_type == "error":
        logger.error(f"WhatsApp bridge error: {data.get('error')}")

消息路由机制

系统消息路由

文件位置: /home/sujie/dev/github/nanobot/nanobot/agent/loop.py:218-308

路由设计

系统消息 (channel="system") 用于子代理结果回传等内部通信:

async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """
    Process a system message (e.g., subagent announce).
    
    The chat_id field contains "original_channel:original_chat_id" to route
    the response back to the correct destination.
    """
    logger.info(f"Processing system message from {msg.sender_id}")
    
    # Parse origin from chat_id (format: "channel:chat_id")
    if ":" in msg.chat_id:
        parts = msg.chat_id.split(":", 1)
        origin_channel = parts[0]
        origin_chat_id = parts[1]
    else:
        # Fallback
        origin_channel = "cli"
        origin_chat_id = msg.chat_id
    
    # Use the origin session for context
    session_key = f"{origin_channel}:{origin_chat_id}"
    session = self.sessions.get_or_create(session_key)
    
    # ... 处理逻辑 ...
    
    return OutboundMessage(
        channel=origin_channel,
        chat_id=origin_chat_id,
        content=final_content
    )

完整路由流程图

子代理结果回传机制

场景: 主代理调用 spawn 工具创建子代理,子代理处理完成后需要将结果回传给原始渠道。

流程:

  1. 子代理将结果作为系统消息发送到消息总线
  2. chat_id 设置为 "original_channel:original_chat_id"
  3. AgentLoop 识别为系统消息
  4. 解析原始渠道和会话 ID
  5. 将响应路由回原始渠道

CLI Gateway 模式

文件位置: /home/sujie/dev/github/nanobot/nanobot/cli/commands.py:155-267

启动流程

@app.command()
def gateway(
    port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
    """Start the nanobot gateway."""
    # ... 配置加载 ...
    
    # Create components
    bus = MessageBus()
    
    # Create provider
    provider = LiteLLMProvider(
        api_key=api_key,
        api_base=api_base,
        default_model=config.agents.defaults.model
    )
    
    # Create agent
    agent = AgentLoop(
        bus=bus,
        provider=provider,
        workspace=config.workspace_path,
        model=config.agents.defaults.model,
        max_iterations=config.agents.defaults.max_tool_iterations,
        brave_api_key=config.tools.web.search.api_key or None
    )
    
    # Create cron service
    # Create heartbeat service
    
    # Create channel manager
    channels = ChannelManager(config, bus)
    
    async def run():
        try:
            await cron.start()
            await heartbeat.start()
            await asyncio.gather(
                agent.run(),
                channels.start_all(),
            )
        except KeyboardInterrupt:
            console.print("\nShutting down...")
            heartbeat.stop()
            cron.stop()
            agent.stop()
            await channels.stop_all()
    
    asyncio.run(run())

Gateway 组件关系图


深挖价值点

1. 异步队列性能优化

位置: /home/sujie/dev/github/nanobot/nanobot/bus/queue.py:59

msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)

价值:

  • 使用 1 秒超时避免永久阻塞
  • 允许定期检查 _running 状态
  • 优雅的停止机制

深挖方向:

  • 背压控制: 当队列过大时如何降级处理
  • 优先级队列: 为紧急消息提供更高优先级
  • 批量处理: 合并多个消息以提高吞吐量

2. Markdown 转 HTML 的保护机制

位置: /home/sujie/dev/github/nanobot/nanobot/channels/telegram.py:23-37

# Extract and protect code blocks
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
    code_blocks.append(m.group(1))
    return f"\x00CB{len(code_blocks) - 1}\x00"

text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)

价值:

  • 使用空字符 (\x00) 保护代码块内容
  • 防止其他格式转换破坏代码
  • 最后统一恢复并转义 HTML

深挖方向:

  • 支持更多 Markdown 语法(表格、任务列表等)
  • 自定义渲染器的可插拔设计
  • 性能优化(预编译正则表达式)

3. 系统消息路由的巧妙设计

位置: /home/sujie/dev/github/nanobot/nanobot/agent/loop.py:228-231

if ":" in msg.chat_id:
    parts = msg.chat_id.split(":", 1)
    origin_channel = parts[0]
    origin_chat_id = parts[1]

价值:

  • 复用 chat_id 字段存储路由信息
  • 无需新增字段保持消息结构简单
  • 支持跨渠道的子代理通信

深挖方向:

  • 更复杂的路由规则(条件路由、广播)
  • 消息追踪和调试功能
  • 路由失败的回退机制

4. WhatsApp 桥接架构

位置: /home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py:31-64

价值:

  • Python 与 Node.js 的混合架构
  • 利用成熟的 Node.js WhatsApp 库
  • WebSocket 实时双向通信

深挖方向:

  • 桥接服务的独立部署和管理
  • 多 WhatsApp 账号支持
  • 消息持久化和重放机制

5. 权限检查的灵活设计

位置: /home/sujie/dev/github/nanobot/nanobot/channels/base.py:76-82

if "|" in sender_str:
    for part in sender_str.split("|"):
        if part and part in allow_list:
            return True

价值:

  • 支持多种 ID 格式
  • 兼容用户名和数字 ID
  • 优雅降级(无白名单则全部允许)

深挖方向:

  • 更细粒度的权限控制(按功能、按时间)
  • 动态权限更新(无需重启)
  • 权限审计和日志

关键代码位置索引

消息总线核心

组件文件路径行号
InboundMessage/home/sujie/dev/github/nanobot/nanobot/bus/events.py8-23
OutboundMessage/home/sujie/dev/github/nanobot/nanobot/bus/events.py26-35
MessageBus.__init__/home/sujie/dev/github/nanobot/nanobot/bus/queue.py19-23
MessageBus.publish_inbound/home/sujie/dev/github/nanobot/nanobot/bus/queue.py25-27
MessageBus.consume_inbound/home/sujie/dev/github/nanobot/nanobot/bus/queue.py29-31
MessageBus.publish_outbound/home/sujie/dev/github/nanobot/nanobot/bus/queue.py33-35
MessageBus.subscribe_outbound/home/sujie/dev/github/nanobot/nanobot/bus/queue.py41-49
MessageBus.dispatch_outbound/home/sujie/dev/github/nanobot/nanobot/bus/queue.py51-67

渠道管理器

组件文件路径行号
ChannelManager.__init__/home/sujie/dev/github/nanobot/nanobot/channels/manager.py24-30
ChannelManager._init_channels/home/sujie/dev/github/nanobot/nanobot/channels/manager.py32-57
ChannelManager.start_all/home/sujie/dev/github/nanobot/nanobot/channels/manager.py59-75
ChannelManager._dispatch_outbound/home/sujie/dev/github/nanobot/nanobot/channels/manager.py97-120
ChannelManager.stop_all/home/sujie/dev/github/nanobot/nanobot/channels/manager.py77-95

渠道基类

组件文件路径行号
BaseChannel 类/home/sujie/dev/github/nanobot/nanobot/channels/base.py10-121
BaseChannel.start/home/sujie/dev/github/nanobot/nanobot/channels/base.py32-42
BaseChannel.stop/home/sujie/dev/github/nanobot/nanobot/channels/base.py44-47
BaseChannel.send/home/sujie/dev/github/nanobot/nanobot/channels/base.py49-57
BaseChannel.is_allowed/home/sujie/dev/github/nanobot/nanobot/channels/base.py59-82
BaseChannel._handle_message/home/sujie/dev/github/nanobot/nanobot/channels/base.py84-116

Telegram 渠道

组件文件路径行号
TelegramChannel 类/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py79-302
_markdown_to_telegram_html/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py16-76
TelegramChannel.start/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py95-141
TelegramChannel.send/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py154-181
TelegramChannel._on_message/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py194-289
媒体下载逻辑/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py238-270
语音转录集成/home/sujie/dev/github/nanobot/nanobot/channels/telegram.py254-263

WhatsApp 渠道

组件文件路径行号
WhatsAppChannel 类/home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py15-141
WhatsAppChannel.start/home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py31-64
WhatsAppChannel.send/home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py75-89
WhatsAppChannel._handle_bridge_message/home/sujie/dev/github/nanobot/nanobot/channels/whatsapp.py91-141

Agent 循环

组件文件路径行号
AgentLoop.run/home/sujie/dev/github/nanobot/nanobot/agent/loop.py89-120
AgentLoop._process_message/home/sujie/dev/github/nanobot/nanobot/agent/loop.py123-216
AgentLoop._process_system_message/home/sujie/dev/github/nanobot/nanobot/agent/loop.py218-308
系统消息路由解析/home/sujie/dev/github/nanobot/nanobot/agent/loop.py228-231

CLI Gateway

组件文件路径行号
gateway 命令/home/sujie/dev/github/nanobot/nanobot/cli/commands.py155-267
组件初始化/home/sujie/dev/github/nanobot/nanobot/cli/commands.py176-239
运行循环/home/sujie/dev/github/nanobot/nanobot/cli/commands.py252-266

总结

nanobot 的消息总线与渠道架构展现了以下设计精髓:

  1. 极简主义: 核心代码仅 500 行左右,功能完整
  2. 异步优先: 全面采用 async/await,高效利用系统资源
  3. 接口统一: BaseChannel 抽象使得新渠道集成非常简单
  4. 松耦合: 消息总线作为中间层解耦渠道与代理
  5. 可扩展: 支持多渠道并行运行和系统消息路由

这种架构设计使得 nanobot 能够作为轻量级个人 AI 助手框架,同时保持足够的扩展性来支持复杂的聊天应用场景。