nanobot 扩展机制深度分析
概述
nanobot 是一个超轻量级个人 AI 助手框架(约 4,000 行核心代码),其扩展机制设计遵循以下核心原则:
- 渐进式加载:始终加载的技能(always skills)与按需加载的技能分离,最小化上下文开销
- 文件即技能:技能以 Markdown 文件形式存在,便于编辑和版本控制
- 插件化架构:渠道、工具、LLM 提供商均可通过继承抽象类扩展
- 配置驱动:通过 Pydantic Settings 支持环境变量和配置文件
技能系统
架构概览
技能系统是 nanobot 扩展能力的核心,采用基于文件的声明式设计:
核心类分析
SkillsLoader (nanobot/agent/skills.py)
class SkillsLoader:
def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
self.workspace = workspace
self.workspace_skills = workspace / "skills"
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
关键方法:
| 方法 | 位置 | 功能 |
|---|---|---|
list_skills() | 26 行 | 列出所有可用技能,支持过滤不可用技能 |
load_skill() | 59 行 | 按名称加载单个技能内容 |
load_skills_for_context() | 82 行 | 加载多个技能到上下文 |
build_skills_summary() | 101 行 | 构建技能摘要(XML 格式) |
get_always_skills() | 193 行 | 获取标记为 always=true 的技能 |
get_skill_metadata() | 203 行 | 解析技能的 YAML frontmatter |
技能目录结构
nanobot/skills/
├── github/
│ └── SKILL.md
├── tmux/
│ ├── SKILL.md
│ └── scripts/
│ ├── wait-for-text.sh
│ └── find-sessions.sh
├── summarize/
│ └── SKILL.md
├── weather/
│ └── SKILL.md
└── skill-creator/
└── SKILL.md
技能文件格式
每个技能目录包含 SKILL.md 文件,格式如下:
---
name: github
description: "Interact with GitHub using the `gh` CLI."
metadata: {"nanobot":{"emoji":"🐙","requires":{"bins":["gh"]},"install":[...]}}
---
# GitHub Skill
Use the `gh` CLI to interact with GitHub...
Frontmatter 字段:
| 字段 | 类型 | 说明 |
|---|---|---|
name | string | 技能名称 |
description | string | 技能描述 |
metadata | JSON | nanobot 元数据 |
always | boolean | 是否始终加载 |
requires | object | 依赖要求(bins, env) |
install | array | 安装说明 |
Always Skills vs Available Skills
区别:
- Always Skills:标记
always=true,完整内容直接注入系统提示 - Available Skills:只展示摘要,Agent 通过
read_file工具按需加载
技能摘要构建
build_skills_summary() 方法生成 XML 格式的技能摘要:
<skills>
<skill available="true">
<name>github</name>
<description>Interact with GitHub using the `gh` CLI.</description>
<location>/path/to/github/SKILL.md</location>
</skill>
<skill available="false">
<name>tmux</name>
<description>Remote-control tmux sessions.</description>
<location>/path/to/tmux/SKILL.md</location>
<requires>CLI: tmux</requires>
</skill>
</skills>
依赖检查机制
def _check_requirements(self, skill_meta: dict) -> bool:
requires = skill_meta.get("requires", {})
for b in requires.get("bins", []):
if not shutil.which(b):
return False
for env in requires.get("env", []):
if not os.environ.get(env):
return False
return True
支持的要求类型:
bins: 可执行文件(使用shutil.which()检查)env: 环境变量
技能动态加载流程
示例技能分析
GitHub 技能 (skills/github/SKILL.md)
---
name: github
description: "Interact with GitHub using the `gh` CLI. Use `gh issue`, `gh pr`, `gh run`, and `gh api` for issues, PRs, CI runs, and advanced queries."
metadata: {"nanobot":{"emoji":"🐙","requires":{"bins":["gh"]},"install":[...]}}
---
特点:
- 依赖
ghCLI - 提供 PR、Issue、CI 查询示例
- 使用
--json和--jq支持结构化输出
tmux 技能 (skills/tmux/SKILL.md)
---
name: tmux
description: "Remote-control tmux sessions for interactive CLIs by sending keystrokes and scraping pane output."
metadata: {"nanobot":{"emoji":"🧵","os":["darwin","linux"],"requires":{"bins":["tmux"]}}}
---
特点:
- 使用独立 socket 隔离会话
- 提供
wait-for-text.sh辅助脚本 - 支持多会话并行任务调度
summarize 技能 (skills/summarize/SKILL.md)
---
name: summarize
description: "Summarize or extract text/transcripts from URLs, podcasts, and local files."
homepage: https://summarize.sh
metadata: {"nanobot":{"emoji":"🧾","requires":{"bins":["summarize"]},"install":[...]}}
---
特点:
- 支持 URL、文件、YouTube 链接
- 触发短语检测("what's this link about?")
- 多 API Key 支持
Cron 定时任务
架构设计
Cron 系统支持三种调度类型,并提供持久化存储和灵活的回调机制:
CronService 核心实现
初始化 (nanobot/cron/service.py:42-54)
class CronService:
def __init__(
self,
store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
):
self.store_path = store_path
self.on_job = on_job # 回调函数,返回响应文本
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._running = False
启动流程 (nanobot/cron/service.py:147-154)
调度类型
1. At 调度(一次性任务)
if schedule.kind == "at":
return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None
特点:
- 指定时间戳(毫秒)
- 执行后禁用或删除
- 适合延迟执行或定时提醒
2. Every 调度(固定间隔)
if schedule.kind == "every":
if not schedule.every_ms or schedule.every_ms <= 0:
return None
return now_ms + schedule.every_ms
特点:
- 固定间隔重复执行
- 从当前时间开始计算
- 适合轮询、健康检查
3. Cron 表达式调度
if schedule.kind == "cron" and schedule.expr:
try:
from croniter import croniter
cron = croniter(schedule.expr, time.time())
next_time = cron.get_next()
return int(next_time * 1000)
except Exception:
return None
特点:
- 标准 cron 表达式(
0 9 * * *) - 支持复杂调度规则
- 依赖
croniter库
CronJob 数据结构
@dataclass
class CronJob:
id: str # 任务 ID(UUID 前缀)
name: str # 任务名称
enabled: bool = True # 是否启用
schedule: CronSchedule # 调度配置
payload: CronPayload # 执行内容
state: CronJobState # 运行状态
created_at_ms: int = 0 # 创建时间
updated_at_ms: int = 0 # 更新时间
delete_after_run: bool = False # 执行后是否删除
下次运行时间计算
定时器机制
任务执行流程
async def _execute_job(self, job: CronJob) -> None:
start_ms = _now_ms()
logger.info(f"Cron: executing job '{job.name}' ({job.id})")
try:
response = None
if self.on_job:
response = await self.on_job(job)
job.state.last_status = "ok"
job.state.last_error = None
logger.info(f"Cron: job '{job.name}' completed")
except Exception as e:
job.state.last_status = "error"
job.state.last_error = str(e)
logger.error(f"Cron: job '{job.name}' failed: {e}")
job.state.last_run_at_ms = start_ms
job.updated_at_ms = _now_ms()
# 处理一次性任务
if job.schedule.kind == "at":
if job.delete_after_run:
self._store.jobs = [j for j in self._store.jobs if j.id != job.id]
else:
job.enabled = False
job.state.next_run_at_ms = None
else:
# 计算下次运行
job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
Cron 任务状态图
持久化机制
存储格式(JSON):
{
"version": 1,
"jobs": [
{
"id": "abc12345",
"name": "daily report",
"enabled": true,
"schedule": {
"kind": "cron",
"expr": "0 9 * * *",
"tz": null
},
"payload": {
"kind": "agent_turn",
"message": "Generate daily report",
"deliver": false,
"channel": null,
"to": null
},
"state": {
"nextRunAtMs": 1738699200000,
"lastRunAtMs": 1738612800000,
"lastStatus": "ok",
"lastError": null
},
"createdAtMs": 1738526400000,
"updatedAtMs": 1738612800000,
"deleteAfterRun": false
}
]
}
公共 API
| 方法 | 位置 | 功能 |
|---|---|---|
list_jobs(include_disabled) | 251 行 | 列出所有任务 |
add_job(name, schedule, message, ...) | 257 行 | 添加新任务 |
remove_job(job_id) | 296 行 | 删除任务 |
enable_job(job_id, enabled) | 310 行 | 启用/禁用任务 |
run_job(job_id, force) | 326 行 | 手动执行任务 |
status() | 339 行 | 获取服务状态 |
心跳服务
设计理念
Heartbeat 服务实现周期性唤醒机制,让 Agent 主动检查待处理任务:
HeartbeatService 实现
class HeartbeatService:
"""
Periodic heartbeat service that wakes the agent to check for tasks.
The agent reads HEARTBEAT.md from the workspace and executes any
tasks listed there. If nothing needs attention, it replies HEARTBEAT_OK.
"""
核心参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
interval_s | 1800(30 分钟) | 心跳间隔 |
enabled | True | 是否启用 |
on_heartbeat | None | 回调函数 |
HEARTBEAT.md 机制
文件内容示例
# Pending Tasks
- [ ] Review PR #123
- [ ] Update documentation
- [ ] Deploy to staging
空内容检测
def _is_heartbeat_empty(content: str | None) -> bool:
if not content:
return True
# Lines to skip: empty, headers, HTML comments, empty checkboxes
skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
for line in content.split("\n"):
line = line.strip()
if not line or line.startswith("#") or line.startswith("<!--") or line in skip_patterns:
continue
return False # Found actionable content
return True
跳过的行:
- 空行
- 标题(
#开头) - HTML 注释(
<!--) - 空复选框(
- [ ]) - 已完成复选框(
- [x])
HEARTBEAT_OK_TOKEN 语义
HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists).
Follow any instructions or tasks listed there.
If nothing needs attention, reply with just: HEARTBEAT_OK"""
HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
语义:
- Agent 检查 HEARTBEAT.md 后,若无任务则返回
HEARTBEAT_OK - 忽略大小写和下划线(
HEARTBEAT_OK_TOKEN in response.upper().replace("_", "")) - 标识"无事可做"的状态
心跳循环流程
使用场景
- 定期提醒:每日检查清单
- 后台任务:定时数据同步
- 等待触发:等待外部条件满足后执行
- 批处理:累积任务后一次性处理
配置系统
Config 类层次结构
多 LLM 提供商配置优先级
def get_api_key(self) -> str | None:
"""Get API key in priority order: OpenRouter > Anthropic > OpenAI > Gemini > Zhipu > Groq > vLLM."""
return (
self.providers.openrouter.api_key or
self.providers.anthropic.api_key or
self.providers.openai.api_key or
self.providers.gemini.api_key or
self.providers.zhipu.api_key or
self.providers.groq.api_key or
self.providers.vllm.api_key or
None
)
优先级顺序:
- OpenRouter(
sk-or-前缀) - Anthropic
- OpenAI
- Gemini
- Zhipu
- Groq
- vLLM
环境变量支持
class Config(BaseSettings):
class Config:
env_prefix = "NANOBOT_"
env_nested_delimiter = "__"
使用示例:
| 配置项 | 环境变量 |
|---|---|
agents.defaults.workspace | NANOBOT_AGENTS__DEFAULTS__WORKSPACE |
providers.anthropic.api_key | NANOBOT_PROVIDERS__ANTHROPIC__API_KEY |
channels.telegram.token | NANOBOT_CHANNELS__TELEGRAM__TOKEN |
配置加载机制
扩展点分析
1. 添加新技能
步骤
- 创建技能目录
mkdir -p workspace/skills/my-skill
- 编写 SKILL.md
---
name: my-skill
description: "描述技能的功能"
metadata: {"nanobot":{"emoji":"🚀","requires":{"bins":["my-cli"]}}}
---
# My Skill
技能使用说明...
## 示例
```bash
my-cli --help
3. **可选:标记为始终加载**
```yaml
---
name: my-skill
always: true
...
---
- 可选:添加辅助脚本
mkdir -p workspace/skills/my-skill/scripts
touch workspace/skills/my-skill/scripts/helper.sh
代码位置参考
| 功能 | 文件 | 行号 |
|---|---|---|
| 技能发现 | nanobot/agent/skills.py | 26-57 |
| 技能加载 | nanobot/agent/skills.py | 59-80 |
| 依赖检查 | nanobot/agent/skills.py | 177-186 |
| 总是加载 | nanobot/agent/skills.py | 193-201 |
| 上下文注入 | nanobot/agent/context.py | 52-68 |
2. 添加新渠道
步骤
- 继承 BaseChannel
from nanobot.channels.base import BaseChannel
class DiscordChannel(BaseChannel):
name = "discord"
async def start(self) -> None:
# 连接到 Discord
pass
async def stop(self) -> None:
# 断开连接
pass
async def send(self, msg: OutboundMessage) -> None:
# 发送消息
pass
- 添加配置 Schema
# nanobot/config/schema.py
class DiscordConfig(BaseModel):
enabled: bool = False
token: str = ""
allow_from: list[str] = Field(default_factory=list)
class ChannelsConfig(BaseModel):
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig) # 新增
- 在 ChannelManager 中注册
# nanobot/channels/manager.py
def _init_channels(self) -> None:
# ... 现有渠道 ...
if self.config.channels.discord.enabled:
from nanobot.channels.discord import DiscordChannel
self.channels["discord"] = DiscordChannel(
self.config.channels.discord, self.bus
)
代码位置参考
| 功能 | 文件 | 行号 |
|---|---|---|
| BaseChannel 接口 | nanobot/channels/base.py | 10-122 |
| ChannelManager | nanobot/channels/manager.py | 14-140 |
| Telegram 实现 | nanobot/channels/telegram.py | 79-303 |
| WhatsApp 实现 | nanobot/channels/whatsapp.py | 15-142 |
3. 添加新工具
步骤
- 继承 Tool 基类
from nanobot.agent.tools.base import Tool
from typing import Any
class MyTool(Tool):
@property
def name(self) -> str:
return "my_tool"
@property
def description(self) -> str:
return "Description of what this tool does"
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "Parameter description"
}
},
"required": ["param1"]
}
async def execute(self, param1: str, **kwargs: Any) -> str:
# 执行工具逻辑
return f"Result: {param1}"
- 在 AgentLoop 中注册
# nanobot/agent/loop.py
def _register_default_tools(self) -> None:
# ... 现有工具 ...
self.tools.register(MyTool())
代码位置参考
| 功能 | 文件 | 行号 |
|---|---|---|
| Tool 基类 | nanobot/agent/tools/base.py | 7-56 |
| ToolRegistry | nanobot/agent/tools/registry.py | 8-71 |
| Shell 工具示例 | nanobot/agent/tools/shell.py | 10-86 |
| 文件系统工具 | nanobot/agent/tools/filesystem.py |
4. 添加新 LLM 提供商
步骤
- 继承 LLMProvider
from nanobot.providers.base import LLMProvider, LLMResponse
class MyProvider(LLMProvider):
def __init__(self, api_key: str | None = None, api_base: str | None = None):
super().__init__(api_key, api_base)
async def chat(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
) -> LLMResponse:
# 调用 API
response = await self._call_api(messages, model)
return LLMResponse(content=response["content"])
def get_default_model(self) -> str:
return "my-provider/default-model"
- 添加配置
# nanobot/config/schema.py
class ProvidersConfig(BaseModel):
# ... 现有提供商 ...
my_provider: ProviderConfig = Field(default_factory=ProviderConfig)
- 更新 get_api_key
def get_api_key(self) -> str | None:
return (
# ... 现有优先级 ...
self.providers.my_provider.api_key or
None
)
代码位置参考
| 功能 | 文件 | 行号 |
|---|---|---|
| LLMProvider 基类 | nanobot/providers/base.py | 30-70 |
| LiteLLM 实现 | nanobot/providers/litellm_provider.py | 12-174 |
| 配置 Schema | nanobot/config/schema.py | 81-120 |
关键代码位置索引
技能系统
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/agent/skills.py | SkillsLoader.__init__ | 21-24 |
nanobot/agent/skills.py | list_skills | 26-57 |
nanobot/agent/skills.py | load_skill | 59-80 |
nanobot/agent/skills.py | build_skills_summary | 101-140 |
nanobot/agent/skills.py | get_always_skills | 193-201 |
nanobot/agent/skills.py | _check_requirements | 177-186 |
nanobot/agent/context.py | build_system_prompt | 27-70 |
Cron 定时任务
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/cron/service.py | CronService.__init__ | 42-54 |
nanobot/cron/service.py | start | 147-154 |
nanobot/cron/service.py | _arm_timer | 180-197 |
nanobot/cron/service.py | _on_timer | 199-214 |
nanobot/cron/service.py | _execute_job | 216-248 |
nanobot/cron/service.py | add_job | 257-294 |
nanobot/cron/types.py | CronJob | 41-53 |
nanobot/cron/types.py | CronSchedule | 7-19 |
心跳服务
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/heartbeat/service.py | HeartbeatService.__init__ | 46-58 |
nanobot/heartbeat/service.py | start | 73-81 |
nanobot/heartbeat/service.py | _run_loop | 90-100 |
nanobot/heartbeat/service.py | _tick | 102-124 |
nanobot/heartbeat/service.py | _is_heartbeat_empty | 21-35 |
配置系统
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/config/schema.py | Config | 81-120 |
nanobot/config/schema.py | get_api_key | 94-105 |
nanobot/config/schema.py | get_api_base | 107-115 |
nanobot/config/schema.py | ProvidersConfig | 48-56 |
渠道系统
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/channels/base.py | BaseChannel | 10-122 |
nanobot/channels/manager.py | ChannelManager | 14-140 |
nanobot/channels/telegram.py | TelegramChannel | 79-303 |
nanobot/channels/whatsapp.py | WhatsAppChannel | 15-142 |
工具系统
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/agent/tools/base.py | Tool | 7-56 |
nanobot/agent/tools/registry.py | ToolRegistry | 8-71 |
nanobot/agent/tools/shell.py | ExecTool | 10-86 |
nanobot/agent/loop.py | _register_default_tools | 66-87 |
LLM 提供商
| 文件 | 关键类/方法 | 行号 |
|---|---|---|
nanobot/providers/base.py | LLMProvider | 30-70 |
nanobot/providers/litellm_provider.py | LiteLLMProvider | 12-174 |
nanobot/providers/litellm_provider.py | chat | 63-131 |
深挖价值点
1. LiteLLM 多提供商统一接口
为什么值得深挖:
LiteLLMProvider通过 LiteLLM 库实现了 100+ LLM 提供商的统一接口- 智能模型前缀处理(
openrouter/,zhipu/,gemini/,hosted_vllm/) - API 密钥自动检测和配置
关键代码 (nanobot/providers/litellm_provider.py):
# 自动检测 OpenRouter
self.is_openrouter = (
(api_key and api_key.startswith("sk-or-")) or
(api_base and "openrouter" in api_base)
)
# Zhipu 模型前缀处理
if ("glm" in model.lower() or "zhipu" in model.lower()) and not (
model.startswith("zhipu/") or
model.startswith("zai/") or
model.startswith("openrouter/")
):
model = f"zhipu/{model}"
2. 渐进式技能加载
为什么值得深挖:
- 始终加载的技能直接注入系统提示,保证核心能力
- 可用技能只展示摘要,Agent 通过
read_file按需加载 - 减少上下文 token 消耗,提高响应速度
关键代码 (nanobot/agent/context.py:52-68):
# 1. Always-loaded skills: include full content
always_skills = self.skills.get_always_skills()
if always_skills:
always_content = self.skills.load_skills_for_context(always_skills)
if always_content:
parts.append(f"# Active Skills\n\n{always_content}")
# 2. Available skills: only show summary
skills_summary = self.skills.build_skills_summary()
if skills_summary:
parts.append(f"""# Skills
The following skills extend your capabilities. To use a skill, read its SKILL.md file using the read_file tool.
{skills_summary}""")
3. Cron 定时器精度优化
为什么值得深挖:
- 使用
asyncio.sleep()实现事件循环驱动的定时器 - 动态计算下一个唤醒时间,避免固定轮询开销
- 支持毫秒级精度
关键代码 (nanobot/cron/service.py:180-197):
def _arm_timer(self) -> None:
if self._timer_task:
self._timer_task.cancel()
next_wake = self._get_next_wake_ms()
if not next_wake or not self._running:
return
delay_ms = max(0, next_wake - _now_ms())
delay_s = delay_ms / 1000
async def tick():
await asyncio.sleep(delay_s)
if self._running:
await self._on_timer()
self._timer_task = asyncio.create_task(tick())
4. Telegram Markdown 到 HTML 转换
为什么值得深挖:
- 使用占位符机制保护代码块和内联代码
- 正则表达式处理 Markdown 语法
- 支持 Telegram HTML 格式的所有特性
关键代码 (nanobot/channels/telegram.py:16-76):
def _markdown_to_telegram_html(text: str) -> str:
# 1. Extract and protect code blocks
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
code_blocks.append(m.group(1))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
# 2. Extract and protect inline code
inline_codes: list[str] = []
def save_inline_code(m: re.Match) -> str:
inline_codes.append(m.group(1))
return f"\x00IC{len(inline_codes) - 1}\x00"
text = re.sub(r'`([^`]+)`', save_inline_code, text)
# ... 处理其他 Markdown 语法 ...
# 11-12. Restore protected content with HTML tags
for i, code in enumerate(inline_codes):
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00IC{i}\x00", f"<code>{escaped}</code>")
5. 会话管理和上下文传播
为什么值得深挖:
SessionManager实现持久化会话存储ContextBuilder组装系统提示、历史记录、技能- 工具上下文动态设置(
message_tool.set_context())
关键代码 (nanobot/agent/loop.py:140-150):
# Update tool contexts
message_tool = self.tools.get("message")
if isinstance(message_tool, MessageTool):
message_tool.set_context(msg.channel, msg.chat_id)
spawn_tool = self.tools.get("spawn")
if isinstance(spawn_tool, SpawnTool):
spawn_tool.set_context(msg.channel, msg.chat_id)
6. WebSocket 桥接模式
为什么值得深挖:
- WhatsApp 通过 WebSocket 连接到 Node.js 桥接服务
- 异步重连机制
- JSON 消息协议设计
关键代码 (nanobot/channels/whatsapp.py:31-64):
async def start(self) -> None:
import websockets
bridge_url = self.config.bridge_url
while self._running:
try:
async with websockets.connect(bridge_url) as ws:
self._ws = ws
self._connected = True
async for message in ws:
try:
await self._handle_bridge_message(message)
except Exception as e:
logger.error(f"Error handling bridge message: {e}")
except asyncio.CancelledError:
break
except Exception as e:
self._connected = False
if self._running:
await asyncio.sleep(5) # Reconnect delay
总结
nanobot 的扩展机制体现了以下设计原则:
- 声明式配置:技能以 Markdown 文件形式存在,易于维护
- 渐进式加载:区分 always skills 和 available skills,优化上下文
- 插件化架构:渠道、工具、提供商均可通过继承抽象类扩展
- 事件驱动:Cron、心跳、消息总线采用异步事件模型
- 持久化存储:Cron 任务、会话状态支持持久化
这种设计使 nanobot 既保持轻量级(4,000 行核心代码),又具备强大的扩展能力。