Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Bridge(远程控制)系统深度分析

Bridge(远程控制)系统深度分析

基于 /src/bridge/(31 个文件,12,613 行)和 /src/remote/(4 个文件)的完整代码审计。


一、系统总览

Bridge 系统是 Claude Code 的 远程控制(Remote Control) 子系统。它让用户能从 claude.ai/web 或 Claude 移动端连接到本地运行的 CLI 会话——本质上,你的笔记本变成了一个"云工作器",通过轮询一个远程 API 获取工作项(session),然后在本地 spawn 子 CLI 进程来执行实际的代码编辑任务。

核心价值主张:用户离开终端后仍可通过浏览器继续 coding session,实现跨设备连续工作。

关键术语

术语含义
Environment一台注册到服务器的本地机器(bridge 进程实例),可管理多个 session
Work Item服务器下发的工作单元,携带 session ID + work secret
Work Secretbase64url 编码的 JSON,包含 session ingress JWT + API base URL
Session一个子 CLI 进程实例,处理具体的 coding 任务
Session Ingress服务器端接收子进程事件的 WebSocket/HTTP 端点
CCRClaude Code Runtime — 远程 session 执行的后端基础设施
v1/v2两种协议版本:v1 = Session-Ingress WebSocket;v2 = SSE + CCRClient

二、架构总览:Hub-and-Spoke 模式

                    ┌──────────────────────────┐
                    │   claude.ai / Web Client  │
                    └────────────┬─────────────┘
                                 │ WebSocket + HTTP POST
                    ┌────────────▼─────────────┐
                    │  CCR Server (cloud infra) │
                    │  - Environment Registry   │
                    │  - Work Queue (Redis PEL) │
                    │  - Session Ingress        │
                    └────────────┬─────────────┘
                                 │
              ┌──────────────────▼──────────────────┐
              │     bridgeMain.ts (Orchestrator)      │
              │  ┌─────────────────────────────────┐  │
              │  │  Poll Loop (while !aborted)      │  │
              │  │  ├─ pollForWork()                │  │
              │  │  ├─ decodeWorkSecret()           │  │
              │  │  ├─ registerWorker() (v2 only)   │  │
              │  │  ├─ spawn(sessionRunner)         │  │
              │  │  ├─ heartbeatActiveWorkItems()   │  │
              │  │  └─ capacityWake / backoff       │  │
              │  └─────────────────────────────────┘  │
              │         │           │           │      │
              │    ┌────▼───┐  ┌────▼───┐  ┌────▼───┐ │
              │    │ Session │  │ Session│  │ Session│ │
              │    │ Child 1 │  │ Child 2│  │ Child N│ │
              │    │ (stdin/ │  │        │  │        │ │
              │    │ stdout) │  │        │  │        │ │
              │    └─────────┘  └────────┘  └────────┘ │
              └────────────────────────────────────────┘

另有一条独立路径:
              ┌──────────────────────────────────┐
              │  replBridge.ts (In-process mode)  │
              │  嵌入 REPL 内部,共享进程          │
              │  通过 ReplBridgeTransport 通信     │
              └──────────────────────────────────┘

bridgeMain.ts 是整个子系统的 orchestrator(2,999 行),负责:

  1. 注册 environment → 获取 environment_id + environment_secret
  2. 轮询工作队列(poll loop)
  3. 解码 work secret → 获取 session ingress JWT
  4. 为每个 session spawn 子 CLI 进程
  5. 管理多会话生命周期(最多 32 个并发)
  6. 错误恢复 + 指数退避
  7. 优雅关闭

三、Environment Worker 注册与轮询

3.1 注册流程

bridgeMain.ts:2447-2467

const reg = await api.registerBridgeEnvironment(config)
// → POST /v1/environments/bridge
// 发送: machine_name, directory, branch, git_repo_url, max_sessions, metadata
// 接收: environment_id, environment_secret

注册时发送的 BridgeConfig(types.ts:81-115)包含:

  • dir: 当前工作目录
  • machineName: hostname()
  • branch, gitRepoUrl: Git 上下文
  • maxSessions: 最大并发数(默认 32)
  • spawnMode: 'single-session' | 'worktree' | 'same-dir'
  • bridgeId: 客户端 UUID
  • workerType: 'claude_code' 或 'claude_code_assistant'
  • reuseEnvironmentId: 恢复旧 session 时复用已有的 environment ID

bridgeApi.ts:76-89 的 getHeaders() 会在每个请求中附带:

  • Authorization: Bearer <OAuth token> — 用于认证
  • anthropic-beta: environments-2025-11-01 — API 版本头
  • X-Trusted-Device-Token — 受信设备令牌(如果 gate 开启)

3.2 轮询循环

bridgeMain.ts:600-1401 是核心的 while (!loopSignal.aborted) 循环:

┌─────────────────────────────────────────────────────────┐
│                    Poll Loop Tick                        │
│                                                         │
│  1. pollForWork(envId, envSecret, signal)               │
│     → GET /v1/environments/{id}/work/poll               │
│                                                         │
│  2. if !work:                                           │
│     ├─ at capacity? → heartbeat loop / slow poll        │
│     └─ not at capacity? → sleep(partial/full interval)  │
│                                                         │
│  3. if work:                                            │
│     ├─ skip if completedWorkIds.has(work.id)            │
│     ├─ decodeWorkSecret(work.secret)                    │
│     ├─ ackWork() → POST .../work/{id}/ack              │
│     ├─ healthcheck? → log + continue                    │
│     └─ session? →                                       │
│        ├─ existing session? → updateAccessToken()       │
│        ├─ at capacity? → break (don't spawn)            │
│        ├─ v2? → registerWorker() → get epoch            │
│        ├─ worktree? → createAgentWorktree()              │
│        ├─ spawn(sessionRunner) → SessionHandle           │
│        └─ schedule timeout + token refresh               │
│                                                         │
│  4. on error:                                           │
│     ├─ BridgeFatalError → break (401/403/404/410)       │
│     ├─ connection error → connBackoff (2s→4s→...→2min)  │
│     ├─ general error → generalBackoff (500ms→...→30s)   │
│     └─ give up after 10 minutes of errors               │
│                                                         │
│  5. sleep detection: gap > 2×backoffCap → reset budget  │
└─────────────────────────────────────────────────────────┘

轮询间隔配置(pollConfig.ts,通过 GrowthBook 动态下发):

参数默认值含义
poll_interval_ms_not_at_capacity—有空余时的轮询间隔
multisession_poll_interval_ms_not_at_capacity—多会话模式未满时
multisession_poll_interval_ms_partial_capacity—部分容量时
multisession_poll_interval_ms_at_capacity—满容量时慢速轮询
non_exclusive_heartbeat_interval_ms—心跳间隔(与轮询并行)
reclaim_older_than_ms5000XAUTOCLAIM 回收超时阈值

四、Session Spawning 模式

4.1 三种 Spawn 模式

types.ts:69:

export type SpawnMode = 'single-session' | 'worktree' | 'same-dir'
模式行为并发上限
single-session一个 session 在 cwd,结束后 bridge 退出1
same-dir持久 server,所有 session 共享 cwd32(默认)
worktree持久 server,每个 session 获得独立 git worktree32(默认)

worktree 模式(bridgeMain.ts:963-1015):

  • 调用 createAgentWorktree() 为每个 on-demand session 创建隔离的 git worktree
  • 初始预创建的 session 例外——它在 config.dir 中运行(匹配旧 UX)
  • session 结束后调用 removeAgentWorktree() 清理

4.2 子进程 Spawn

sessionRunner.ts:248-548 的 createSessionSpawner():

const args = [
  ...deps.scriptArgs,       // npm 模式下需要 script path
  '--print',
  '--sdk-url', opts.sdkUrl, // Session-Ingress WebSocket URL
  '--session-id', opts.sessionId,
  '--input-format', 'stream-json',
  '--output-format', 'stream-json',
  '--replay-user-messages', // 回放用户消息用于标题提取
  '--verbose', '--debug-file', '--permission-mode' // 可选
]

const env = {
  ...deps.env,
  CLAUDE_CODE_OAUTH_TOKEN: undefined,          // 清除 bridge 的 OAuth
  CLAUDE_CODE_ENVIRONMENT_KIND: 'bridge',
  CLAUDE_CODE_SESSION_ACCESS_TOKEN: opts.accessToken, // session ingress JWT
  CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2: '1',       // v1: HybridTransport
  // v2: SSE + CCRClient
  ...(opts.useCcrV2 && {
    CLAUDE_CODE_USE_CCR_V2: '1',
    CLAUDE_CODE_WORKER_EPOCH: String(opts.workerEpoch),
  }),
}

const child = spawn(deps.execPath, args, {
  cwd: dir,
  stdio: ['pipe', 'pipe', 'pipe'], // stdin/stdout/stderr 全部管道
  env,
  windowsHide: true,
})

子进程三根管道的用途:

  • stdin: 接收 update_environment_variables 消息(token 刷新)和 control 命令
  • stdout: 解析 NDJSON 活动流(extractActivities 提取 tool_use + result 事件)
  • stderr: 环形缓冲最近 10 行用于错误诊断

4.3 SessionHandle 接口

types.ts:178-190:

export type SessionHandle = {
  sessionId: string
  done: Promise<SessionDoneStatus>  // 'completed' | 'failed' | 'interrupted'
  kill(): void                      // SIGTERM
  forceKill(): void                 // SIGKILL
  activities: SessionActivity[]     // 最近 10 条活动(环形缓冲)
  currentActivity: SessionActivity | null
  accessToken: string
  lastStderr: string[]
  writeStdin(data: string): void
  updateAccessToken(token: string): void
}

sessionRunner.ts:482-543 实现的 updateAccessToken():

handle.writeStdin(jsonStringify({
  type: 'update_environment_variables',
  variables: { CLAUDE_CODE_SESSION_ACCESS_TOKEN: token },
}) + '\n')

子进程的 StructuredIO 收到此消息后直接设置 process.env,后续的 WebSocket 连接自动使用新 token。


五、多会话管理

5.1 并发控制

bridgeMain.ts:83: const SPAWN_SESSIONS_DEFAULT = 32

多会话由 GrowthBook gate tengu_ccr_bridge_multi_session 控制(bridgeMain.ts:96-98)。

核心数据结构(bridgeMain.ts:163-187):

const activeSessions = new Map<string, SessionHandle>()
const sessionStartTimes = new Map<string, number>()
const sessionWorkIds = new Map<string, string>()
const sessionCompatIds = new Map<string, string>()
const sessionIngressTokens = new Map<string, string>()
const sessionTimers = new Map<string, ReturnType<typeof setTimeout>>()
const completedWorkIds = new Set<string>()
const sessionWorktrees = new Map<string, { worktreePath, worktreeBranch, gitRoot, hookBased }>()
const timedOutSessions = new Set<string>()
const titledSessions = new Set<string>()
const v2Sessions = new Set<string>()

5.2 满容量时的行为

当 activeSessions.size >= config.maxSessions(bridgeMain.ts:639-737):

  1. 心跳模式:如果 non_exclusive_heartbeat_interval_ms > 0,进入心跳子循环,周期性 heartbeat 所有活跃 session,直到:

    • Poll 到期(atCapMs 超时)
    • Session 完成(capacityWake 信号)
    • Auth 失败(JWT 过期 → reconnectSession)
    • 关闭信号
  2. 慢轮询模式:如果心跳禁用,用 atCapMs 间隔慢速轮询作为存活信号

  3. Capacity Wake(见下文):session 完成时立即唤醒轮询

5.3 Session 完成处理

bridgeMain.ts:442-591 的 onSessionDone() 回调:

onSessionDone →
  1. 清理所有 Map 条目
  2. 清除 timeout timer + token refresh timer
  3. capacityWake.wake() → 唤醒 at-capacity 睡眠
  4. 如果 timeout 导致 → 将 'interrupted' 映射为 'failed'
  5. stopWorkWithRetry() → 告知服务器 work 结束
  6. 清理 worktree(如果有)
  7. 单会话模式 → controller.abort() 退出 poll loop
  8. 多会话模式 → archiveSession() 后继续接受新 work

六、JWT Token 生命周期

6.1 Token 来源

Session ingress JWT 通过 work secret 传递:

// workSecret.ts:6-32
const secret = decodeWorkSecret(work.secret)
// secret.session_ingress_token — JWT,用于 API 认证
// secret.api_base_url — API 基础 URL
// secret.use_code_sessions — 是否使用 v2 协议

6.2 Proactive Token Refresh

jwtUtils.ts:72-255 的 createTokenRefreshScheduler():

┌──────────────────────────────────────────────────┐
│           Token Refresh Scheduler                 │
│                                                   │
│  schedule(sessionId, jwt) →                       │
│    1. decodeJwtExpiry(jwt) → exp claim            │
│    2. delayMs = exp - now - 5min buffer           │
│    3. setTimeout(doRefresh, delayMs)              │
│                                                   │
│  doRefresh(sessionId, gen) →                      │
│    1. oauthToken = await getAccessToken()         │
│    2. if v1: handle.updateAccessToken(oauthToken) │
│       if v2: api.reconnectSession() → re-dispatch │
│    3. schedule follow-up in 30min                 │
│                                                   │
│  失败策略:                                        │
│    - 连续失败 ≤ 3 次: 60s 后重试                  │
│    - 连续失败 > 3 次: 放弃刷新                     │
│                                                   │
│  Generation counter:                              │
│    - schedule() 递增 → 失效 in-flight doRefresh   │
│    - cancel() 递增 → 同上                         │
└──────────────────────────────────────────────────┘

v1 vs v2 的关键区别:

  • v1: onRefresh 调用 handle.updateAccessToken(oauthToken) → 通过 stdin 发送给子进程
  • v2: onRefresh 调用 api.reconnectSession() → 服务器重新派发 work → 下次 poll 携带新 JWT

这是因为 v2 的 CCR worker 端点验证 JWT 的 session_id claim(register_worker.go:32),OAuth token 没有此 claim。

6.3 JWT 解码

jwtUtils.ts:21-48:

export function decodeJwtPayload(token: string): unknown | null {
  const jwt = token.startsWith('sk-ant-si-')
    ? token.slice('sk-ant-si-'.length)  // 去掉 session-ingress 前缀
    : token
  const parts = jwt.split('.')
  if (parts.length !== 3 || !parts[1]) return null
  return jsonParse(Buffer.from(parts[1], 'base64url').toString('utf8'))
}

七、受信设备安全层(Trusted Device)

trustedDevice.ts:1-210:

┌──────────────────────────────────────────────────────────┐
│               Trusted Device Token Flow                    │
│                                                           │
│  Enrollment (POST /auth/trusted_devices):                 │
│    - 只在 fresh /login 后 10 分钟内有效                    │
│    - 必须在 /login hook 中调用 enrollTrustedDevice()       │
│    - 服务端返回 device_token → 存入 keychain               │
│    - Token 90 天滚动过期                                   │
│                                                           │
│  Usage:                                                   │
│    - getHeaders() 检查 GrowthBook gate                     │
│      tengu_sessions_elevated_auth_enforcement              │
│    - 如果 gate 开启 → 读取 keychain → X-Trusted-Device-Token │
│    - 如果 gate 关闭 → header 不发送                        │
│                                                           │
│  Security tier:                                           │
│    - Bridge session 在服务端标记为 SecurityTier=ELEVATED   │
│    - 服务端 ConnectBridgeWorker 检查 trusted device        │
│    - 双 flag 设计: CLI 侧先开启(header 开始流动),       │
│      然后服务端开启(开始强制检查)                          │
│                                                           │
│  Priority:                                                │
│    环境变量 CLAUDE_TRUSTED_DEVICE_TOKEN > keychain         │
│    企业 wrapper 可以通过 env var 提供自己的 token           │
└──────────────────────────────────────────────────────────┘

关键函数:

  • getTrustedDeviceToken() (trustedDevice.ts:54-59): gate + memoized 读取
  • enrollTrustedDevice() (trustedDevice.ts:98-210): 注册流程,best-effort
  • clearTrustedDeviceToken() (trustedDevice.ts:72-87): 登出时清除
  • clearTrustedDeviceTokenCache() (trustedDevice.ts:61-63): 清除 memo 缓存

八、消息路由与 UUID 去重

8.1 BoundedUUIDSet(环形缓冲去重)

bridgeMessaging.ts:429-461:

export class BoundedUUIDSet {
  private readonly ring: (string | undefined)[]
  private readonly set = new Set<string>()
  private writeIdx = 0

  add(uuid: string): void {
    if (this.set.has(uuid)) return
    const evicted = this.ring[this.writeIdx]
    if (evicted !== undefined) this.set.delete(evicted)
    this.ring[this.writeIdx] = uuid
    this.set.add(uuid)
    this.writeIdx = (this.writeIdx + 1) % this.capacity
  }
}

使用两个实例:

  • recentPostedUUIDs: 我方发出的消息 UUID → 过滤 echo
  • recentInboundUUIDs: 已转发的入站消息 UUID → 防止重复派发

8.2 Ingress 消息路由

bridgeMessaging.ts:132-208 的 handleIngressMessage():

收到 WS data → jsonParse → normalizeControlMessageKeys
  ├─ isSDKControlResponse? → onPermissionResponse (权限响应)
  ├─ isSDKControlRequest?  → onControlRequest (initialize/set_model/interrupt)
  ├─ isSDKMessage?          →
  │   ├─ uuid in recentPostedUUIDs? → 跳过 (echo)
  │   ├─ uuid in recentInboundUUIDs? → 跳过 (重复派发)
  │   └─ type === 'user'? → onInboundMessage (转发给子进程)
  └─ 其他 → 跳过

8.3 Server Control Request 处理

bridgeMessaging.ts:243-391 的 handleServerControlRequest():

处理来自服务端的 control_request:

  • initialize: 回复最小 capabilities(commands[], models[], pid)
  • set_model: 触发 onSetModel 回调
  • set_max_thinking_tokens: 触发 onSetMaxThinkingTokens 回调
  • set_permission_mode: 策略裁决(ok/error)
  • interrupt: 触发 onInterrupt 回调
  • 超时约束:必须在 10-14 秒内响应,否则服务端杀死 WS

Outbound-only 模式:所有 mutating 请求回复 OUTBOUND_ONLY_ERROR(bridgeMessaging.ts:231-232),只有 initialize 仍然回复 success(否则服务端会断开连接)。


九、Work Secret 解码

workSecret.ts:6-32:

export function decodeWorkSecret(secret: string): WorkSecret {
  const json = Buffer.from(secret, 'base64url').toString('utf-8')
  const parsed = jsonParse(json)
  // 校验 version === 1
  // 校验 session_ingress_token 非空
  // 校验 api_base_url 存在
  return parsed as WorkSecret
}

WorkSecret 结构(types.ts:33-51):

export type WorkSecret = {
  version: number
  session_ingress_token: string    // JWT
  api_base_url: string
  sources: Array<{ type, git_info? }>
  auth: Array<{ type, token }>
  claude_code_args?: Record<string, string>
  mcp_config?: unknown
  environment_variables?: Record<string, string>
  use_code_sessions?: boolean      // v2 协议选择器
}

use_code_sessions 字段:服务端通过 prepare_work_secret() 设置,决定使用 v1(Session-Ingress WebSocket)还是 v2(SSE + CCRClient)。环境变量 CLAUDE_BRIDGE_USE_CCR_V2 可以覆盖(仅 ant 开发用)。

SDK URL 构建

workSecret.ts:41-48(v1):

// localhost → ws://host/v2/session_ingress/ws/{id}
// production → wss://host/v1/session_ingress/ws/{id} (Envoy rewrite v1→v2)

workSecret.ts:81-87(v2):

// → https://host/v1/code/sessions/{id}

Session ID 兼容性

sessionIdCompat.ts:38-56:

CCR v2 的 infra 层使用 cse_* 前缀,但 compat API 需要 session_* 前缀。toCompatSessionId() 和 toInfraSessionId() 做互相转换,底层 UUID 相同。

sameSessionId()(workSecret.ts:62-73)比较两个不同前缀的 session ID:

const aBody = a.slice(a.lastIndexOf('_') + 1)  // 取最后一个 _ 之后的部分
return aBody.length >= 4 && aBody === bBody

十、协议版本:v1 vs v2

10.1 v1: HybridTransport

  • 读取: WebSocket 连接到 Session-Ingress
  • 写入: HTTP POST 到 Session-Ingress
  • 认证: OAuth token(通过 CLAUDE_CODE_SESSION_ACCESS_TOKEN env var)
  • 重放: 服务端通过 message cursor 处理,不需要 SSE sequence number

10.2 v2: SSETransport + CCRClient

replBridgeTransport.ts:119-369 的 createV2ReplTransport():

┌────────────────────────────────────────────────────────┐
│                 v2 Transport Stack                      │
│                                                         │
│  SSETransport (读取):                                   │
│    URL: {sessionUrl}/worker/events/stream               │
│    - 支持 from_sequence_num / Last-Event-ID             │
│    - 重连预算耗尽 → onClose(undefined) → 4092           │
│                                                         │
│  CCRClient (写入):                                      │
│    - SerialBatchEventUploader (maxBatchSize=100)        │
│    - POST /worker/events                                │
│    - Heartbeat: PUT /worker/state (每 20s)              │
│    - Delivery tracking: POST /worker/events/{id}/delivery│
│                                                         │
│  Worker Registration:                                   │
│    - POST {sessionUrl}/worker/register                  │
│    - 返回 worker_epoch (单调递增)                        │
│    - epoch mismatch (409) → 关闭 + poll loop recovery   │
│                                                         │
│  Auth: JWT(session_id claim),非 OAuth token           │
│                                                         │
│  Epoch Mismatch Handling:                               │
│    - CCRClient 收到 409 → onEpochMismatch 回调          │
│    - 关闭 CCR + SSE → onClose(4090)                    │
│    - replBridge 的 onClose → 回到 poll loop             │
│    - 下次 poll 派发新 epoch → 重建 transport            │
└────────────────────────────────────────────────────────┘

SSE Sequence Number 传递(replBridgeTransport.ts:128-131):

initialSequenceNum?: number
// 传递给 SSETransport,connect() 时发送 from_sequence_num
// 避免每次 transport swap 重放整个 session 历史

Delivery ACK 优化(replBridgeTransport.ts:249-252):

sse.setOnEvent(event => {
  ccr.reportDelivery(event.event_id, 'received')
  ccr.reportDelivery(event.event_id, 'processed')  // 立即标记 processed
})

原来的实现只标记 received,导致 daemon 重启后 21-25 个 phantom prompt 被重放。立即标记 processed 的 trade-off:崩溃窗口内丢失一个 prompt vs 每次重启重放 N 个。


十一、远程 Session 客户端

11.1 RemoteSessionManager

RemoteSessionManager.ts:95-343:

┌──────────────────────────────────────────────────┐
│          RemoteSessionManager                     │
│                                                   │
│  职责:                                            │
│  1. WebSocket 订阅(接收消息流)                    │
│  2. HTTP POST 发送用户消息                         │
│  3. Permission request/response 流程               │
│                                                   │
│  connect() →                                      │
│    new SessionsWebSocket(sessionId, orgUuid, ...)  │
│    ws.connect()                                    │
│                                                   │
│  handleMessage() →                                │
│    ├─ control_request → handleControlRequest       │
│    │   └─ can_use_tool → callbacks.onPermissionRequest │
│    ├─ control_cancel_request → onPermissionCancelled   │
│    ├─ control_response → 跳过(ACK)               │
│    └─ SDKMessage → callbacks.onMessage             │
│                                                   │
│  respondToPermissionRequest() →                   │
│    构建 SDKControlResponse                         │
│    → ws.sendControlResponse()                      │
│                                                   │
│  cancelSession() →                                │
│    ws.sendControlRequest({ subtype: 'interrupt' }) │
└──────────────────────────────────────────────────┘

11.2 SessionsWebSocket — 连接弹性

SessionsWebSocket.ts:82-404:

连接策略:
  URL: wss://api.anthropic.com/v1/sessions/ws/{id}/subscribe?organization_uuid=...
  Auth: Bearer <OAuth token> (via headers, 非 auth message)

重连机制:
  - 常规断开: 最多重连 5 次,每次间隔 2 秒
  - 4001 (session not found): 最多重试 3 次(compaction 期间可能瞬时)
  - 4003 (unauthorized): 永不重连
  - 重连时 reset 计数器: reconnectAttempts = 0, sessionNotFoundRetries = 0

心跳: 每 30 秒 ping (ws.ping())

Runtime 兼容:
  - Bun: globalThis.WebSocket + headers/proxy/tls options
  - Node: ws package + agent/tls options

11.3 Permission Bridge — 合成消息

remotePermissionBridge.ts:12-78:

远程 CCR 容器发来的 permission request 需要转换成本地权限 UI 能理解的格式:

createSyntheticAssistantMessage(request, requestId) → AssistantMessage
  // 构造一个假的 AssistantMessage,包含 tool_use block
  // 因为 ToolUseConfirm 类型需要 AssistantMessage
  // 但实际的 tool 在远程容器执行

createToolStub(toolName) → Tool
  // 当远程有本地不存在的工具(如 MCP tools)
  // 创建一个最小 stub,路由到 FallbackPermissionRequest

11.4 SDK Message Adapter

sdkMessageAdapter.ts:168-278:

将 CCR 发来的 SDKMessage 转换为 REPL 内部的 Message 类型:

SDK TypeREPL Type说明
assistantAssistantMessage直接映射
userignored / UserMessageCCR 模式忽略;direct-connect 模式转换 tool_result
stream_eventStreamEvent流式事件
result (非 success)SystemMessage错误才显示
system (init)SystemMessage"Remote session initialized"
system (status)SystemMessage"Compacting conversation..."
system (compact_boundary)SystemMessage (compact_boundary)带 compact metadata
tool_progressSystemMessage"Tool X running for Ys"
auth_statusignoredSDK-only
tool_use_summaryignoredSDK-only
rate_limit_eventignoredSDK-only

十二、错误处理与退避策略

12.1 退避配置

bridgeMain.ts:59-79:

const DEFAULT_BACKOFF: BackoffConfig = {
  connInitialMs: 2_000,      // 连接错误初始延迟
  connCapMs: 120_000,        // 连接错误最大延迟 (2min)
  connGiveUpMs: 600_000,     // 连接错误放弃阈值 (10min)
  generalInitialMs: 500,     // 通用错误初始延迟
  generalCapMs: 30_000,      // 通用错误最大延迟 (30s)
  generalGiveUpMs: 600_000,  // 通用错误放弃阈值 (10min)
  shutdownGraceMs: 30_000,   // SIGTERM→SIGKILL 宽限期
  stopWorkBaseDelayMs: 1000, // stopWork 重试基数
}

12.2 错误分类

bridgeMain.ts:1582-1612:

// 连接错误: ECONNREFUSED, ECONNRESET, ETIMEDOUT, ENETUNREACH, EHOSTUNREACH
function isConnectionError(err): boolean { ... }

// 服务器错误: ERR_BAD_RESPONSE (HTTP 5xx)
function isServerError(err): boolean { ... }

// Fatal 错误 (BridgeFatalError):
//   401 → 认证失败
//   403 → 权限拒绝 / environment 过期
//   404 → environment 不存在
//   410 → environment 已过期

12.3 Jitter

bridgeMain.ts:1615-1616:

function addJitter(ms: number): number {
  return Math.max(0, ms + ms * 0.25 * (2 * Math.random() - 1))  // ±25%
}

12.4 系统休眠检测

bridgeMain.ts:107-109:

function pollSleepDetectionThresholdMs(backoff: BackoffConfig): number {
  return backoff.connCapMs * 2  // 2 × 2min = 4min
}

如果两次 poll 错误之间的时间间隔 > 4 分钟,说明系统可能休眠了,重置错误预算(backoff 归零,重新开始计时)。

12.5 stopWorkWithRetry

bridgeMain.ts:1627-1676:

指数退避重试 3 次(1s/2s/4s),确保服务器知道 work item 已结束,防止服务端僵尸。

Fatal 错误(401/403)不重试——认证问题不会自愈。


十三、Capacity Wake 系统

capacityWake.ts:1-56:

这是一个双信号合并器,让 at-capacity 睡眠可以被两种事件提前唤醒:

export function createCapacityWake(outerSignal: AbortSignal): CapacityWake {
  let wakeController = new AbortController()

  function wake(): void {
    wakeController.abort()           // 唤醒当前睡眠
    wakeController = new AbortController()  // 为下次睡眠准备新 controller
  }

  function signal(): CapacitySignal {
    const merged = new AbortController()
    // 同时监听 outerSignal(shutdown)和 wakeController.signal(session 完成)
    outerSignal.addEventListener('abort', abort, { once: true })
    wakeController.signal.addEventListener('abort', abort, { once: true })
    return { signal: merged.signal, cleanup: ... }
  }
}

使用位置:

  • bridgeMain.ts(standalone bridge):poll loop 的 at-capacity 睡眠
  • replBridge.ts(REPL bridge):同样用于 poll loop

关键行为:onSessionDone 中调用 capacityWake.wake()(bridgeMain.ts:467),确保 session 结束后 bridge 立即 poll 新工作,而不是等到下次 poll interval。


十四、FlushGate — 初始刷新门

flushGate.ts:1-71:

Bridge session 启动时,历史消息通过单个 HTTP POST 刷新到服务器。在此期间,新消息必须排队,防止历史消息和新消息交错到达服务器。

start() → enqueue() 返回 true, 消息排队
end()   → 返回排队的消息, enqueue() 返回 false
drop()  → 丢弃所有排队消息 (永久关闭)
deactivate() → 清除 active 标志但不丢弃 (transport 替换)

十五、关键 API 端点

bridgeApi.ts 定义的所有 API 调用:

方法端点用途
registerBridgeEnvironmentPOST /v1/environments/bridge注册 environment
pollForWorkGET /v1/environments/{id}/work/poll轮询工作项
acknowledgeWorkPOST /v1/environments/{id}/work/{id}/ack确认工作项
stopWorkPOST /v1/environments/{id}/work/{id}/stop停止工作项
deregisterEnvironmentDELETE /v1/environments/bridge/{id}注销 environment
archiveSessionPOST /v1/sessions/{id}/archive归档 session
reconnectSessionPOST /v1/environments/{id}/bridge/reconnect重新连接 session
heartbeatWorkPOST /v1/environments/{id}/work/{id}/heartbeat心跳保活
sendPermissionResponseEventPOST /v1/sessions/{id}/events发送权限响应

所有端点使用 environments-2025-11-01 beta header。

认证

  • Poll/Stop/Deregister: OAuth token(通过 withOAuthRetry 包装,401 时自动刷新)
  • Ack/Heartbeat: Session ingress JWT(从 work secret 解码)
  • Trusted Device: X-Trusted-Device-Token header(可选,gate 控制)

十六、Headless Bridge(Daemon Worker)

bridgeMain.ts:2770-2965 的 runBridgeHeadless():

非交互式 bridge 入口,供 daemon worker 调用。与 bridgeMain() 的区别:

  • 没有 readline 对话框
  • 没有 stdin 键盘处理
  • 没有 TUI
  • 没有 process.exit()(由调用者控制)
  • 配置来自 caller(daemon.json),auth 通过 IPC(supervisor 的 AuthManager)
  • 抛出 BridgeHeadlessPermanentError 让 supervisor 区分永久/瞬态错误

十七、Feature Gate 矩阵

bridgeEnabled.ts 和相关文件中的 gate:

Gate用途
feature('BRIDGE_MODE')编译时:bridge 代码是否包含在 build 中
tengu_ccr_bridge运行时:bridge 功能是否对用户可用
tengu_ccr_bridge_multi_session多会话 spawn 模式
tengu_bridge_repl_v2env-less (v2) REPL bridge 路径
tengu_bridge_repl_v2_cse_shim_enabledcse_* → session_* 转换
tengu_sessions_elevated_auth_enforcement受信设备 token
tengu_bridge_poll_interval_config轮询间隔动态配置
tengu_bridge_min_version最低 CLI 版本要求
tengu_cobalt_harborCCR auto-connect
tengu_ccr_mirrorCCR mirror mode
feature('KAIROS')--session-id / --continue 恢复功能
feature('CCR_AUTO_CONNECT')自动连接 CCR
feature('CCR_MIRROR')Mirror mode

十八、Mermaid 序列图

18.1 完整 Session 生命周期

18.2 WebSocket 订阅(客户端侧)

18.3 Token Refresh 时序


十九、关键文件索引

文件行数职责
bridgeMain.ts2,999Orchestrator: poll loop, session lifecycle, shutdown
replBridge.ts2,406In-process REPL bridge (嵌入 REPL 内部)
bridgeApi.ts539HTTP API client (所有 REST 调用)
replBridgeTransport.ts370v1/v2 transport 抽象层
sessionRunner.ts550子进程 spawn + NDJSON 解析
bridgeMessaging.ts461消息路由 + UUID 去重 + control request
types.ts262所有核心类型定义
jwtUtils.ts256JWT 解码 + token refresh scheduler
bridgeEnabled.ts202Feature gate 检查
trustedDevice.ts210受信设备 token 管理
createSession.ts384Session 创建 API 调用
pollConfig.ts110GrowthBook 轮询配置
workSecret.ts127Work secret 解码 + URL 构建
sessionIdCompat.ts57cse_* ↔ session_* 转换
capacityWake.ts56双信号合并器
flushGate.ts71初始刷新消息门
bridgeConfig.ts—Bridge 配置解析
bridgePointer.ts—崩溃恢复指针
bridgeUI.ts—TUI 状态显示
RemoteSessionManager.ts343客户端 session 管理
SessionsWebSocket.ts404WebSocket 连接 + 重连
remotePermissionBridge.ts78权限请求合成消息
sdkMessageAdapter.ts302SDK → REPL 消息转换

二十、总结

Bridge 系统是一个精心设计的分布式 session 编排器,核心挑战在于:

  1. 不可靠网络:指数退避 + 系统休眠检测 + ±25% jitter
  2. 长会话:proactive token refresh(5min buffer)+ 30min follow-up
  3. 多协议:v1 WebSocket vs v2 SSE+CCRClient,通过 transport 抽象统一
  4. 并发管理:最多 32 个 session,capacity wake 机制确保即时响应
  5. 安全分层:OAuth → Session JWT → Trusted Device Token 三级认证
  6. 崩溃恢复:bridge-pointer 文件 + --continue 恢复
  7. 优雅关闭:SIGTERM → SIGKILL (30s grace) → worktree 清理 → archive + deregister

整个子系统由 31 个文件、12,613 行代码组成,其中 bridgeMain.ts(2,999 行)和 replBridge.ts(2,406 行)是两个最大的模块,分别对应 standalone 模式和 REPL 内嵌模式。