Claude Code QueryEngine 深度分析
架构概览
QueryEngine 是 Claude Code 的核心 LLM 交互引擎,负责管理整个对话生命周期——从用户输入到 API 调用、流式响应处理、工具执行循环,再到最终结果返回。
整体架构分为三层:
┌─────────────────────────────────────────────┐
│ QueryEngine (会话管理) │
│ src/QueryEngine.ts │
│ - submitMessage() → AsyncGenerator │
│ - 消息状态、成本追踪、会话持久化 │
├─────────────────────────────────────────────┤
│ query() (核心循环) │
│ src/query.ts │
│ - while(true) agentic loop │
│ - 上下文压缩、token 管理 │
│ - 工具调用编排 │
├─────────────────────────────────────────────┤
│ queryModelWithStreaming │
│ src/services/api/claude.ts │
│ - API 请求构建 │
│ - SSE 流式解析 │
│ - 重试/降级逻辑 │
├─────────────────────────────────────────────┤
│ withRetry + withStreamingVCR │
│ src/services/api/withRetry.ts │
│ src/services/vcr.ts │
│ - 指数退避重试 │
│ - 模型 fallback (529 过载) │
│ - VCR 录制/回放 │
└─────────────────────────────────────────────┘
QueryEngine 类 (src/QueryEngine.ts)
设计理念
QueryEngine 是一个有状态的会话对象:一个 QueryEngine 实例对应一个会话,每个 submitMessage() 调用在一个会话内开启新的轮次。消息历史、文件缓存、token 使用量等状态跨轮次持久化。
// src/QueryEngine.ts:184-207
export class QueryEngine {
private config: QueryEngineConfig
private mutableMessages: Message[] // 消息历史(跨轮次持久化)
private abortController: AbortController // 中断控制
private permissionDenials: SDKPermissionDenial[] // 权限拒绝记录
private totalUsage: NonNullableUsage // 累积 token 用量
private readFileState: FileStateCache // 文件读取缓存
private discoveredSkillNames = new Set<string>() // 技能发现追踪
private loadedNestedMemoryPaths = new Set<string>() // 嵌套 memory 路径
}
QueryEngineConfig (src/QueryEngine.ts:130-173)
export type QueryEngineConfig = {
cwd: string // 工作目录
tools: Tools // 可用工具列表
commands: Command[] // 斜杠命令
mcpClients: MCPServerConnection[] // MCP 服务器连接
agents: AgentDefinition[] // 子 agent 定义
canUseTool: CanUseToolFn // 工具权限判断函数
getAppState: () => AppState // 获取应用状态
setAppState: (f: (prev: AppState) => AppState) => void
initialMessages?: Message[] // 初始消息(会话恢复)
readFileCache: FileStateCache // 文件状态缓存
customSystemPrompt?: string // 自定义系统提示词
appendSystemPrompt?: string // 追加系统提示词
userSpecifiedModel?: string // 用户指定模型
fallbackModel?: string // 降级模型
thinkingConfig?: ThinkingConfig // Thinking 配置
maxTurns?: number // 最大轮次数
maxBudgetUsd?: number // 最大 USD 预算
taskBudget?: { total: number } // API 侧 token 预算
jsonSchema?: Record<string, unknown> // 结构化输出 schema
verbose?: boolean // 详细输出
replayUserMessages?: boolean // SDK 模式下重放用户消息
includePartialMessages?: bool // 是否包含流式中间事件
abortController?: AbortController // 外部中断控制
snipReplay?: (...) => ... // snip 压缩回放处理器
}
便捷入口 ask() (src/QueryEngine.ts:1186-1295)
ask() 是 QueryEngine 的单次调用便捷封装,内部创建 QueryEngine 实例并调用 submitMessage():
// src/QueryEngine.ts:1249-1285
const engine = new QueryEngine({ /* config */ })
yield* engine.submitMessage(prompt, { uuid: promptUuid, isMeta })
核心循环:query() 与 queryLoop()
循环结构 (src/query.ts:219-1729)
query() 是核心 agentic 循环的入口。内部调用 queryLoop(),采用 while(true) + State 模式实现跨迭代状态传递:
State 类型 (src/query.ts:204-217)
循环跨迭代的可变状态:
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number
hasAttemptedReactiveCompact: boolean
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined
turnCount: number
transition: Continue | undefined // 上一次迭代的继续原因
}
继续原因 (Transition reasons)
transition 字段记录循环继续的原因,供后续迭代和测试断言使用:
next_turn— 正常工具调用后的下一轮reactive_compact_retry— 413 后响应式压缩重试collapse_drain_retry— 上下文折叠排水重试max_output_tokens_recovery— 输出 token 截断恢复max_output_tokens_escalate— 输出 token 升级到 64kstop_hook_blocking— stop hook 产生 blocking error 后重试token_budget_continuation— token 预算未耗尽,自动继续
API 调用构造
queryModelWithStreaming (src/services/api/claude.ts:752-780)
对外暴露的流式 API 调用函数:
export async function* queryModelWithStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options
}): AsyncGenerator<StreamEvent | AssistantMessage | SystemAPIErrorMessage> {
return yield* withStreamingVCR(messages, async function* () {
yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
})
}
queryModel (src/services/api/claude.ts:1017+)
核心 API 请求构造函数,职责包括:
- 工具 Schema 构建 (
src/services/api/claude.ts:1235-1246):将工具定义转换为 API schema,支持defer_loading动态工具加载 - 消息规范化 (
src/services/api/claude.ts:1266):normalizeMessagesForAPI处理 tool_use/tool_result 配对、去除多余字段 - 系统提示词构建 (
src/services/api/claude.ts:1358-1369):组装 CLI 归属头、前缀、advisor 指令等 - Prompt Cache 断点 (
src/services/api/claude.ts:1701-1709):addCacheBreakpoints在消息中插入cache_control标记 - Thinking 配置 (
src/services/api/claude.ts:1596-1630):支持 adaptive thinking 和 budget-based thinking - Beta 头管理 (
src/services/api/claude.ts:1071+):动态 beta header,sticky-on latch 机制避免 cache 破坏
请求参数结构 (src/services/api/claude.ts:1699-1728)
最终发送给 API 的参数:
return {
model: normalizeModelStringForAPI(options.model),
messages: addCacheBreakpoints(messagesForAPI, ...),
system, // 系统提示词 (带 cache_control)
tools: allTools, // 工具定义数组
tool_choice: options.toolChoice,
betas: betasParams, // beta headers
metadata: getAPIMetadata(), // user_id, device_id, session_id
max_tokens: maxOutputTokens,
thinking, // { type: 'adaptive' } 或 { budget_tokens, type: 'enabled' }
temperature, // 仅 thinking 禁用时发送
context_management, // API 侧上下文管理策略
output_config: { effort, task_budget, format },
speed, // fast mode
...extraBodyParams, // Bedrock/Vertex 额外参数
}
流式处理机制
流式事件处理 (src/services/api/claude.ts:1940-2304)
for await (const part of stream) 循环处理 Anthropic SSE 流:
switch (part.type) {
case 'message_start':
// 初始化 partialMessage, 记录 ttftMs, 更新 usage
case 'content_block_start':
// 按 block 类型初始化: text/tool_use/thinking/server_tool_use
// tool_use: contentBlocks[i] = { ...block, input: '' }
case 'content_block_delta':
// text_delta: 累加 text
// input_json_delta: 累加 input JSON 字符串
// thinking_signature_delta: 累加 signature
case 'content_block_stop':
// 构建 AssistantMessage, yield 给调用方
// 解析 tool_use input JSON
case 'message_delta':
// 更新 usage, stop_reason
// 写回已 yield 的 message 对象(直接属性修改)
// 计算 USD 成本
case 'message_stop':
break
}
每次事件后都 yield { type: 'stream_event', event: part } 以传递中间事件。
流式监控
- 空闲超时看门狗 (
src/services/api/claude.ts:1874-1928):STREAM_IDLE_TIMEOUT_MS(默认 90s),无数据到达时终止流 - 卡顿检测 (
src/services/api/claude.ts:1936-1966):30s 无事件视为卡顿,记录日志 - 非流式降级 (
src/services/api/claude.ts:2469-2594):流式失败时回退到非流式请求executeNonStreamingRequest
StreamingToolExecutor (src/query.ts:562-568)
启用 tengu_streaming_tool_execution2 feature gate 时,在流式接收过程中并行执行工具:
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(toolUseContext.options.tools, canUseTool, toolUseContext)
: null
// 流中:每收到 tool_use block 立即加入执行队列
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message)
}
// 流中:获取已完成的结果
for (const result of streamingToolExecutor.getCompletedResults()) { ... }
// 流结束后:获取剩余结果
for (const update of streamingToolExecutor.getRemainingResults()) { ... }
工具调用循环(Agentic Loop)
非流式工具执行 (src/services/tools/toolOrchestration.ts:19-80)
当未启用 streaming tool execution 时,使用 runTools() 执行工具:
关键点:
- 读工具并发:如 Read、Grep、Glob 等只读操作并发执行
- 写工具串行:Write、Edit 等修改操作必须串行
- 默认最大并发度为 10 (
CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY)
工具结果收集 (src/query.ts:1380-1408)
工具执行结果通过 normalizeMessagesForAPI 转换为 API 格式,推入 toolResults 数组:
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
if (update.message) {
yield update.message
toolResults.push(...normalizeMessagesForAPI([update.message], tools))
}
if (update.newContext) {
updatedToolUseContext = { ...update.newContext, queryTracking }
}
}
循环继续/终止 (src/query.ts:1678-1728)
每次迭代末尾组装下一轮 State:
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
toolUseContext: toolUseContextWithQueryTracking,
autoCompactTracking: tracking,
turnCount: nextTurnCount,
maxOutputTokensRecoveryCount: 0,
hasAttemptedReactiveCompact: false,
pendingToolUseSummary: nextPendingToolUseSummary,
maxOutputTokensOverride: undefined,
stopHookActive,
transition: { reason: 'next_turn' },
}
state = next // while(true) 继续
终止条件检查在循环前部(无工具调用时):
- Stop hooks (
src/query/stopHooks.ts:65-472):用户定义的停止钩子,可阻止继续 - Token budget (
src/query/tokenBudget.ts:45-93):检测 token 使用量是否达到预算阈值 - maxTurns (
src/query.ts:1705-1712):达到最大轮次限制 - Abort (
src/query.ts:1015-1051):用户中断
重试与错误处理
withRetry (src/services/api/withRetry.ts:170+)
重试生成器封装所有 API 调用:
export async function* withRetry<T>(
getClient: () => Promise<Anthropic>,
operation: (client, attempt, context) => Promise<T>,
options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T>
重试策略:
| 错误类型 | 处理 |
|---|---|
| 429 / 529 (过载) | 等待 retry-after 或指数退避;连续 3 次 529 触发模型 fallback |
| 401 (认证失败) | 刷新 OAuth token,重建 client |
| 403 (token 吊销) | 同 401 |
| ECONNRESET / EPIPE | 禁用 keep-alive,重建连接 |
| Fast mode 429/529 | 短等待重试(<2s)或进入 fast mode cooldown |
| 其他 | 通用重试逻辑 |
模型 Fallback (src/services/api/withRetry.ts:327-349)
当连续 529 过载错误达到阈值时触发:
if (consecutive529Errors >= MAX_529_RETRIES) {
if (options.fallbackModel) {
throw new FallbackTriggeredError(options.model, options.fallbackModel)
}
}
FallbackTriggeredError 在 query.ts:894-951 中被捕获,切换到 fallbackModel 后重试整个请求:
if (innerError instanceof FallbackTriggeredError && fallbackModel) {
currentModel = fallbackModel
attemptWithFallback = true
// 清空 assistant messages, 重新发起请求
assistantMessages.length = 0
toolResults.length = 0
toolUseBlocks.length = 0
}
输出 Token 截断恢复 (src/query.ts:1188-1256)
当 stop_reason === 'max_tokens' 时,进行升级重试:
- 首次:若启用
tengu_otk_slot_v1,升级到ESCALATED_MAX_TOKENS(64k)重试 - 后续:注入
isMeta用户消息,要求模型从截断处继续,最多 3 次 - 耗尽:返回错误
Prompt Too Long 恢复 (src/query.ts:1070-1183)
413 错误的三级恢复策略:
Token/成本管理
cost-tracker.ts (src/cost-tracker.ts)
全局成本追踪模块,管理以下状态(存储在 bootstrap/state 中):
type StoredCostState = {
totalCostUSD: number // 总 USD 费用
totalAPIDuration: number // 总 API 耗时
totalAPIDurationWithoutRetries: number // 不含重试的 API 耗时
totalToolDuration: number // 总工具执行耗时
totalLinesAdded: number // 总添加行数
totalLinesRemoved: number // 总删除行数
lastDuration: number | undefined // 最近一次耗时
modelUsage: { [modelName: string]: ModelUsage } // 按模型分类的用量
}
Usage 累积 (src/services/api/claude.ts:2924-3025)
updateUsage:流式事件中逐块更新单消息的 token 用量。注意 input_tokens 仅在 >0 时更新(message_start 设置初始值,后续 delta 不覆盖)。
accumulateUsage:将单消息用量累积到总用量。逐字段加和 input_tokens、output_tokens、cache_read_input_tokens、cache_creation_input_tokens 等。
// src/services/api/claude.ts:2993-3005
export function accumulateUsage(totalUsage, messageUsage): NonNullableUsage {
return {
input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
cache_creation_input_tokens: totalUsage.cache_creation_input_tokens + messageUsage.cache_creation_input_tokens,
cache_read_input_tokens: totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
// ...
}
}
成本计算 (src/services/api/claude.ts:2251-2256)
在 message_delta 事件中实时计算成本:
const costUSDForPart = calculateUSDCost(resolvedModel, usage)
costUSD += addToTotalSessionCost(costUSDForPart, usage, options.model)
Token Budget (src/query/tokenBudget.ts)
独立于 USD budget 的 token budget 机制,控制单次 agentic turn 的 token 消耗:
// src/query/tokenBudget.ts:45-93
export function checkTokenBudget(tracker, agentId, budget, globalTurnTokens) {
// 90% 阈值:未达到则注入 nudge 消息鼓励继续
// diminishing returns:连续 3 次且增量 <500 token 则提前停止
// 返回 continue 或 stop 决策
}
会话持久化 (src/cost-tracker.ts:87-175)
成本状态跨会话持久化:
getStoredSessionCosts():从项目配置读取上次会话成本restoreCostStateForSession():恢复会话时加载历史成本saveCurrentSessionCosts():切换会话前保存当前成本
上下文管理
getSystemContext (src/context.ts:116-150)
系统级上下文,全会话缓存(memoize):
export const getSystemContext = memoize(async () => {
return {
gitStatus, // git 分支、状态、最近提交
cacheBreaker, // 缓存破坏标记(ant-only)
}
})
getUserContext (src/context.ts:155-189)
用户级上下文,全会话缓存:
export const getUserContext = memoize(async () => {
return {
claudeMd, // CLAUDE.md 文件内容
currentDate, // 当前日期
}
})
上下文组装流程 (src/QueryEngine.ts:286-325)
// 1. 获取系统提示词各部分
const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
tools, mainLoopModel, additionalWorkingDirectories, mcpClients, customSystemPrompt
})
// 2. 组装最终系统提示词
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
])
上下文压缩
在 query() 每次迭代前执行多级压缩(src/query.ts:365-447):
- Tool result budget:限制单条工具结果大小,超限内容替换为摘要
- Snip:截断历史尾部,释放 token
- Microcompact:压缩冗余工具调用(如多轮 Read 同一文件)
- Context Collapse:折叠历史段为摘要
- Auto Compact:当 token 超限时触发全量对话压缩
消息格式与会话模型
Message 类型
type Message =
| UserMessage // { type: 'user', message: { role: 'user', content }, uuid, timestamp }
| AssistantMessage // { type: 'assistant', message: { role: 'assistant', content, usage, stop_reason }, uuid }
| SystemMessage // { type: 'system', subtype: '...', content }
| StreamEvent // { type: 'stream_event', event: RawStreamEvent }
| AttachmentMessage // { type: 'attachment', attachment: {...} }
| ProgressMessage // { type: 'progress', toolUseID, data }
| TombstoneMessage // { type: 'tombstone', message: Message } — UI 删除标记
| ToolUseSummaryMessage // { type: 'tool_use_summary', summary, precedingToolUseIds }
会话模型
一个对话是 Message 数组的线性序列:
UserMessage → [meta messages] → AssistantMessage(s) [含 tool_use blocks] →
UserMessage(tool_results) → AssistantMessage(s) → ...
QueryEngine 维护 mutableMessages 数组作为会话的唯一真相来源。每次迭代:
- 追加 assistant messages 和 tool results
- 追加 attachments(memory、file changes、commands)
- 通过
recordTranscript()持久化到本地存储
关键代码路径
| 位置 | 功能 |
|---|---|
src/QueryEngine.ts:184 | QueryEngine 类定义 |
src/QueryEngine.ts:209 | submitMessage() 入口 |
src/QueryEngine.ts:675 | 进入 query() 循环 |
src/QueryEngine.ts:757 | 消息类型分发 switch |
src/QueryEngine.ts:972 | USD budget 超限检查 |
src/query.ts:219 | query() 入口 |
src/query.ts:241 | queryLoop() agentic 循环 |
src/query.ts:307 | while(true) 主循环 |
src/query.ts:365-447 | 上下文压缩流水线 |
src/query.ts:654-708 | 调用 queryModelWithStreaming |
src/query.ts:829-844 | 收集 tool_use blocks |
src/query.ts:1062 | 无工具调用时的停止处理 |
src/query.ts:1085-1183 | Prompt Too Long 三级恢复 |
src/query.ts:1188-1256 | max_output_tokens 升级恢复 |
src/query.ts:1267 | handleStopHooks |
src/query.ts:1380-1408 | 工具执行(streaming 或 runTools) |
src/query.ts:1580 | 收集 attachments |
src/query.ts:1714 | 继续循环:组装下一轮 State |
src/services/api/claude.ts:752 | queryModelWithStreaming 入口 |
src/services/api/claude.ts:1017 | queryModel API 请求构造 |
src/services/api/claude.ts:1538 | paramsFromContext 请求参数组装 |
src/services/api/claude.ts:1778 | withRetry 包装 |
src/services/api/claude.ts:1822 | anthropic.beta.messages.create 调用 |
src/services/api/claude.ts:1940 | for await (const part of stream) 流解析 |
src/services/api/claude.ts:2404 | 流式错误处理 + 非流式降级 |
src/services/api/withRetry.ts:170 | withRetry 重试生成器 |
src/services/tools/toolOrchestration.ts:19 | runTools 工具编排 |
src/query/stopHooks.ts:65 | handleStopHooks 停止钩子 |
src/query/tokenBudget.ts:45 | checkTokenBudget token 预算检查 |
src/query/config.ts:29 | buildQueryConfig 配置快照 |
src/query/deps.ts:33 | productionDeps 依赖注入工厂 |
src/cost-tracker.ts:49 | 成本/用量导出 |
src/context.ts:116 | getSystemContext 系统上下文 |
src/context.ts:155 | getUserContext 用户上下文 |
依赖注入与可测试性
QueryDeps (src/query/deps.ts:21-31)
query.ts 的 I/O 依赖通过 deps 参数注入,方便测试 mock:
export type QueryDeps = {
callModel: typeof queryModelWithStreaming // API 调用
microcompact: typeof microcompactMessages // 微压缩
autocompact: typeof autoCompactIfNeeded // 自动压缩
uuid: () => string // UUID 生成
}
生产环境使用 productionDeps() 返回真实实现,测试可以注入 fakes。
QueryConfig (src/query/config.ts:15-27)
不可变配置快照,查询开始时捕获一次:
export type QueryConfig = {
sessionId: SessionId
gates: {
streamingToolExecution: boolean // 流式工具执行
emitToolUseSummaries: boolean // 工具使用摘要
isAnt: boolean // ant 用户
fastModeEnabled: boolean // 快速模式
}
}
Feature gates 故意不包含在此快照中——它们是 tree-shaking 边界,必须 inline 在各使用点以支持 dead code elimination。