会话管理架构
Moltbot 的会话模型和上下文管理
概述
会话管理是 Moltbot 的核心状态管理系统,负责维护对话上下文、历史消息、Agent 状态等。支持会话键管理、上下文修剪、压缩和 Compaction。
设计目标
- 会话隔离: 不同对话使用独立的会话键
- 上下文管理: 智能的历史修剪和压缩
- 持久化: 会话状态持久化到文件
- 并发控制: 防止同一会话的并发执行
- Compaction: 定期压缩会话上下文
会话模型
Session Key 结构
文件: src/routing/session-key.ts
// 格式:agent:{agentId}[:mainKey|channel:peerKind:peerId]
// 示例:
agent:main: // 主会话
agent:main:custom // 自定义主键
agent:main:telegram:dm:123456789 // Telegram DM
agent:main:whatsapp:dm:+1234567890 // WhatsApp DM
agent:main:discord:group:123456 // Discord 群组
agent:work:telegram:dm:123456789 // Work Agent 的 Telegram DM
Session 状态
文件: src/gateway/server/ws-chat.ts
export type ChatRunState = {
runId: string; // 运行 ID
sessionKey: string; // 会话键
sessionId: string; // 会话 ID
agentId: string; // Agent ID
startedAt: number; // 开始时间
message?: string; // 触发消息
lane?: string; // 队列名
aborted?: boolean; // 是否中止
};
上下文管理
History 修剪
文件: src/agents/pi-embedded-runner/history.ts
export function limitHistoryTurns(
messages: Message[],
limit?: number,
modelContextWindow?: number
): Message[] {
// 1. 如果无限制,返回所有
if (!limit && !modelContextWindow) return messages;
// 2. 按 user 限制
if (limit) {
let userTurnCount = 0;
const result: Message[] = [];
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
result.unshift(msg);
if (msg.role === "user") {
userTurnCount++;
if (userTurnCount >= limit) break;
}
}
return result;
}
// 3. 按上下文窗口限制
if (modelContextWindow) {
// ... 估算 token 数并修剪
}
return messages;
}
DM History Limit
配置:
{
agents: {
defaults: {
dmHistoryLimit: 10, // 保留最近 10 个用户回合
// 也可以在渠道级别配置
},
list: {
work: {
dmHistoryLimit: 20
}
}
},
providers: {
anthropic: {
dmHistoryLimit: 5
}
}
}
优先级
- Agent 级:
agents[{id}].dmHistoryLimit - Provider 级:
providers[{provider}].dmHistoryLimit - 全局默认:
agents.defaults.dmHistoryLimit(默认 50)
Compaction
Compaction 策略
文件: src/agents/pi-embedded-runner/compact.js
export async function compactEmbeddedPiSession(
sessionFile: string,
opts: {
provider?: string;
model?: string;
reason?: string;
}
): Promise<EmbeddedPiCompactResult> {
// 1. 读取当前会话
const { sessionManager, session } = await readSession(sessionFile);
// 2. 生成摘要
const summary = await generateSummary({
session,
provider,
model,
});
// 3. 重置会话为摘要
await session.resetLeaf();
// 4. 注入摘要作为 system message
await session.append({
role: "user",
content: `<context_summary>\n${summary}\n</context_summary>`,
});
return {
success: true,
summary,
messageCount: messageCount,
};
}
自动 Compaction
触发条件:
手动 Compaction
WebSocket 方法:
// 调用 compact 方法
await gatewayCall({
method: "sessions.compact",
params: {
sessionKey: "agent:main",
reason: "Manual compaction"
}
});
Session 存储
存储路径
文件: src/config/sessions.ts
// 会话文件存储路径
export function resolveStorePath(
sessionKey: string,
workspaceDir?: string
): string {
const agentDir = resolveAgentDir(sessionKey, workspaceDir);
return path.join(agentDir, "sessions.jsonl");
}
// Agent 目录
export function resolveAgentDir(
sessionKey: string,
workspaceDir?: string
): string {
const [_, agentId] = sessionKey.split(":");
const resolvedWorkspaceDir = workspaceDir ?? resolveDefaultWorkspaceDir();
return path.join(resolvedWorkspaceDir, ".clawdbot", "agents", agentId);
}
Session 格式
{"type":"session","id":"root","parentId":null}
{"type":"message","sessionId":"root","role":"user","content":"Hello"}
{"type":"message","sessionId":"root","role":"assistant","content":"Hi there!"}
{"type":"branch","id":"branch-1","parentId":"root"}
{"type":"message","sessionId":"branch-1","role":"user","content":"Follow up"}
并发控制
Lane 机制
文件: src/agents/pi-embedded-runner/lanes.ts
// 防止同一 session 并发
const sessionLane = resolveSessionLane(params.sessionKey ?? params.sessionId);
// 全局资源限制
const globalLane = resolveGlobalLane(params.lane);
// 执行时排队
return enqueueSessionLane(() =>
enqueueGlobalLane(async () => {
// 实际执行
})
);
Lane 命名
// Session lane 格式:session:{key}
resolveSessionLane("agent:main")
// → "session:agent:main"
// Global lane 格式:可配置
resolveGlobalLane("main")
// → "main"
Session Patching
Patch 操作
WebSocket 方法:
// 更新会话属性
await gatewayCall({
method: "sessions.patch",
params: {
sessionKey: "agent:main",
patch: {
thinkingLevel: "high",
verboseLevel: true,
sendPolicy: "auto",
model: "anthropic/claude-opus-4-5"
}
}
});
Patchable 属性
| 属性 | 类型 | 说明 |
|---|---|---|
thinkingLevel | `"off" | "minimal" |
verboseLevel | boolean | 详细输出 |
sendPolicy | `"auto" | "never" |
model | string | 模型 ID |
groupActivation | `"mention" | "always"` |
Session 事件
事件类型
export type SessionEventType =
| "session_start"
| "session_end"
| "session_reset"
| "session_compact";
export type SessionEvent = {
type: SessionEventType;
sessionKey: string;
sessionId?: string;
timestamp: Date;
context?: Record<string, unknown>;
};
Hooks 集成
// 监听会话开始
registerInternalHook('session_start', async (event) => {
console.log('Session started:', event.sessionKey);
});
// 监听会话结束
registerInternalHook('session_end', async (event) => {
console.log('Session ended:', event.sessionKey);
});
代码路径引用
| 功能 | 文件路径 |
|---|---|
| Session Key 构建 | src/routing/session-key.ts |
| Session 存储 | src/config/sessions.ts |
| History 修剪 | src/agents/pi-embedded-runner/history.ts |
| Compaction | src/agents/pi-embedded-runner/compact.js |
| Lane 管理 | src/agents/pi-embedded-runner/lanes.ts |
| Chat Run State | src/gateway/server/ws-chat.ts |
| Session Events | src/hooks/internal-hooks.ts |
| Session Patching | src/gateway/server-methods.ts |