Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Claude Code QueryEngine 深度分析

Claude Code QueryEngine 深度分析

架构概览

QueryEngine 是 Claude Code 的核心 LLM 交互引擎,负责管理整个对话生命周期——从用户输入到 API 调用、流式响应处理、工具执行循环,再到最终结果返回。

整体架构分为三层:

┌─────────────────────────────────────────────┐
│            QueryEngine (会话管理)              │
│  src/QueryEngine.ts                         │
│  - submitMessage() → AsyncGenerator         │
│  - 消息状态、成本追踪、会话持久化               │
├─────────────────────────────────────────────┤
│            query() (核心循环)                 │
│  src/query.ts                               │
│  - while(true) agentic loop                 │
│  - 上下文压缩、token 管理                     │
│  - 工具调用编排                              │
├─────────────────────────────────────────────┤
│         queryModelWithStreaming              │
│  src/services/api/claude.ts                  │
│  - API 请求构建                               │
│  - SSE 流式解析                               │
│  - 重试/降级逻辑                              │
├─────────────────────────────────────────────┤
│         withRetry + withStreamingVCR         │
│  src/services/api/withRetry.ts               │
│  src/services/vcr.ts                         │
│  - 指数退避重试                               │
│  - 模型 fallback (529 过载)                   │
│  - VCR 录制/回放                              │
└─────────────────────────────────────────────┘

QueryEngine 类 (src/QueryEngine.ts)

设计理念

QueryEngine 是一个有状态的会话对象:一个 QueryEngine 实例对应一个会话,每个 submitMessage() 调用在一个会话内开启新的轮次。消息历史、文件缓存、token 使用量等状态跨轮次持久化。

// src/QueryEngine.ts:184-207
export class QueryEngine {
  private config: QueryEngineConfig
  private mutableMessages: Message[]       // 消息历史(跨轮次持久化)
  private abortController: AbortController  // 中断控制
  private permissionDenials: SDKPermissionDenial[]  // 权限拒绝记录
  private totalUsage: NonNullableUsage      // 累积 token 用量
  private readFileState: FileStateCache     // 文件读取缓存
  private discoveredSkillNames = new Set<string>()  // 技能发现追踪
  private loadedNestedMemoryPaths = new Set<string>()  // 嵌套 memory 路径
}

QueryEngineConfig (src/QueryEngine.ts:130-173)

export type QueryEngineConfig = {
  cwd: string                        // 工作目录
  tools: Tools                       // 可用工具列表
  commands: Command[]                 // 斜杠命令
  mcpClients: MCPServerConnection[]  // MCP 服务器连接
  agents: AgentDefinition[]          // 子 agent 定义
  canUseTool: CanUseToolFn           // 工具权限判断函数
  getAppState: () => AppState        // 获取应用状态
  setAppState: (f: (prev: AppState) => AppState) => void
  initialMessages?: Message[]        // 初始消息(会话恢复)
  readFileCache: FileStateCache      // 文件状态缓存
  customSystemPrompt?: string        // 自定义系统提示词
  appendSystemPrompt?: string        // 追加系统提示词
  userSpecifiedModel?: string        // 用户指定模型
  fallbackModel?: string             // 降级模型
  thinkingConfig?: ThinkingConfig    // Thinking 配置
  maxTurns?: number                  // 最大轮次数
  maxBudgetUsd?: number              // 最大 USD 预算
  taskBudget?: { total: number }     // API 侧 token 预算
  jsonSchema?: Record<string, unknown>  // 结构化输出 schema
  verbose?: boolean                  // 详细输出
  replayUserMessages?: boolean       // SDK 模式下重放用户消息
  includePartialMessages?: bool     // 是否包含流式中间事件
  abortController?: AbortController  // 外部中断控制
  snipReplay?: (...) => ...          // snip 压缩回放处理器
}

便捷入口 ask() (src/QueryEngine.ts:1186-1295)

ask() 是 QueryEngine 的单次调用便捷封装,内部创建 QueryEngine 实例并调用 submitMessage():

// src/QueryEngine.ts:1249-1285
const engine = new QueryEngine({ /* config */ })
yield* engine.submitMessage(prompt, { uuid: promptUuid, isMeta })

核心循环:query() 与 queryLoop()

循环结构 (src/query.ts:219-1729)

query() 是核心 agentic 循环的入口。内部调用 queryLoop(),采用 while(true) + State 模式实现跨迭代状态传递:

State 类型 (src/query.ts:204-217)

循环跨迭代的可变状态:

type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined  // 上一次迭代的继续原因
}

继续原因 (Transition reasons)

transition 字段记录循环继续的原因,供后续迭代和测试断言使用:

  • next_turn — 正常工具调用后的下一轮
  • reactive_compact_retry — 413 后响应式压缩重试
  • collapse_drain_retry — 上下文折叠排水重试
  • max_output_tokens_recovery — 输出 token 截断恢复
  • max_output_tokens_escalate — 输出 token 升级到 64k
  • stop_hook_blocking — stop hook 产生 blocking error 后重试
  • token_budget_continuation — token 预算未耗尽,自动继续

API 调用构造

queryModelWithStreaming (src/services/api/claude.ts:752-780)

对外暴露的流式 API 调用函数:

export async function* queryModelWithStreaming({
  messages, systemPrompt, thinkingConfig, tools, signal, options
}): AsyncGenerator<StreamEvent | AssistantMessage | SystemAPIErrorMessage> {
  return yield* withStreamingVCR(messages, async function* () {
    yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
  })
}

queryModel (src/services/api/claude.ts:1017+)

核心 API 请求构造函数,职责包括:

  1. 工具 Schema 构建 (src/services/api/claude.ts:1235-1246):将工具定义转换为 API schema,支持 defer_loading 动态工具加载
  2. 消息规范化 (src/services/api/claude.ts:1266):normalizeMessagesForAPI 处理 tool_use/tool_result 配对、去除多余字段
  3. 系统提示词构建 (src/services/api/claude.ts:1358-1369):组装 CLI 归属头、前缀、advisor 指令等
  4. Prompt Cache 断点 (src/services/api/claude.ts:1701-1709):addCacheBreakpoints 在消息中插入 cache_control 标记
  5. Thinking 配置 (src/services/api/claude.ts:1596-1630):支持 adaptive thinking 和 budget-based thinking
  6. Beta 头管理 (src/services/api/claude.ts:1071+):动态 beta header,sticky-on latch 机制避免 cache 破坏

请求参数结构 (src/services/api/claude.ts:1699-1728)

最终发送给 API 的参数:

return {
  model: normalizeModelStringForAPI(options.model),
  messages: addCacheBreakpoints(messagesForAPI, ...),
  system,                           // 系统提示词 (带 cache_control)
  tools: allTools,                  // 工具定义数组
  tool_choice: options.toolChoice,
  betas: betasParams,               // beta headers
  metadata: getAPIMetadata(),       // user_id, device_id, session_id
  max_tokens: maxOutputTokens,
  thinking,                         // { type: 'adaptive' } 或 { budget_tokens, type: 'enabled' }
  temperature,                      // 仅 thinking 禁用时发送
  context_management,               // API 侧上下文管理策略
  output_config: { effort, task_budget, format },
  speed,                            // fast mode
  ...extraBodyParams,               // Bedrock/Vertex 额外参数
}

流式处理机制

流式事件处理 (src/services/api/claude.ts:1940-2304)

for await (const part of stream) 循环处理 Anthropic SSE 流:

switch (part.type) {
  case 'message_start':
    // 初始化 partialMessage, 记录 ttftMs, 更新 usage
  case 'content_block_start':
    // 按 block 类型初始化: text/tool_use/thinking/server_tool_use
    // tool_use: contentBlocks[i] = { ...block, input: '' }
  case 'content_block_delta':
    // text_delta: 累加 text
    // input_json_delta: 累加 input JSON 字符串
    // thinking_signature_delta: 累加 signature
  case 'content_block_stop':
    // 构建 AssistantMessage, yield 给调用方
    // 解析 tool_use input JSON
  case 'message_delta':
    // 更新 usage, stop_reason
    // 写回已 yield 的 message 对象(直接属性修改)
    // 计算 USD 成本
  case 'message_stop':
    break
}

每次事件后都 yield { type: 'stream_event', event: part } 以传递中间事件。

流式监控

  • 空闲超时看门狗 (src/services/api/claude.ts:1874-1928):STREAM_IDLE_TIMEOUT_MS(默认 90s),无数据到达时终止流
  • 卡顿检测 (src/services/api/claude.ts:1936-1966):30s 无事件视为卡顿,记录日志
  • 非流式降级 (src/services/api/claude.ts:2469-2594):流式失败时回退到非流式请求 executeNonStreamingRequest

StreamingToolExecutor (src/query.ts:562-568)

启用 tengu_streaming_tool_execution2 feature gate 时,在流式接收过程中并行执行工具:

let streamingToolExecutor = useStreamingToolExecution
  ? new StreamingToolExecutor(toolUseContext.options.tools, canUseTool, toolUseContext)
  : null

// 流中:每收到 tool_use block 立即加入执行队列
for (const toolBlock of msgToolUseBlocks) {
  streamingToolExecutor.addTool(toolBlock, message)
}
// 流中:获取已完成的结果
for (const result of streamingToolExecutor.getCompletedResults()) { ... }
// 流结束后:获取剩余结果
for (const update of streamingToolExecutor.getRemainingResults()) { ... }

工具调用循环(Agentic Loop)

非流式工具执行 (src/services/tools/toolOrchestration.ts:19-80)

当未启用 streaming tool execution 时,使用 runTools() 执行工具:

关键点:

  • 读工具并发:如 Read、Grep、Glob 等只读操作并发执行
  • 写工具串行:Write、Edit 等修改操作必须串行
  • 默认最大并发度为 10 (CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY)

工具结果收集 (src/query.ts:1380-1408)

工具执行结果通过 normalizeMessagesForAPI 转换为 API 格式,推入 toolResults 数组:

const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)

for await (const update of toolUpdates) {
  if (update.message) {
    yield update.message
    toolResults.push(...normalizeMessagesForAPI([update.message], tools))
  }
  if (update.newContext) {
    updatedToolUseContext = { ...update.newContext, queryTracking }
  }
}

循环继续/终止 (src/query.ts:1678-1728)

每次迭代末尾组装下一轮 State:

const next: State = {
  messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
  toolUseContext: toolUseContextWithQueryTracking,
  autoCompactTracking: tracking,
  turnCount: nextTurnCount,
  maxOutputTokensRecoveryCount: 0,
  hasAttemptedReactiveCompact: false,
  pendingToolUseSummary: nextPendingToolUseSummary,
  maxOutputTokensOverride: undefined,
  stopHookActive,
  transition: { reason: 'next_turn' },
}
state = next  // while(true) 继续

终止条件检查在循环前部(无工具调用时):

  1. Stop hooks (src/query/stopHooks.ts:65-472):用户定义的停止钩子,可阻止继续
  2. Token budget (src/query/tokenBudget.ts:45-93):检测 token 使用量是否达到预算阈值
  3. maxTurns (src/query.ts:1705-1712):达到最大轮次限制
  4. Abort (src/query.ts:1015-1051):用户中断

重试与错误处理

withRetry (src/services/api/withRetry.ts:170+)

重试生成器封装所有 API 调用:

export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client, attempt, context) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T>

重试策略:

错误类型处理
429 / 529 (过载)等待 retry-after 或指数退避;连续 3 次 529 触发模型 fallback
401 (认证失败)刷新 OAuth token,重建 client
403 (token 吊销)同 401
ECONNRESET / EPIPE禁用 keep-alive,重建连接
Fast mode 429/529短等待重试(<2s)或进入 fast mode cooldown
其他通用重试逻辑

模型 Fallback (src/services/api/withRetry.ts:327-349)

当连续 529 过载错误达到阈值时触发:

if (consecutive529Errors >= MAX_529_RETRIES) {
  if (options.fallbackModel) {
    throw new FallbackTriggeredError(options.model, options.fallbackModel)
  }
}

FallbackTriggeredError 在 query.ts:894-951 中被捕获,切换到 fallbackModel 后重试整个请求:

if (innerError instanceof FallbackTriggeredError && fallbackModel) {
  currentModel = fallbackModel
  attemptWithFallback = true
  // 清空 assistant messages, 重新发起请求
  assistantMessages.length = 0
  toolResults.length = 0
  toolUseBlocks.length = 0
}

输出 Token 截断恢复 (src/query.ts:1188-1256)

当 stop_reason === 'max_tokens' 时,进行升级重试:

  1. 首次:若启用 tengu_otk_slot_v1,升级到 ESCALATED_MAX_TOKENS(64k)重试
  2. 后续:注入 isMeta 用户消息,要求模型从截断处继续,最多 3 次
  3. 耗尽:返回错误

Prompt Too Long 恢复 (src/query.ts:1070-1183)

413 错误的三级恢复策略:

Token/成本管理

cost-tracker.ts (src/cost-tracker.ts)

全局成本追踪模块,管理以下状态(存储在 bootstrap/state 中):

type StoredCostState = {
  totalCostUSD: number               // 总 USD 费用
  totalAPIDuration: number           // 总 API 耗时
  totalAPIDurationWithoutRetries: number  // 不含重试的 API 耗时
  totalToolDuration: number          // 总工具执行耗时
  totalLinesAdded: number            // 总添加行数
  totalLinesRemoved: number          // 总删除行数
  lastDuration: number | undefined   // 最近一次耗时
  modelUsage: { [modelName: string]: ModelUsage }  // 按模型分类的用量
}

Usage 累积 (src/services/api/claude.ts:2924-3025)

updateUsage:流式事件中逐块更新单消息的 token 用量。注意 input_tokens 仅在 >0 时更新(message_start 设置初始值,后续 delta 不覆盖)。

accumulateUsage:将单消息用量累积到总用量。逐字段加和 input_tokens、output_tokens、cache_read_input_tokens、cache_creation_input_tokens 等。

// src/services/api/claude.ts:2993-3005
export function accumulateUsage(totalUsage, messageUsage): NonNullableUsage {
  return {
    input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
    cache_creation_input_tokens: totalUsage.cache_creation_input_tokens + messageUsage.cache_creation_input_tokens,
    cache_read_input_tokens: totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
    output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
    // ...
  }
}

成本计算 (src/services/api/claude.ts:2251-2256)

在 message_delta 事件中实时计算成本:

const costUSDForPart = calculateUSDCost(resolvedModel, usage)
costUSD += addToTotalSessionCost(costUSDForPart, usage, options.model)

Token Budget (src/query/tokenBudget.ts)

独立于 USD budget 的 token budget 机制,控制单次 agentic turn 的 token 消耗:

// src/query/tokenBudget.ts:45-93
export function checkTokenBudget(tracker, agentId, budget, globalTurnTokens) {
  // 90% 阈值:未达到则注入 nudge 消息鼓励继续
  // diminishing returns:连续 3 次且增量 <500 token 则提前停止
  // 返回 continue 或 stop 决策
}

会话持久化 (src/cost-tracker.ts:87-175)

成本状态跨会话持久化:

  • getStoredSessionCosts():从项目配置读取上次会话成本
  • restoreCostStateForSession():恢复会话时加载历史成本
  • saveCurrentSessionCosts():切换会话前保存当前成本

上下文管理

getSystemContext (src/context.ts:116-150)

系统级上下文,全会话缓存(memoize):

export const getSystemContext = memoize(async () => {
  return {
    gitStatus,           // git 分支、状态、最近提交
    cacheBreaker,        // 缓存破坏标记(ant-only)
  }
})

getUserContext (src/context.ts:155-189)

用户级上下文,全会话缓存:

export const getUserContext = memoize(async () => {
  return {
    claudeMd,            // CLAUDE.md 文件内容
    currentDate,         // 当前日期
  }
})

上下文组装流程 (src/QueryEngine.ts:286-325)

// 1. 获取系统提示词各部分
const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
  tools, mainLoopModel, additionalWorkingDirectories, mcpClients, customSystemPrompt
})

// 2. 组装最终系统提示词
const systemPrompt = asSystemPrompt([
  ...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
  ...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
  ...(appendSystemPrompt ? [appendSystemPrompt] : []),
])

上下文压缩

在 query() 每次迭代前执行多级压缩(src/query.ts:365-447):

  1. Tool result budget:限制单条工具结果大小,超限内容替换为摘要
  2. Snip:截断历史尾部,释放 token
  3. Microcompact:压缩冗余工具调用(如多轮 Read 同一文件)
  4. Context Collapse:折叠历史段为摘要
  5. Auto Compact:当 token 超限时触发全量对话压缩

消息格式与会话模型

Message 类型

type Message =
  | UserMessage      // { type: 'user', message: { role: 'user', content }, uuid, timestamp }
  | AssistantMessage // { type: 'assistant', message: { role: 'assistant', content, usage, stop_reason }, uuid }
  | SystemMessage    // { type: 'system', subtype: '...', content }
  | StreamEvent      // { type: 'stream_event', event: RawStreamEvent }
  | AttachmentMessage  // { type: 'attachment', attachment: {...} }
  | ProgressMessage  // { type: 'progress', toolUseID, data }
  | TombstoneMessage // { type: 'tombstone', message: Message } — UI 删除标记
  | ToolUseSummaryMessage  // { type: 'tool_use_summary', summary, precedingToolUseIds }

会话模型

一个对话是 Message 数组的线性序列:

UserMessage → [meta messages] → AssistantMessage(s) [含 tool_use blocks] →
UserMessage(tool_results) → AssistantMessage(s) → ...

QueryEngine 维护 mutableMessages 数组作为会话的唯一真相来源。每次迭代:

  • 追加 assistant messages 和 tool results
  • 追加 attachments(memory、file changes、commands)
  • 通过 recordTranscript() 持久化到本地存储

关键代码路径

位置功能
src/QueryEngine.ts:184QueryEngine 类定义
src/QueryEngine.ts:209submitMessage() 入口
src/QueryEngine.ts:675进入 query() 循环
src/QueryEngine.ts:757消息类型分发 switch
src/QueryEngine.ts:972USD budget 超限检查
src/query.ts:219query() 入口
src/query.ts:241queryLoop() agentic 循环
src/query.ts:307while(true) 主循环
src/query.ts:365-447上下文压缩流水线
src/query.ts:654-708调用 queryModelWithStreaming
src/query.ts:829-844收集 tool_use blocks
src/query.ts:1062无工具调用时的停止处理
src/query.ts:1085-1183Prompt Too Long 三级恢复
src/query.ts:1188-1256max_output_tokens 升级恢复
src/query.ts:1267handleStopHooks
src/query.ts:1380-1408工具执行(streaming 或 runTools)
src/query.ts:1580收集 attachments
src/query.ts:1714继续循环:组装下一轮 State
src/services/api/claude.ts:752queryModelWithStreaming 入口
src/services/api/claude.ts:1017queryModel API 请求构造
src/services/api/claude.ts:1538paramsFromContext 请求参数组装
src/services/api/claude.ts:1778withRetry 包装
src/services/api/claude.ts:1822anthropic.beta.messages.create 调用
src/services/api/claude.ts:1940for await (const part of stream) 流解析
src/services/api/claude.ts:2404流式错误处理 + 非流式降级
src/services/api/withRetry.ts:170withRetry 重试生成器
src/services/tools/toolOrchestration.ts:19runTools 工具编排
src/query/stopHooks.ts:65handleStopHooks 停止钩子
src/query/tokenBudget.ts:45checkTokenBudget token 预算检查
src/query/config.ts:29buildQueryConfig 配置快照
src/query/deps.ts:33productionDeps 依赖注入工厂
src/cost-tracker.ts:49成本/用量导出
src/context.ts:116getSystemContext 系统上下文
src/context.ts:155getUserContext 用户上下文

依赖注入与可测试性

QueryDeps (src/query/deps.ts:21-31)

query.ts 的 I/O 依赖通过 deps 参数注入,方便测试 mock:

export type QueryDeps = {
  callModel: typeof queryModelWithStreaming  // API 调用
  microcompact: typeof microcompactMessages  // 微压缩
  autocompact: typeof autoCompactIfNeeded    // 自动压缩
  uuid: () => string                         // UUID 生成
}

生产环境使用 productionDeps() 返回真实实现,测试可以注入 fakes。

QueryConfig (src/query/config.ts:15-27)

不可变配置快照,查询开始时捕获一次:

export type QueryConfig = {
  sessionId: SessionId
  gates: {
    streamingToolExecution: boolean  // 流式工具执行
    emitToolUseSummaries: boolean    // 工具使用摘要
    isAnt: boolean                   // ant 用户
    fastModeEnabled: boolean         // 快速模式
  }
}

Feature gates 故意不包含在此快照中——它们是 tree-shaking 边界,必须 inline 在各使用点以支持 dead code elimination。