Bridge(远程控制)系统深度分析
基于
/src/bridge/(31 个文件,12,613 行)和/src/remote/(4 个文件)的完整代码审计。
一、系统总览
Bridge 系统是 Claude Code 的 远程控制(Remote Control) 子系统。它让用户能从 claude.ai/web 或 Claude 移动端连接到本地运行的 CLI 会话——本质上,你的笔记本变成了一个"云工作器",通过轮询一个远程 API 获取工作项(session),然后在本地 spawn 子 CLI 进程来执行实际的代码编辑任务。
核心价值主张:用户离开终端后仍可通过浏览器继续 coding session,实现跨设备连续工作。
关键术语
| 术语 | 含义 |
|---|---|
| Environment | 一台注册到服务器的本地机器(bridge 进程实例),可管理多个 session |
| Work Item | 服务器下发的工作单元,携带 session ID + work secret |
| Work Secret | base64url 编码的 JSON,包含 session ingress JWT + API base URL |
| Session | 一个子 CLI 进程实例,处理具体的 coding 任务 |
| Session Ingress | 服务器端接收子进程事件的 WebSocket/HTTP 端点 |
| CCR | Claude Code Runtime — 远程 session 执行的后端基础设施 |
| v1/v2 | 两种协议版本:v1 = Session-Ingress WebSocket;v2 = SSE + CCRClient |
二、架构总览:Hub-and-Spoke 模式
┌──────────────────────────┐
│ claude.ai / Web Client │
└────────────┬─────────────┘
│ WebSocket + HTTP POST
┌────────────▼─────────────┐
│ CCR Server (cloud infra) │
│ - Environment Registry │
│ - Work Queue (Redis PEL) │
│ - Session Ingress │
└────────────┬─────────────┘
│
┌──────────────────▼──────────────────┐
│ bridgeMain.ts (Orchestrator) │
│ ┌─────────────────────────────────┐ │
│ │ Poll Loop (while !aborted) │ │
│ │ ├─ pollForWork() │ │
│ │ ├─ decodeWorkSecret() │ │
│ │ ├─ registerWorker() (v2 only) │ │
│ │ ├─ spawn(sessionRunner) │ │
│ │ ├─ heartbeatActiveWorkItems() │ │
│ │ └─ capacityWake / backoff │ │
│ └─────────────────────────────────┘ │
│ │ │ │ │
│ ┌────▼───┐ ┌────▼───┐ ┌────▼───┐ │
│ │ Session │ │ Session│ │ Session│ │
│ │ Child 1 │ │ Child 2│ │ Child N│ │
│ │ (stdin/ │ │ │ │ │ │
│ │ stdout) │ │ │ │ │ │
│ └─────────┘ └────────┘ └────────┘ │
└────────────────────────────────────────┘
另有一条独立路径:
┌──────────────────────────────────┐
│ replBridge.ts (In-process mode) │
│ 嵌入 REPL 内部,共享进程 │
│ 通过 ReplBridgeTransport 通信 │
└──────────────────────────────────┘
bridgeMain.ts 是整个子系统的 orchestrator(2,999 行),负责:
- 注册 environment → 获取 environment_id + environment_secret
- 轮询工作队列(poll loop)
- 解码 work secret → 获取 session ingress JWT
- 为每个 session spawn 子 CLI 进程
- 管理多会话生命周期(最多 32 个并发)
- 错误恢复 + 指数退避
- 优雅关闭
三、Environment Worker 注册与轮询
3.1 注册流程
bridgeMain.ts:2447-2467
const reg = await api.registerBridgeEnvironment(config)
// → POST /v1/environments/bridge
// 发送: machine_name, directory, branch, git_repo_url, max_sessions, metadata
// 接收: environment_id, environment_secret
注册时发送的 BridgeConfig(types.ts:81-115)包含:
dir: 当前工作目录machineName: hostname()branch,gitRepoUrl: Git 上下文maxSessions: 最大并发数(默认 32)spawnMode:'single-session' | 'worktree' | 'same-dir'bridgeId: 客户端 UUIDworkerType:'claude_code'或'claude_code_assistant'reuseEnvironmentId: 恢复旧 session 时复用已有的 environment ID
bridgeApi.ts:76-89 的 getHeaders() 会在每个请求中附带:
Authorization: Bearer <OAuth token>— 用于认证anthropic-beta: environments-2025-11-01— API 版本头X-Trusted-Device-Token— 受信设备令牌(如果 gate 开启)
3.2 轮询循环
bridgeMain.ts:600-1401 是核心的 while (!loopSignal.aborted) 循环:
┌─────────────────────────────────────────────────────────┐
│ Poll Loop Tick │
│ │
│ 1. pollForWork(envId, envSecret, signal) │
│ → GET /v1/environments/{id}/work/poll │
│ │
│ 2. if !work: │
│ ├─ at capacity? → heartbeat loop / slow poll │
│ └─ not at capacity? → sleep(partial/full interval) │
│ │
│ 3. if work: │
│ ├─ skip if completedWorkIds.has(work.id) │
│ ├─ decodeWorkSecret(work.secret) │
│ ├─ ackWork() → POST .../work/{id}/ack │
│ ├─ healthcheck? → log + continue │
│ └─ session? → │
│ ├─ existing session? → updateAccessToken() │
│ ├─ at capacity? → break (don't spawn) │
│ ├─ v2? → registerWorker() → get epoch │
│ ├─ worktree? → createAgentWorktree() │
│ ├─ spawn(sessionRunner) → SessionHandle │
│ └─ schedule timeout + token refresh │
│ │
│ 4. on error: │
│ ├─ BridgeFatalError → break (401/403/404/410) │
│ ├─ connection error → connBackoff (2s→4s→...→2min) │
│ ├─ general error → generalBackoff (500ms→...→30s) │
│ └─ give up after 10 minutes of errors │
│ │
│ 5. sleep detection: gap > 2×backoffCap → reset budget │
└─────────────────────────────────────────────────────────┘
轮询间隔配置(pollConfig.ts,通过 GrowthBook 动态下发):
| 参数 | 默认值 | 含义 |
|---|---|---|
poll_interval_ms_not_at_capacity | — | 有空余时的轮询间隔 |
multisession_poll_interval_ms_not_at_capacity | — | 多会话模式未满时 |
multisession_poll_interval_ms_partial_capacity | — | 部分容量时 |
multisession_poll_interval_ms_at_capacity | — | 满容量时慢速轮询 |
non_exclusive_heartbeat_interval_ms | — | 心跳间隔(与轮询并行) |
reclaim_older_than_ms | 5000 | XAUTOCLAIM 回收超时阈值 |
四、Session Spawning 模式
4.1 三种 Spawn 模式
types.ts:69:
export type SpawnMode = 'single-session' | 'worktree' | 'same-dir'
| 模式 | 行为 | 并发上限 |
|---|---|---|
| single-session | 一个 session 在 cwd,结束后 bridge 退出 | 1 |
| same-dir | 持久 server,所有 session 共享 cwd | 32(默认) |
| worktree | 持久 server,每个 session 获得独立 git worktree | 32(默认) |
worktree 模式(bridgeMain.ts:963-1015):
- 调用
createAgentWorktree()为每个 on-demand session 创建隔离的 git worktree - 初始预创建的 session 例外——它在
config.dir中运行(匹配旧 UX) - session 结束后调用
removeAgentWorktree()清理
4.2 子进程 Spawn
sessionRunner.ts:248-548 的 createSessionSpawner():
const args = [
...deps.scriptArgs, // npm 模式下需要 script path
'--print',
'--sdk-url', opts.sdkUrl, // Session-Ingress WebSocket URL
'--session-id', opts.sessionId,
'--input-format', 'stream-json',
'--output-format', 'stream-json',
'--replay-user-messages', // 回放用户消息用于标题提取
'--verbose', '--debug-file', '--permission-mode' // 可选
]
const env = {
...deps.env,
CLAUDE_CODE_OAUTH_TOKEN: undefined, // 清除 bridge 的 OAuth
CLAUDE_CODE_ENVIRONMENT_KIND: 'bridge',
CLAUDE_CODE_SESSION_ACCESS_TOKEN: opts.accessToken, // session ingress JWT
CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2: '1', // v1: HybridTransport
// v2: SSE + CCRClient
...(opts.useCcrV2 && {
CLAUDE_CODE_USE_CCR_V2: '1',
CLAUDE_CODE_WORKER_EPOCH: String(opts.workerEpoch),
}),
}
const child = spawn(deps.execPath, args, {
cwd: dir,
stdio: ['pipe', 'pipe', 'pipe'], // stdin/stdout/stderr 全部管道
env,
windowsHide: true,
})
子进程三根管道的用途:
- stdin: 接收
update_environment_variables消息(token 刷新)和 control 命令 - stdout: 解析 NDJSON 活动流(
extractActivities提取 tool_use + result 事件) - stderr: 环形缓冲最近 10 行用于错误诊断
4.3 SessionHandle 接口
types.ts:178-190:
export type SessionHandle = {
sessionId: string
done: Promise<SessionDoneStatus> // 'completed' | 'failed' | 'interrupted'
kill(): void // SIGTERM
forceKill(): void // SIGKILL
activities: SessionActivity[] // 最近 10 条活动(环形缓冲)
currentActivity: SessionActivity | null
accessToken: string
lastStderr: string[]
writeStdin(data: string): void
updateAccessToken(token: string): void
}
sessionRunner.ts:482-543 实现的 updateAccessToken():
handle.writeStdin(jsonStringify({
type: 'update_environment_variables',
variables: { CLAUDE_CODE_SESSION_ACCESS_TOKEN: token },
}) + '\n')
子进程的 StructuredIO 收到此消息后直接设置 process.env,后续的 WebSocket 连接自动使用新 token。
五、多会话管理
5.1 并发控制
bridgeMain.ts:83: const SPAWN_SESSIONS_DEFAULT = 32
多会话由 GrowthBook gate tengu_ccr_bridge_multi_session 控制(bridgeMain.ts:96-98)。
核心数据结构(bridgeMain.ts:163-187):
const activeSessions = new Map<string, SessionHandle>()
const sessionStartTimes = new Map<string, number>()
const sessionWorkIds = new Map<string, string>()
const sessionCompatIds = new Map<string, string>()
const sessionIngressTokens = new Map<string, string>()
const sessionTimers = new Map<string, ReturnType<typeof setTimeout>>()
const completedWorkIds = new Set<string>()
const sessionWorktrees = new Map<string, { worktreePath, worktreeBranch, gitRoot, hookBased }>()
const timedOutSessions = new Set<string>()
const titledSessions = new Set<string>()
const v2Sessions = new Set<string>()
5.2 满容量时的行为
当 activeSessions.size >= config.maxSessions(bridgeMain.ts:639-737):
心跳模式:如果
non_exclusive_heartbeat_interval_ms > 0,进入心跳子循环,周期性 heartbeat 所有活跃 session,直到:- Poll 到期(atCapMs 超时)
- Session 完成(capacityWake 信号)
- Auth 失败(JWT 过期 → reconnectSession)
- 关闭信号
慢轮询模式:如果心跳禁用,用
atCapMs间隔慢速轮询作为存活信号Capacity Wake(见下文):session 完成时立即唤醒轮询
5.3 Session 完成处理
bridgeMain.ts:442-591 的 onSessionDone() 回调:
onSessionDone →
1. 清理所有 Map 条目
2. 清除 timeout timer + token refresh timer
3. capacityWake.wake() → 唤醒 at-capacity 睡眠
4. 如果 timeout 导致 → 将 'interrupted' 映射为 'failed'
5. stopWorkWithRetry() → 告知服务器 work 结束
6. 清理 worktree(如果有)
7. 单会话模式 → controller.abort() 退出 poll loop
8. 多会话模式 → archiveSession() 后继续接受新 work
六、JWT Token 生命周期
6.1 Token 来源
Session ingress JWT 通过 work secret 传递:
// workSecret.ts:6-32
const secret = decodeWorkSecret(work.secret)
// secret.session_ingress_token — JWT,用于 API 认证
// secret.api_base_url — API 基础 URL
// secret.use_code_sessions — 是否使用 v2 协议
6.2 Proactive Token Refresh
jwtUtils.ts:72-255 的 createTokenRefreshScheduler():
┌──────────────────────────────────────────────────┐
│ Token Refresh Scheduler │
│ │
│ schedule(sessionId, jwt) → │
│ 1. decodeJwtExpiry(jwt) → exp claim │
│ 2. delayMs = exp - now - 5min buffer │
│ 3. setTimeout(doRefresh, delayMs) │
│ │
│ doRefresh(sessionId, gen) → │
│ 1. oauthToken = await getAccessToken() │
│ 2. if v1: handle.updateAccessToken(oauthToken) │
│ if v2: api.reconnectSession() → re-dispatch │
│ 3. schedule follow-up in 30min │
│ │
│ 失败策略: │
│ - 连续失败 ≤ 3 次: 60s 后重试 │
│ - 连续失败 > 3 次: 放弃刷新 │
│ │
│ Generation counter: │
│ - schedule() 递增 → 失效 in-flight doRefresh │
│ - cancel() 递增 → 同上 │
└──────────────────────────────────────────────────┘
v1 vs v2 的关键区别:
- v1:
onRefresh调用handle.updateAccessToken(oauthToken)→ 通过 stdin 发送给子进程 - v2:
onRefresh调用api.reconnectSession()→ 服务器重新派发 work → 下次 poll 携带新 JWT
这是因为 v2 的 CCR worker 端点验证 JWT 的 session_id claim(register_worker.go:32),OAuth token 没有此 claim。
6.3 JWT 解码
jwtUtils.ts:21-48:
export function decodeJwtPayload(token: string): unknown | null {
const jwt = token.startsWith('sk-ant-si-')
? token.slice('sk-ant-si-'.length) // 去掉 session-ingress 前缀
: token
const parts = jwt.split('.')
if (parts.length !== 3 || !parts[1]) return null
return jsonParse(Buffer.from(parts[1], 'base64url').toString('utf8'))
}
七、受信设备安全层(Trusted Device)
trustedDevice.ts:1-210:
┌──────────────────────────────────────────────────────────┐
│ Trusted Device Token Flow │
│ │
│ Enrollment (POST /auth/trusted_devices): │
│ - 只在 fresh /login 后 10 分钟内有效 │
│ - 必须在 /login hook 中调用 enrollTrustedDevice() │
│ - 服务端返回 device_token → 存入 keychain │
│ - Token 90 天滚动过期 │
│ │
│ Usage: │
│ - getHeaders() 检查 GrowthBook gate │
│ tengu_sessions_elevated_auth_enforcement │
│ - 如果 gate 开启 → 读取 keychain → X-Trusted-Device-Token │
│ - 如果 gate 关闭 → header 不发送 │
│ │
│ Security tier: │
│ - Bridge session 在服务端标记为 SecurityTier=ELEVATED │
│ - 服务端 ConnectBridgeWorker 检查 trusted device │
│ - 双 flag 设计: CLI 侧先开启(header 开始流动), │
│ 然后服务端开启(开始强制检查) │
│ │
│ Priority: │
│ 环境变量 CLAUDE_TRUSTED_DEVICE_TOKEN > keychain │
│ 企业 wrapper 可以通过 env var 提供自己的 token │
└──────────────────────────────────────────────────────────┘
关键函数:
getTrustedDeviceToken()(trustedDevice.ts:54-59): gate + memoized 读取enrollTrustedDevice()(trustedDevice.ts:98-210): 注册流程,best-effortclearTrustedDeviceToken()(trustedDevice.ts:72-87): 登出时清除clearTrustedDeviceTokenCache()(trustedDevice.ts:61-63): 清除 memo 缓存
八、消息路由与 UUID 去重
8.1 BoundedUUIDSet(环形缓冲去重)
bridgeMessaging.ts:429-461:
export class BoundedUUIDSet {
private readonly ring: (string | undefined)[]
private readonly set = new Set<string>()
private writeIdx = 0
add(uuid: string): void {
if (this.set.has(uuid)) return
const evicted = this.ring[this.writeIdx]
if (evicted !== undefined) this.set.delete(evicted)
this.ring[this.writeIdx] = uuid
this.set.add(uuid)
this.writeIdx = (this.writeIdx + 1) % this.capacity
}
}
使用两个实例:
recentPostedUUIDs: 我方发出的消息 UUID → 过滤 echorecentInboundUUIDs: 已转发的入站消息 UUID → 防止重复派发
8.2 Ingress 消息路由
bridgeMessaging.ts:132-208 的 handleIngressMessage():
收到 WS data → jsonParse → normalizeControlMessageKeys
├─ isSDKControlResponse? → onPermissionResponse (权限响应)
├─ isSDKControlRequest? → onControlRequest (initialize/set_model/interrupt)
├─ isSDKMessage? →
│ ├─ uuid in recentPostedUUIDs? → 跳过 (echo)
│ ├─ uuid in recentInboundUUIDs? → 跳过 (重复派发)
│ └─ type === 'user'? → onInboundMessage (转发给子进程)
└─ 其他 → 跳过
8.3 Server Control Request 处理
bridgeMessaging.ts:243-391 的 handleServerControlRequest():
处理来自服务端的 control_request:
initialize: 回复最小 capabilities(commands[], models[], pid)set_model: 触发 onSetModel 回调set_max_thinking_tokens: 触发 onSetMaxThinkingTokens 回调set_permission_mode: 策略裁决(ok/error)interrupt: 触发 onInterrupt 回调- 超时约束:必须在 10-14 秒内响应,否则服务端杀死 WS
Outbound-only 模式:所有 mutating 请求回复 OUTBOUND_ONLY_ERROR(bridgeMessaging.ts:231-232),只有 initialize 仍然回复 success(否则服务端会断开连接)。
九、Work Secret 解码
workSecret.ts:6-32:
export function decodeWorkSecret(secret: string): WorkSecret {
const json = Buffer.from(secret, 'base64url').toString('utf-8')
const parsed = jsonParse(json)
// 校验 version === 1
// 校验 session_ingress_token 非空
// 校验 api_base_url 存在
return parsed as WorkSecret
}
WorkSecret 结构(types.ts:33-51):
export type WorkSecret = {
version: number
session_ingress_token: string // JWT
api_base_url: string
sources: Array<{ type, git_info? }>
auth: Array<{ type, token }>
claude_code_args?: Record<string, string>
mcp_config?: unknown
environment_variables?: Record<string, string>
use_code_sessions?: boolean // v2 协议选择器
}
use_code_sessions 字段:服务端通过 prepare_work_secret() 设置,决定使用 v1(Session-Ingress WebSocket)还是 v2(SSE + CCRClient)。环境变量 CLAUDE_BRIDGE_USE_CCR_V2 可以覆盖(仅 ant 开发用)。
SDK URL 构建
workSecret.ts:41-48(v1):
// localhost → ws://host/v2/session_ingress/ws/{id}
// production → wss://host/v1/session_ingress/ws/{id} (Envoy rewrite v1→v2)
workSecret.ts:81-87(v2):
// → https://host/v1/code/sessions/{id}
Session ID 兼容性
sessionIdCompat.ts:38-56:
CCR v2 的 infra 层使用 cse_* 前缀,但 compat API 需要 session_* 前缀。toCompatSessionId() 和 toInfraSessionId() 做互相转换,底层 UUID 相同。
sameSessionId()(workSecret.ts:62-73)比较两个不同前缀的 session ID:
const aBody = a.slice(a.lastIndexOf('_') + 1) // 取最后一个 _ 之后的部分
return aBody.length >= 4 && aBody === bBody
十、协议版本:v1 vs v2
10.1 v1: HybridTransport
- 读取: WebSocket 连接到 Session-Ingress
- 写入: HTTP POST 到 Session-Ingress
- 认证: OAuth token(通过
CLAUDE_CODE_SESSION_ACCESS_TOKENenv var) - 重放: 服务端通过 message cursor 处理,不需要 SSE sequence number
10.2 v2: SSETransport + CCRClient
replBridgeTransport.ts:119-369 的 createV2ReplTransport():
┌────────────────────────────────────────────────────────┐
│ v2 Transport Stack │
│ │
│ SSETransport (读取): │
│ URL: {sessionUrl}/worker/events/stream │
│ - 支持 from_sequence_num / Last-Event-ID │
│ - 重连预算耗尽 → onClose(undefined) → 4092 │
│ │
│ CCRClient (写入): │
│ - SerialBatchEventUploader (maxBatchSize=100) │
│ - POST /worker/events │
│ - Heartbeat: PUT /worker/state (每 20s) │
│ - Delivery tracking: POST /worker/events/{id}/delivery│
│ │
│ Worker Registration: │
│ - POST {sessionUrl}/worker/register │
│ - 返回 worker_epoch (单调递增) │
│ - epoch mismatch (409) → 关闭 + poll loop recovery │
│ │
│ Auth: JWT(session_id claim),非 OAuth token │
│ │
│ Epoch Mismatch Handling: │
│ - CCRClient 收到 409 → onEpochMismatch 回调 │
│ - 关闭 CCR + SSE → onClose(4090) │
│ - replBridge 的 onClose → 回到 poll loop │
│ - 下次 poll 派发新 epoch → 重建 transport │
└────────────────────────────────────────────────────────┘
SSE Sequence Number 传递(replBridgeTransport.ts:128-131):
initialSequenceNum?: number
// 传递给 SSETransport,connect() 时发送 from_sequence_num
// 避免每次 transport swap 重放整个 session 历史
Delivery ACK 优化(replBridgeTransport.ts:249-252):
sse.setOnEvent(event => {
ccr.reportDelivery(event.event_id, 'received')
ccr.reportDelivery(event.event_id, 'processed') // 立即标记 processed
})
原来的实现只标记 received,导致 daemon 重启后 21-25 个 phantom prompt 被重放。立即标记 processed 的 trade-off:崩溃窗口内丢失一个 prompt vs 每次重启重放 N 个。
十一、远程 Session 客户端
11.1 RemoteSessionManager
RemoteSessionManager.ts:95-343:
┌──────────────────────────────────────────────────┐
│ RemoteSessionManager │
│ │
│ 职责: │
│ 1. WebSocket 订阅(接收消息流) │
│ 2. HTTP POST 发送用户消息 │
│ 3. Permission request/response 流程 │
│ │
│ connect() → │
│ new SessionsWebSocket(sessionId, orgUuid, ...) │
│ ws.connect() │
│ │
│ handleMessage() → │
│ ├─ control_request → handleControlRequest │
│ │ └─ can_use_tool → callbacks.onPermissionRequest │
│ ├─ control_cancel_request → onPermissionCancelled │
│ ├─ control_response → 跳过(ACK) │
│ └─ SDKMessage → callbacks.onMessage │
│ │
│ respondToPermissionRequest() → │
│ 构建 SDKControlResponse │
│ → ws.sendControlResponse() │
│ │
│ cancelSession() → │
│ ws.sendControlRequest({ subtype: 'interrupt' }) │
└──────────────────────────────────────────────────┘
11.2 SessionsWebSocket — 连接弹性
SessionsWebSocket.ts:82-404:
连接策略:
URL: wss://api.anthropic.com/v1/sessions/ws/{id}/subscribe?organization_uuid=...
Auth: Bearer <OAuth token> (via headers, 非 auth message)
重连机制:
- 常规断开: 最多重连 5 次,每次间隔 2 秒
- 4001 (session not found): 最多重试 3 次(compaction 期间可能瞬时)
- 4003 (unauthorized): 永不重连
- 重连时 reset 计数器: reconnectAttempts = 0, sessionNotFoundRetries = 0
心跳: 每 30 秒 ping (ws.ping())
Runtime 兼容:
- Bun: globalThis.WebSocket + headers/proxy/tls options
- Node: ws package + agent/tls options
11.3 Permission Bridge — 合成消息
remotePermissionBridge.ts:12-78:
远程 CCR 容器发来的 permission request 需要转换成本地权限 UI 能理解的格式:
createSyntheticAssistantMessage(request, requestId) → AssistantMessage
// 构造一个假的 AssistantMessage,包含 tool_use block
// 因为 ToolUseConfirm 类型需要 AssistantMessage
// 但实际的 tool 在远程容器执行
createToolStub(toolName) → Tool
// 当远程有本地不存在的工具(如 MCP tools)
// 创建一个最小 stub,路由到 FallbackPermissionRequest
11.4 SDK Message Adapter
sdkMessageAdapter.ts:168-278:
将 CCR 发来的 SDKMessage 转换为 REPL 内部的 Message 类型:
| SDK Type | REPL Type | 说明 |
|---|---|---|
assistant | AssistantMessage | 直接映射 |
user | ignored / UserMessage | CCR 模式忽略;direct-connect 模式转换 tool_result |
stream_event | StreamEvent | 流式事件 |
result (非 success) | SystemMessage | 错误才显示 |
system (init) | SystemMessage | "Remote session initialized" |
system (status) | SystemMessage | "Compacting conversation..." |
system (compact_boundary) | SystemMessage (compact_boundary) | 带 compact metadata |
tool_progress | SystemMessage | "Tool X running for Ys" |
auth_status | ignored | SDK-only |
tool_use_summary | ignored | SDK-only |
rate_limit_event | ignored | SDK-only |
十二、错误处理与退避策略
12.1 退避配置
bridgeMain.ts:59-79:
const DEFAULT_BACKOFF: BackoffConfig = {
connInitialMs: 2_000, // 连接错误初始延迟
connCapMs: 120_000, // 连接错误最大延迟 (2min)
connGiveUpMs: 600_000, // 连接错误放弃阈值 (10min)
generalInitialMs: 500, // 通用错误初始延迟
generalCapMs: 30_000, // 通用错误最大延迟 (30s)
generalGiveUpMs: 600_000, // 通用错误放弃阈值 (10min)
shutdownGraceMs: 30_000, // SIGTERM→SIGKILL 宽限期
stopWorkBaseDelayMs: 1000, // stopWork 重试基数
}
12.2 错误分类
bridgeMain.ts:1582-1612:
// 连接错误: ECONNREFUSED, ECONNRESET, ETIMEDOUT, ENETUNREACH, EHOSTUNREACH
function isConnectionError(err): boolean { ... }
// 服务器错误: ERR_BAD_RESPONSE (HTTP 5xx)
function isServerError(err): boolean { ... }
// Fatal 错误 (BridgeFatalError):
// 401 → 认证失败
// 403 → 权限拒绝 / environment 过期
// 404 → environment 不存在
// 410 → environment 已过期
12.3 Jitter
bridgeMain.ts:1615-1616:
function addJitter(ms: number): number {
return Math.max(0, ms + ms * 0.25 * (2 * Math.random() - 1)) // ±25%
}
12.4 系统休眠检测
bridgeMain.ts:107-109:
function pollSleepDetectionThresholdMs(backoff: BackoffConfig): number {
return backoff.connCapMs * 2 // 2 × 2min = 4min
}
如果两次 poll 错误之间的时间间隔 > 4 分钟,说明系统可能休眠了,重置错误预算(backoff 归零,重新开始计时)。
12.5 stopWorkWithRetry
bridgeMain.ts:1627-1676:
指数退避重试 3 次(1s/2s/4s),确保服务器知道 work item 已结束,防止服务端僵尸。
Fatal 错误(401/403)不重试——认证问题不会自愈。
十三、Capacity Wake 系统
capacityWake.ts:1-56:
这是一个双信号合并器,让 at-capacity 睡眠可以被两种事件提前唤醒:
export function createCapacityWake(outerSignal: AbortSignal): CapacityWake {
let wakeController = new AbortController()
function wake(): void {
wakeController.abort() // 唤醒当前睡眠
wakeController = new AbortController() // 为下次睡眠准备新 controller
}
function signal(): CapacitySignal {
const merged = new AbortController()
// 同时监听 outerSignal(shutdown)和 wakeController.signal(session 完成)
outerSignal.addEventListener('abort', abort, { once: true })
wakeController.signal.addEventListener('abort', abort, { once: true })
return { signal: merged.signal, cleanup: ... }
}
}
使用位置:
bridgeMain.ts(standalone bridge):poll loop 的 at-capacity 睡眠replBridge.ts(REPL bridge):同样用于 poll loop
关键行为:onSessionDone 中调用 capacityWake.wake()(bridgeMain.ts:467),确保 session 结束后 bridge 立即 poll 新工作,而不是等到下次 poll interval。
十四、FlushGate — 初始刷新门
flushGate.ts:1-71:
Bridge session 启动时,历史消息通过单个 HTTP POST 刷新到服务器。在此期间,新消息必须排队,防止历史消息和新消息交错到达服务器。
start() → enqueue() 返回 true, 消息排队
end() → 返回排队的消息, enqueue() 返回 false
drop() → 丢弃所有排队消息 (永久关闭)
deactivate() → 清除 active 标志但不丢弃 (transport 替换)
十五、关键 API 端点
bridgeApi.ts 定义的所有 API 调用:
| 方法 | 端点 | 用途 |
|---|---|---|
registerBridgeEnvironment | POST /v1/environments/bridge | 注册 environment |
pollForWork | GET /v1/environments/{id}/work/poll | 轮询工作项 |
acknowledgeWork | POST /v1/environments/{id}/work/{id}/ack | 确认工作项 |
stopWork | POST /v1/environments/{id}/work/{id}/stop | 停止工作项 |
deregisterEnvironment | DELETE /v1/environments/bridge/{id} | 注销 environment |
archiveSession | POST /v1/sessions/{id}/archive | 归档 session |
reconnectSession | POST /v1/environments/{id}/bridge/reconnect | 重新连接 session |
heartbeatWork | POST /v1/environments/{id}/work/{id}/heartbeat | 心跳保活 |
sendPermissionResponseEvent | POST /v1/sessions/{id}/events | 发送权限响应 |
所有端点使用 environments-2025-11-01 beta header。
认证
- Poll/Stop/Deregister: OAuth token(通过
withOAuthRetry包装,401 时自动刷新) - Ack/Heartbeat: Session ingress JWT(从 work secret 解码)
- Trusted Device:
X-Trusted-Device-Tokenheader(可选,gate 控制)
十六、Headless Bridge(Daemon Worker)
bridgeMain.ts:2770-2965 的 runBridgeHeadless():
非交互式 bridge 入口,供 daemon worker 调用。与 bridgeMain() 的区别:
- 没有 readline 对话框
- 没有 stdin 键盘处理
- 没有 TUI
- 没有
process.exit()(由调用者控制) - 配置来自 caller(daemon.json),auth 通过 IPC(supervisor 的 AuthManager)
- 抛出
BridgeHeadlessPermanentError让 supervisor 区分永久/瞬态错误
十七、Feature Gate 矩阵
bridgeEnabled.ts 和相关文件中的 gate:
| Gate | 用途 |
|---|---|
feature('BRIDGE_MODE') | 编译时:bridge 代码是否包含在 build 中 |
tengu_ccr_bridge | 运行时:bridge 功能是否对用户可用 |
tengu_ccr_bridge_multi_session | 多会话 spawn 模式 |
tengu_bridge_repl_v2 | env-less (v2) REPL bridge 路径 |
tengu_bridge_repl_v2_cse_shim_enabled | cse_* → session_* 转换 |
tengu_sessions_elevated_auth_enforcement | 受信设备 token |
tengu_bridge_poll_interval_config | 轮询间隔动态配置 |
tengu_bridge_min_version | 最低 CLI 版本要求 |
tengu_cobalt_harbor | CCR auto-connect |
tengu_ccr_mirror | CCR mirror mode |
feature('KAIROS') | --session-id / --continue 恢复功能 |
feature('CCR_AUTO_CONNECT') | 自动连接 CCR |
feature('CCR_MIRROR') | Mirror mode |
十八、Mermaid 序列图
18.1 完整 Session 生命周期
18.2 WebSocket 订阅(客户端侧)
18.3 Token Refresh 时序
十九、关键文件索引
| 文件 | 行数 | 职责 |
|---|---|---|
bridgeMain.ts | 2,999 | Orchestrator: poll loop, session lifecycle, shutdown |
replBridge.ts | 2,406 | In-process REPL bridge (嵌入 REPL 内部) |
bridgeApi.ts | 539 | HTTP API client (所有 REST 调用) |
replBridgeTransport.ts | 370 | v1/v2 transport 抽象层 |
sessionRunner.ts | 550 | 子进程 spawn + NDJSON 解析 |
bridgeMessaging.ts | 461 | 消息路由 + UUID 去重 + control request |
types.ts | 262 | 所有核心类型定义 |
jwtUtils.ts | 256 | JWT 解码 + token refresh scheduler |
bridgeEnabled.ts | 202 | Feature gate 检查 |
trustedDevice.ts | 210 | 受信设备 token 管理 |
createSession.ts | 384 | Session 创建 API 调用 |
pollConfig.ts | 110 | GrowthBook 轮询配置 |
workSecret.ts | 127 | Work secret 解码 + URL 构建 |
sessionIdCompat.ts | 57 | cse_* ↔ session_* 转换 |
capacityWake.ts | 56 | 双信号合并器 |
flushGate.ts | 71 | 初始刷新消息门 |
bridgeConfig.ts | — | Bridge 配置解析 |
bridgePointer.ts | — | 崩溃恢复指针 |
bridgeUI.ts | — | TUI 状态显示 |
RemoteSessionManager.ts | 343 | 客户端 session 管理 |
SessionsWebSocket.ts | 404 | WebSocket 连接 + 重连 |
remotePermissionBridge.ts | 78 | 权限请求合成消息 |
sdkMessageAdapter.ts | 302 | SDK → REPL 消息转换 |
二十、总结
Bridge 系统是一个精心设计的分布式 session 编排器,核心挑战在于:
- 不可靠网络:指数退避 + 系统休眠检测 + ±25% jitter
- 长会话:proactive token refresh(5min buffer)+ 30min follow-up
- 多协议:v1 WebSocket vs v2 SSE+CCRClient,通过 transport 抽象统一
- 并发管理:最多 32 个 session,capacity wake 机制确保即时响应
- 安全分层:OAuth → Session JWT → Trusted Device Token 三级认证
- 崩溃恢复:bridge-pointer 文件 + --continue 恢复
- 优雅关闭:SIGTERM → SIGKILL (30s grace) → worktree 清理 → archive + deregister
整个子系统由 31 个文件、12,613 行代码组成,其中 bridgeMain.ts(2,999 行)和 replBridge.ts(2,406 行)是两个最大的模块,分别对应 standalone 模式和 REPL 内嵌模式。