Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • 会话管理架构

会话管理架构

Moltbot 的会话模型和上下文管理

概述

会话管理是 Moltbot 的核心状态管理系统,负责维护对话上下文、历史消息、Agent 状态等。支持会话键管理、上下文修剪、压缩和 Compaction。

设计目标

  • 会话隔离: 不同对话使用独立的会话键
  • 上下文管理: 智能的历史修剪和压缩
  • 持久化: 会话状态持久化到文件
  • 并发控制: 防止同一会话的并发执行
  • Compaction: 定期压缩会话上下文

会话模型

Session Key 结构

文件: src/routing/session-key.ts

// 格式:agent:{agentId}[:mainKey|channel:peerKind:peerId]

// 示例:
agent:main:                           // 主会话
agent:main:custom                      // 自定义主键
agent:main:telegram:dm:123456789       // Telegram DM
agent:main:whatsapp:dm:+1234567890    // WhatsApp DM
agent:main:discord:group:123456      // Discord 群组
agent:work:telegram:dm:123456789        // Work Agent 的 Telegram DM

Session 状态

文件: src/gateway/server/ws-chat.ts

export type ChatRunState = {
  runId: string;              // 运行 ID
  sessionKey: string;          // 会话键
  sessionId: string;           // 会话 ID
  agentId: string;            // Agent ID
  startedAt: number;          // 开始时间
  message?: string;           // 触发消息
  lane?: string;              // 队列名
  aborted?: boolean;          // 是否中止
};

上下文管理

History 修剪

文件: src/agents/pi-embedded-runner/history.ts

export function limitHistoryTurns(
  messages: Message[],
  limit?: number,
  modelContextWindow?: number
): Message[] {
  // 1. 如果无限制,返回所有
  if (!limit && !modelContextWindow) return messages;

  // 2. 按 user 限制
  if (limit) {
    let userTurnCount = 0;
    const result: Message[] = [];
    for (let i = messages.length - 1; i >= 0; i--) {
      const msg = messages[i];
      result.unshift(msg);
      if (msg.role === "user") {
        userTurnCount++;
        if (userTurnCount >= limit) break;
      }
    }
    return result;
  }

  // 3. 按上下文窗口限制
  if (modelContextWindow) {
    // ... 估算 token 数并修剪
  }

  return messages;
}

DM History Limit

配置:

{
  agents: {
    defaults: {
      dmHistoryLimit: 10,  // 保留最近 10 个用户回合
      // 也可以在渠道级别配置
    },
    list: {
      work: {
        dmHistoryLimit: 20
      }
    }
  },
  providers: {
    anthropic: {
      dmHistoryLimit: 5
    }
  }
}

优先级

  1. Agent 级: agents[{id}].dmHistoryLimit
  2. Provider 级: providers[{provider}].dmHistoryLimit
  3. 全局默认: agents.defaults.dmHistoryLimit (默认 50)

Compaction

Compaction 策略

文件: src/agents/pi-embedded-runner/compact.js

export async function compactEmbeddedPiSession(
  sessionFile: string,
  opts: {
    provider?: string;
    model?: string;
    reason?: string;
  }
): Promise<EmbeddedPiCompactResult> {
  // 1. 读取当前会话
  const { sessionManager, session } = await readSession(sessionFile);

  // 2. 生成摘要
  const summary = await generateSummary({
    session,
    provider,
    model,
  });

  // 3. 重置会话为摘要
  await session.resetLeaf();

  // 4. 注入摘要作为 system message
  await session.append({
    role: "user",
    content: `<context_summary>\n${summary}\n</context_summary>`,
  });

  return {
    success: true,
    summary,
    messageCount: messageCount,
  };
}

自动 Compaction

触发条件:

手动 Compaction

WebSocket 方法:

// 调用 compact 方法
await gatewayCall({
  method: "sessions.compact",
  params: {
    sessionKey: "agent:main",
    reason: "Manual compaction"
  }
});

Session 存储

存储路径

文件: src/config/sessions.ts

// 会话文件存储路径
export function resolveStorePath(
  sessionKey: string,
  workspaceDir?: string
): string {
  const agentDir = resolveAgentDir(sessionKey, workspaceDir);
  return path.join(agentDir, "sessions.jsonl");
}

// Agent 目录
export function resolveAgentDir(
  sessionKey: string,
  workspaceDir?: string
): string {
  const [_, agentId] = sessionKey.split(":");
  const resolvedWorkspaceDir = workspaceDir ?? resolveDefaultWorkspaceDir();
  return path.join(resolvedWorkspaceDir, ".clawdbot", "agents", agentId);
}

Session 格式

{"type":"session","id":"root","parentId":null}
{"type":"message","sessionId":"root","role":"user","content":"Hello"}
{"type":"message","sessionId":"root","role":"assistant","content":"Hi there!"}
{"type":"branch","id":"branch-1","parentId":"root"}
{"type":"message","sessionId":"branch-1","role":"user","content":"Follow up"}

并发控制

Lane 机制

文件: src/agents/pi-embedded-runner/lanes.ts

// 防止同一 session 并发
const sessionLane = resolveSessionLane(params.sessionKey ?? params.sessionId);

// 全局资源限制
const globalLane = resolveGlobalLane(params.lane);

// 执行时排队
return enqueueSessionLane(() =>
  enqueueGlobalLane(async () => {
    // 实际执行
  })
);

Lane 命名

// Session lane 格式:session:{key}
resolveSessionLane("agent:main")
// → "session:agent:main"

// Global lane 格式:可配置
resolveGlobalLane("main")
// → "main"

Session Patching

Patch 操作

WebSocket 方法:

// 更新会话属性
await gatewayCall({
  method: "sessions.patch",
  params: {
    sessionKey: "agent:main",
    patch: {
      thinkingLevel: "high",
      verboseLevel: true,
      sendPolicy: "auto",
      model: "anthropic/claude-opus-4-5"
    }
  }
});

Patchable 属性

属性类型说明
thinkingLevel`"off""minimal"
verboseLevelboolean详细输出
sendPolicy`"auto""never"
modelstring模型 ID
groupActivation`"mention""always"`

Session 事件

事件类型

export type SessionEventType =
  | "session_start"
  | "session_end"
  | "session_reset"
  | "session_compact";

export type SessionEvent = {
  type: SessionEventType;
  sessionKey: string;
  sessionId?: string;
  timestamp: Date;
  context?: Record<string, unknown>;
};

Hooks 集成

// 监听会话开始
registerInternalHook('session_start', async (event) => {
  console.log('Session started:', event.sessionKey);
});

// 监听会话结束
registerInternalHook('session_end', async (event) => {
  console.log('Session ended:', event.sessionKey);
});

代码路径引用

功能文件路径
Session Key 构建src/routing/session-key.ts
Session 存储src/config/sessions.ts
History 修剪src/agents/pi-embedded-runner/history.ts
Compactionsrc/agents/pi-embedded-runner/compact.js
Lane 管理src/agents/pi-embedded-runner/lanes.ts
Chat Run Statesrc/gateway/server/ws-chat.ts
Session Eventssrc/hooks/internal-hooks.ts
Session Patchingsrc/gateway/server-methods.ts