Nanobot 子代理架构深度分析
概述
nanobot 的子代理架构是一个轻量级异步任务执行系统,设计目标包括:
- 任务隔离:子代理拥有独立的上下文和系统提示,不会污染主代理的对话历史
- 后台执行:通过
asyncio.create_task()实现真正的异步并行执行 - 资源控制:限制迭代次数(15次)和工具集,防止无限循环和资源耗尽
- 结果归档:通过事件总线机制将结果异步通知给主代理
- 状态追踪:通过
_running_tasks字典追踪所有运行中的子代理
核心设计哲学是:子代理是主代理的"手",而非"脑"——它们专注于执行具体任务,不负责决策或通信。
子代理管理器
类图
核心职责
| 职责 | 方法 | 说明 |
|---|---|---|
| 子代理创建 | spawn() (行44-81) | 生成任务ID、创建异步任务、注册到追踪字典 |
| 子代理执行 | _run_subagent() (行83-166) | 构建工具集、执行LLM循环、处理结果 |
| 结果公告 | _announce_result() (行168-198) | 通过消息总线发送系统消息 |
| 状态查询 | get_running_count() (行231-233) | 返回运行中的子代理数量 |
子代理生命周期
运行中任务追踪
数据结构:_running_tasks: dict[str, asyncio.Task[None]] (行42)
- 键:8字符UUID前缀,如
"a3b5c7d1" - 值:异步任务对象
清理机制 (行78):
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))
当任务完成时,自动从字典中移除,无需手动清理。
子代理工具集 vs 主代理工具集
| 工具类型 | 主代理 | 子代理 | 说明 |
|---|---|---|---|
| ReadFile | ✓ | ✓ | 读取文件 |
| WriteFile | ✓ | ✓ | 写入文件 |
| EditFile | ✓ | ✗ | 编辑文件(主代理独有) |
| ListDir | ✓ | ✓ | 列出目录 |
| Exec | ✓ | ✓ | 执行Shell命令 |
| WebSearch | ✓ | ✓ | 网页搜索 |
| WebFetch | ✓ | ✓ | 网页获取 |
| Message | ✓ | ✗ | 通信工具 - 防止子代理直接与用户交互 |
| Spawn | ✓ | ✗ | 子代理工具 - 防止递归创建子代理 |
设计意图:
- 禁止递归:子代理不能创建子代理,避免无限层级
- 禁止通信:子代理不能直接向用户发送消息,所有通信必须通过主代理
- 工具精简:移除 EditFile,因为子代理专注于执行而非编辑
子代理执行流程
完整序列图
_run_subagent 方法分析
位置:subagent.py:83-166
async def _run_subagent(
self,
task_id: str,
task: str,
label: str,
origin: dict[str, str],
) -> None:
步骤1:构建子代理工具集 (行94-101)
tools = ToolRegistry()
tools.register(ReadFileTool())
tools.register(WriteFileTool())
tools.register(ListDirTool())
tools.register(ExecTool(working_dir=str(self.workspace)))
tools.register(WebSearchTool(api_key=self.brave_api_key))
tools.register(WebFetchTool())
特点:
- 每次创建独立的工具注册表
- 不包含 MessageTool 和 SpawnTool
- 共享 workspace 和 LLM provider
步骤2:构建消息 (行103-108)
system_prompt = self._build_subagent_prompt(task)
messages: list[dict[str, Any]] = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task},
]
步骤3:执行LLM循环 (行110-155)
max_iterations = 15
iteration = 0
final_result: str | None = None
while iteration < max_iterations:
iteration += 1
response = await self.provider.chat(...)
if response.has_tool_calls:
# 处理工具调用
...
else:
final_result = response.content
break
关键约束:
- 最大15次迭代
- 只有在没有工具调用时才退出循环
- 最终结果通过
final_result保存
步骤4:错误处理 (行163-166)
except Exception as e:
error_msg = f"Error: {str(e)}"
logger.error(f"Subagent [{task_id}] failed: {e}")
await self._announce_result(task_id, label, task, error_msg, origin, "error")
子代理专用系统提示
位置:subagent.py:200-229
def _build_subagent_prompt(self, task: str) -> str:
return f"""# Subagent
You are a subagent spawned by the main agent to complete a specific task.
## Your Task
{task}
## Rules
1. Stay focused - complete only the assigned task, nothing else
2. Your final response will be reported back to the main agent
3. Do not initiate conversations or take on side tasks
4. Be concise but informative in your findings
## What You Can Do
- Read and write files in the workspace
- Execute shell commands
- Search the web and fetch web pages
- Complete the task thoroughly
## What You Cannot Do
- Send messages directly to users (no message tool available)
- Spawn other subagents
- Access the main agent's conversation history
## Workspace
Your workspace is at: {self.workspace}
When you have completed the task, provide a clear summary of your findings or actions."""
设计要点:
- 明确身份:"You are a subagent" - 强调从属关系
- 聚焦任务:嵌入任务描述到系统提示中
- 能力边界:明确列出能做和不能做的事
- 输出预期:要求"clear summary"而非自然对话
结果公告机制
位置:subagent.py:168-198
async def _announce_result(
self,
task_id: str,
label: str,
task: str,
result: str,
origin: dict[str, str],
status: str,
) -> None:
status_text = "completed successfully" if status == "ok" else "failed"
announce_content = f"""[Subagent '{label}' {status_text}]
Task: {task}
Result:
{result}
Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs."""
msg = InboundMessage(
channel="system",
sender_id="subagent",
chat_id=f"{origin['channel']}:{origin['chat_id']}",
content=announce_content,
)
await self.bus.publish_inbound(msg)
关键设计:
- 伪装成用户消息:
channel="system"但要求LLM"Summarize this naturally" - 隐藏技术细节:提示LLM不要提及"subagent"或"task IDs"
- 原始渠道保存:
chat_id保存了"channel:chat_id"格式,用于路由回原始对话
通信机制
主代理与子代理的消息交换
系统消息注入机制
子代理侧(subagent.py:190-195):
msg = InboundMessage(
channel="system", # 特殊通道标识
sender_id="subagent", # 发送者标识
chat_id=f"{origin['channel']}:{origin['chat_id']}", # 路由信息
content=announce_content,
)
主代理侧(loop.py:133-136):
# Handle system messages (subagent announces)
if msg.channel == "system":
return await self._process_system_message(msg)
路由解析(loop.py:227-235):
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
原始渠道信息传递
完整流程:
- SpawnTool 捕获上下文(
spawn.py:24-27):
def set_context(self, channel: str, chat_id: str) -> None:
self._origin_channel = channel
self._origin_chat_id = chat_id
- 传递给 SubagentManager(
spawn.py:60-64):
return await self._manager.spawn(
task=task,
label=label,
origin_channel=self._origin_channel,
origin_chat_id=self._origin_chat_id,
)
- SubagentManager 保存 origin(
subagent.py:66-69):
origin = {
"channel": origin_channel,
"chat_id": origin_chat_id,
}
- Announce 时路由回(
subagent.py:193):
chat_id=f"{origin['channel']}:{origin['chat_id']}",
- Main Agent 恢复上下文(
loop.py:304-307):
return OutboundMessage(
channel=origin_channel,
chat_id=origin_chat_id,
content=final_content
)
结果摘要的生成策略
两阶段摘要:
- 子代理输出:详细的技术结果
- 主代理再摘要:转换为用户友好的自然语言
提示策略(subagent.py:180-187):
Summarize this naturally for the user. Keep it brief (1-2 sentences).
Do not mention technical details like "subagent" or task IDs.
主代理处理(loop.py:250-254):
messages = self.context.build_messages(
history=session.get_history(), # 原始对话历史
current_message=msg.content # 公告内容
)
主代理在原始对话上下文中处理公告,自然地生成用户友好的响应。
设计决策分析
1. 为什么子代理没有 message 和 spawn 工具?
| 工具 | 主代理 | 子代理 | 原因 |
|---|---|---|---|
| MessageTool | ✓ | ✗ | 防止直接通信:子代理结果必须通过主代理审核和格式化后才能发给用户,避免技术细节泄露或格式不一致 |
| SpawnTool | ✓ | ✗ | 防止递归创建:避免无限子代理层级导致资源耗尽和复杂性失控 |
代码证据(subagent.py:94-101):
tools = ToolRegistry()
tools.register(ReadFileTool())
tools.register(WriteFileTool())
tools.register(ListDirTool())
tools.register(ExecTool(...))
tools.register(WebSearchTool(...))
tools.register(WebFetchTool())
# 注意:没有 tools.register(MessageTool())
# 注意:没有 tools.register(SpawnTool())
对比(loop.py:66-87):
def _register_default_tools(self) -> None:
# ... 文件和工具 ...
tools.register(MessageTool(send_callback=...))
tools.register(SpawnTool(manager=self.subagents))
2. 子代理的隔离性设计
上下文隔离
| 方面 | 主代理 | 子代理 |
|---|---|---|
| 会话历史 | ✓(SessionManager) | ✗(无历史访问) |
| 系统提示 | 通用助手提示 | 任务专用提示 |
| 工具集 | 完整(9个工具) | 受限(6个工具) |
| 对话能力 | 自然对话 | 单向报告 |
提示差异(subagent.py:221-224):
## What You Cannot Do
- Send messages directly to users (no message tool available)
- Spawn other subagents
- Access the main agent's conversation history
工具集隔离
每个子代理创建独立的 ToolRegistry 实例:
tools = ToolRegistry() # 独立实例
tools.register(ReadFileTool())
# ...
对比主代理:
self.tools = ToolRegistry() # 共享实例
self._register_default_tools() # 注册所有工具
消息隔离
子代理的输出直接丢弃,只通过 _announce_result 发送结构化摘要:
# 子代理的 messages 数组只在子代理内部使用
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task},
# ... 迭代添加 ...
]
# 从不传递给主代理
3. 异步任务创建和清理机制
任务创建(subagent.py:72-74)
bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin)
)
特点:
- 立即返回,不阻塞主代理
- 任务在事件循环中独立执行
- 返回
Task对象用于追踪
任务追踪(subagent.py:75-78)
self._running_tasks[task_id] = bg_task
bg_task.add_done_callback(
lambda _: self._running_tasks.pop(task_id, None)
)
机制:
- 注册:任务ID → 任务对象
- 自动清理:任务完成时自动从字典移除
- 内存安全:
pop(task_id, None)防止KeyError
并发示例
# 主代理可以同时启动多个子代理
await spawn(task="分析日志文件", label="日志分析")
await spawn(task="下载数据集", label="数据获取")
await spawn(task="运行测试", label="测试执行")
# 所有子代理并行运行
assert subagent_manager.get_running_count() == 3
4. 错误处理和失败通知
三层错误处理
| 层级 | 位置 | 处理方式 |
|---|---|---|
| 工具执行 | ToolRegistry.execute() | 捕获异常,返回错误字符串 |
| 子代理循环 | _run_subagent() | 捕获异常,调用 _announce_result(status="error") |
| 主代理 | _process_message() | 捕获异常,发送错误响应 |
工具层(registry.py:56-59):
try:
return await tool.execute(**params)
except Exception as e:
return f"Error executing {name}: {str(e)}"
子代理层(subagent.py:163-166):
except Exception as e:
error_msg = f"Error: {str(e)}"
logger.error(f"Subagent [{task_id}] failed: {e}")
await self._announce_result(task_id, label, task, error_msg, origin, "error")
失败通知流程
公告格式(subagent.py:178-186):
[Subagent 'xxx' failed]
Task: xxx
Result:
Error: xxx
Summarize this naturally for the user. Keep it brief (1-2 sentences).
使用场景分析
适合使用子代理的场景
| 场景类型 | 示例任务 | 为什么适合 |
|---|---|---|
| 长时间运行任务 | 下载大文件、批量处理数据 | 不阻塞主代理对话 |
| 需要多次迭代 | 代码审查、日志分析 | 子代理有15次迭代预算 |
| 独立查询 | API调用、数据抓取 | 结果不影响主代理上下文 |
| 后台清理 | 日志归档、临时文件清理 | 用户无需等待 |
示例1:代码审查
用户: "帮我审查 src/ 目录下的所有Python代码"
主代理: spawn(task="审查 src/ 目录的Python代码,检查错误、安全和性能问题")
子代理: [后台执行] 读取所有文件 → 分析 → 生成报告
主代理: [收到公告] "我已经完成了代码审查,发现3个潜在的安全问题..."
示例2:数据下载
用户: "下载 Kaggle 的 MNIST 数据集"
主代理: spawn(task="下载 MNIST 数据集到 workspace/data/")
子代理: [后台执行] 下载 → 解压 → 验证完整性
主代理: [收到公告] "MNIST 数据集已下载并解压,共 60,000 个训练样本。"
子代理 vs 直接工具调用
| 维度 | 直接工具调用 | 子代理 |
|---|---|---|
| 执行方式 | 同步,阻塞 | 异步,后台 |
| 迭代能力 | 单次调用 | 最多15次LLM循环 |
| 上下文 | 共享主代理会话 | 独立上下文 |
| 适用性 | 简单、快速任务 | 复杂、多步骤任务 |
| 用户感知 | 立即响应 | 延迟通知 |
决策树:
并发子代理的处理
上下文保存
每个子代理运行时,主代理的 message_tool 和 spawn_tool 上下文保持不变:
# 主代理处理消息时设置上下文
message_tool.set_context(msg.channel, msg.chat_id)
spawn_tool.set_context(msg.channel, msg.chat_id)
# spawn 后立即返回,上下文不受子代理影响
result = await self.tools.execute("spawn", {...})
子公告路由
多个子代理的结果通过 chat_id 正确路由回原始会话:
# 子代理1(从 telegram:123 派生)
chat_id="telegram:123" → 路由到 telegram:123
# 子代理2(从 discord:456 派生)
chat_id="discord:456" → 路由到 discord:456
并发限制
当前实现没有限制并发子代理数量。潜在风险:
- LLM API 请求激增
- 磁盘I/O 竞争
- 内存占用增加
改进建议(可选):
async def spawn(self, ...) -> str:
if self.get_running_count() >= MAX_CONCURRENT_SUBAGENTS:
return "Too many subagents running. Please wait."
# ... 原有逻辑 ...
关键代码位置索引
核心类和方法
| 类/方法 | 文件 | 行号 | 说明 |
|---|---|---|---|
| SubagentManager | subagent.py | 20-234 | 子代理管理器 |
spawn() | subagent.py | 44-81 | 创建子代理 |
_run_subagent() | subagent.py | 83-166 | 执行子代理任务 |
_announce_result() | subagent.py | 168-198 | 公告结果 |
_build_subagent_prompt() | subagent.py | 200-229 | 构建系统提示 |
| AgentLoop | loop.py | 24-330 | 主代理循环 |
_process_system_message() | loop.py | 218-308 | 处理子代理公告 |
| SpawnTool | spawn.py | 11-66 | Spawn工具 |
execute() | spawn.py | 58-65 | 执行spawn调用 |
set_context() | spawn.py | 24-27 | 设置上下文 |
| ToolRegistry | registry.py | 8-71 | 工具注册表 |
execute() | registry.py | 38-59 | 执行工具 |
重要数据结构
| 数据结构 | 位置 | 类型 | 说明 |
|---|---|---|---|
_running_tasks | subagent.py:42 | dict[str, asyncio.Task] | 运行中任务追踪 |
origin | subagent.py:66-69 | dict[str, str] | 原始渠道信息 |
max_iterations | subagent.py:111 | int | 子代理最大迭代次数 |
InboundMessage | events.py:9-24 | dataclass | 入站消息 |
OutboundMessage | events.py:27-36 | dataclass | 出站消息 |
关键流程入口
| 流程 | 入口文件 | 入口方法 | 调用链 |
|---|---|---|---|
| 创建子代理 | loop.py | _process_message() | spawn_tool.execute() → subagent_manager.spawn() |
| 子代理执行 | subagent.py | _run_subagent() | provider.chat() → tools.execute() |
| 结果公告 | subagent.py | _announce_result() | bus.publish_inbound() |
| 公告处理 | loop.py | _process_system_message() | provider.chat() → 生成自然语言响应 |
深挖价值点
1. 子代理状态监控与恢复机制
当前状态:
- 只有
_running_tasks追踪运行中的任务 - 任务完成后自动清理
- 无持久化,重启后丢失
深挖价值:
- 添加任务状态数据库(SQLite)
- 支持任务取消和暂停
- 重启后恢复未完成的子代理
- 提供任务历史查询和审计日志
2. 子代理间协作模式
当前状态:
- 子代理完全隔离,互不通信
- 无法共享中间结果
深挖价值:
- 实现"管道模式":子代理A的输出 → 子代理B的输入
- 共享内存/文件区域用于协作
- 添加任务依赖图和执行编排
- 实现 MapReduce 模式的子代理批处理
3. 自适应迭代次数控制
当前状态:
- 固定
max_iterations = 15 - 所有任务使用相同限制
深挖价值:
- 根据任务复杂度动态调整
- 基于历史数据预测所需迭代次数
- 实现提前终止机制(检测到收敛)
- 支持无限迭代模式(带人工确认)
4. 子代理资源配额系统
当前状态:
- 无资源限制
- 可能导致API滥用
深挖价值:
- 设置并发子代理数量上限
- 实现令牌桶限流算法
- 监控每个子代理的API调用量
- 添加优先级队列和资源抢占
5. 子代理调试和可视化
当前状态:
- 只有日志输出
- 无实时状态查看
深挖价值:
- 添加子代理状态API(REST/WebSocket)
- 实时进度追踪可视化
- 消息历史回放和调试
- 性能分析和瓶颈识别
6. 子代理沙箱安全机制
当前状态:
- 工具集受限
- 共享文件系统和网络
深挖价值:
- 实现文件系统隔离(chroot/容器)
- 添加网络访问白名单
- 实现代码执行沙箱(Docker/VM)
- 添加资源限制(CPU、内存、磁盘)
附录
A. 完整工具对比表
| 工具 | 主代理 | 子代理 | 说明 |
|---|---|---|---|
| ReadFile | ✓ | ✓ | 读取文件内容 |
| WriteFile | ✓ | ✓ | 写入文件(覆盖) |
| EditFile | ✓ | ✗ | 编辑文件(主代理独有) |
| ListDir | ✓ | ✓ | 列出目录内容 |
| Exec | ✓ | ✓ | 执行Shell命令 |
| WebSearch | ✓ | ✓ | 网页搜索 |
| WebFetch | ✓ | ✓ | 获取网页内容 |
| Message | ✓ | ✗ | 发送消息给用户 |
| Spawn | ✓ | ✗ | 创建子代理 |
B. 消息格式示例
Spawn请求(工具调用)
{
"name": "spawn",
"arguments": {
"task": "分析 src/ 目录下的所有Python代码",
"label": "代码审查"
}
}
Spawn响应(工具返回)
Subagent [代码审查] started (id: a3b5c7d1). I'll notify you when it completes.
子代理公告(系统消息)
[Subagent '代码审查' completed successfully]
Task: 分析 src/ 目录下的所有Python代码
Result:
已分析 12 个 Python 文件,发现以下问题:
1. utils.py:45 - 未使用的变量 import
2. api.py:123 - SQL注入风险
3. main.py:67 - 硬编码密钥
Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs.
用户最终响应
我已经完成了代码审查,在 12 个文件中发现了 3 个需要修复的问题,包括未使用的导入、SQL注入风险和硬编码密钥。
C. 错误处理流程示例
错误场景:子代理工具执行失败
1. 子代理调用工具
tools.execute("exec", {"command": "invalid_command"})
2. 工具层捕获异常(registry.py:58)
return f"Error executing exec: command not found"
3. 子代理继续尝试(根据LLM决策)
# LLM看到错误消息,可能重试或报告
4. 子代理最终报告结果
await self._announce_result(..., status="ok")
5. 主代理生成摘要
"由于命令不存在,无法完成任务。"
错误场景:子代理抛出未捕获异常
1. 子代理执行中发生异常
raise ValueError("Invalid configuration")
2. 子代理层捕获(subagent.py:163)
except Exception as e:
await self._announce_result(..., error_msg=str(e), status="error")
3. 主代理收到错误公告
[Subagent 'xxx' failed]
Result: Error: Invalid configuration
4. 主代理生成错误响应
"任务执行失败:无效的配置。"
文档版本: 1.0 分析日期: 2026-02-04 nanobot 版本: latest (based on core files) 作者: Deep Project Analysis Expert