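Chapter 5: The Query Engine
This chapter analyzes the implementation of the Claude Code query engine in detail: the message-processing flow, streaming-response handling, and cost tracking.
5.1 QueryEngine.ts: Core Code Walkthrough
QueryEngine is a core component of Claude Code. It manages the full conversation lifecycle and the message-processing loop.
Class structure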
export class QueryEngine {
  private config: QueryEngineConfig
  private mutableMessages: Message[]
  private abortController: AbortController
  private permissionDenials: SDKPermissionDenial[]
  private totalUsage: NonNullableUsage
  private hasHandledOrphanedPermission = false
  private readFileState: FileStateCache
  private discoveredSkillNames = new Set<string>()
  private loadedNestedMemoryPaths = new Set<string>()
  constructor(config: QueryEngineConfig) {
    this.config = config
    this.mutableMessages = config.initialMessages ?? []
    this.abortController = config.abortController ?? createAbortController()
    this.permissionDenials = []
    this.readFileState = config.readFileCache
    this.totalUsage = EMPTY_USAGE
  }
}
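QueryEngine follows a "one QueryEngine per conversation" design principle. Each conversation instance manages its own state, so the message history, file cache, and usage statistics persist correctly across turns.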
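To illustrate the one-engine-per-conversation principle, here is a minimal sketch. `MiniEngine` and all of its members are hypothetical stand-ins, not the actual `QueryEngine`; the only point is that each instance owns its history and usage totals, so two conversations never share state.

```typescript
// Hypothetical, simplified stand-in for QueryEngine (not the real class).
type Msg = { role: 'user' | 'assistant'; text: string }

class MiniEngine {
  // Per-instance state: persists across the turns of ONE conversation.
  private messages: Msg[] = []
  private totalOutputTokens = 0

  // Record one turn; the reply is faked so the example stays self-contained.
  submit(prompt: string): string {
    this.messages.push({ role: 'user', text: prompt })
    const reply = `echo: ${prompt}`
    this.messages.push({ role: 'assistant', text: reply })
    this.totalOutputTokens += reply.length // fake token count
    return reply
  }

  get historyLength(): number {
    return this.messages.length
  }

  get outputTokens(): number {
    return this.totalOutputTokens
  }
}

const conversationA = new MiniEngine()
const conversationB = new MiniEngine()
conversationA.submit('hello')
conversationA.submit('again')
conversationB.submit('hi')
console.log(conversationA.historyLength, conversationB.historyLength) // 4 2
```

Because the state lives on the instance rather than in module-level variables, starting a new conversation is just a matter of constructing a new engine.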
Configuration parameters
export type QueryEngineConfig = {
  cwd: string // working directory
  tools: Tools // available tools
  commands: Command[] // slash commands
  mcpClients: MCPServerConnection[] // MCP server connections
  agents: AgentDefinition[] // agent definitions
  canUseTool: CanUseToolFn // tool permission check
  getAppState: () => AppState // reads the app state
  setAppState: (f: (prev: AppState) => AppState) => void
  initialMessages?: Message[] // initial message list
  readFileCache: FileStateCache // file-read cache
  customSystemPrompt?: string // custom system prompt
  appendSystemPrompt?: string // text appended to the system prompt
  userSpecifiedModel?: string // model specified by the user
  thinkingConfig?: ThinkingConfig // thinking configuration
  maxTurns?: number // maximum number of turns
  maxBudgetUsd?: number // maximum budget in USD
  jsonSchema?: Record<string, unknown> // schema for structured output
}
Core method: submitMessage
submitMessage is the central method of QueryEngine. It is an async generator that yields messages as they are produced:
async *submitMessage(
  prompt: string | ContentBlockParam[],
  options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown> {
  // 1. Initialize per-turn state
  this.discoveredSkillNames.clear()
  setCwd(cwd)
  // 2. Wrap the permission check so that denials are recorded
  const wrappedCanUseTool = async (tool, input, context, ...) => {
    const result = await canUseTool(...)
    if (result.behavior !== 'allow') {
      this.permissionDenials.push({
        tool_name: sdkCompatToolName(tool.name),
        tool_use_id: toolUseID,
        tool_input: input,
      })
    }
    return result
  }
  // 3. Fetch the system prompt parts
  const { defaultSystemPrompt, userContext, systemContext } =
    await fetchSystemPromptParts({ tools, mainLoopModel, ... })
  // 4. Process the user input
  const { messagesFromUserInput, shouldQuery, ... } =
    await processUserInput({ input: prompt, mode: 'prompt', ... })
  // 5. Enter the query loop
  for await (const message of query({ messages, systemPrompt, ... })) {
    // Handle each message type: assistant, user, stream_event, attachment...
    yield* normalizeMessage(message)
  }
  // 6. Yield the final result
  yield { type: 'result', subtype: 'success', ... }
}
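The permission wrapper in step 2 is a plain decorator: it calls the inner check, records any non-allow decision, and returns the decision unchanged. The sketch below isolates that idea. The types `ToolPermission`, `CheckFn`, and `Denial`, the helper `withDenialTracking`, and the example policy are all assumptions made for illustration; the real `CanUseToolFn` takes more arguments.

```typescript
// Hypothetical types; the real CanUseToolFn has a richer signature.
type ToolPermission = { behavior: 'allow' | 'deny' | 'ask' }
type CheckFn = (toolName: string, input: unknown, toolUseId: string) => Promise<ToolPermission>
type Denial = { tool_name: string; tool_use_id: string; tool_input: unknown }

// Wrap a permission function so every non-"allow" decision is recorded.
function withDenialTracking(inner: CheckFn, log: Denial[]): CheckFn {
  return async (toolName, input, toolUseId) => {
    const result = await inner(toolName, input, toolUseId)
    if (result.behavior !== 'allow') {
      log.push({ tool_name: toolName, tool_use_id: toolUseId, tool_input: input })
    }
    return result // the caller still sees the original decision
  }
}

// Example policy (an assumption for the demo): only "Read" is allowed.
const readOnlyPolicy: CheckFn = async toolName => ({
  behavior: toolName === 'Read' ? 'allow' : 'deny',
})

const denialLog: Denial[] = []
const guardedCheck = withDenialTracking(readOnlyPolicy, denialLog)

async function runPermissionDemo(): Promise<Denial[]> {
  await guardedCheck('Read', { path: 'a.txt' }, 'tu_1')
  await guardedCheck('Bash', { command: 'ls' }, 'tu_2')
  return denialLog
}
```

After `runPermissionDemo()` resolves, the log holds a single entry for the `Bash` call, while the `Read` call passes through without being recorded.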
5.2 Message Processing Flow
Full message-processing sequence diagram
sequenceDiagram
participant User as User
participant REPL as REPL.tsx
participant QE as QueryEngine
participant PU as processUserInput
participant API as API Service
participant Query as query()
participant Tool as Tool System
User->>REPL: Enter prompt
REPL->>QE: submitMessage(prompt)
QE->>QE: Initialize state
QE->>PU: processUserInput()
PU->>PU: Parse slash commands
PU->>PU: Process attachments
PU-->>QE: Return message list
QE->>QE: Persist user messages
QE->>QE: Build ProcessUserInputContext
QE->>QE: Fetch system prompt
QE->>Query: query(messages, systemPrompt)
Query->>API: Send API request
API-->>Query: Streaming response
loop Message loop
Query->>Query: Parse content_block
alt tool_use block
Query->>Tool: runTools()
Tool->>Tool: Check permissions
Tool->>Tool: Execute tool
Tool-->>Query: tool_result
else text block
Query->>QE: yield assistant message
end
end
Query-->>QE: Done
QE->>QE: Compute token usage
QE-->>REPL: yield result
REPL->>User: Display result
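Message types
| Type | Description | Handling |
|---|---|---|
| assistant | AI response message | Appended to the message list and yielded to the caller |
| user | User message | Appended to the message list; turnCount++ |
| stream_event | Stream event | Updates usage statistics |
| attachment | Attachment message | Appended to the message list |
| progress | Progress message | Yielded in real time |
| system | System message | Handles compact_boundary |
| tombstone | Removal marker | Skipped |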
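The table maps naturally onto a `switch` over a discriminated union. The following sketch applies one handling rule per message type. The message shapes, `LoopState`, and `handleMessage` are hypothetical simplifications, and the `system` branch is only a placeholder counter because the chapter does not show what handling `compact_boundary` involves.

```typescript
// Hypothetical, trimmed-down message shapes; the real SDK types carry more fields.
type EngineMessage =
  | { type: 'assistant'; text: string }
  | { type: 'user'; text: string }
  | { type: 'stream_event'; outputTokens: number }
  | { type: 'attachment'; path: string }
  | { type: 'progress'; note: string }
  | { type: 'system'; subtype: 'compact_boundary' }
  | { type: 'tombstone' }

type LoopState = {
  messages: EngineMessage[]
  yielded: EngineMessage[]
  turnCount: number
  outputTokens: number
  compactBoundariesSeen: number
}

// Apply the handling rule from the table to a single message.
function handleMessage(state: LoopState, msg: EngineMessage): void {
  switch (msg.type) {
    case 'assistant':
      state.messages.push(msg)
      state.yielded.push(msg)
      break
    case 'user':
      state.messages.push(msg)
      state.turnCount++
      break
    case 'stream_event':
      state.outputTokens += msg.outputTokens
      break
    case 'attachment':
      state.messages.push(msg)
      break
    case 'progress':
      state.yielded.push(msg)
      break
    case 'system':
      state.compactBoundariesSeen++ // placeholder for the real boundary handling
      break
    case 'tombstone':
      break // skipped
  }
}

const loopState: LoopState = {
  messages: [], yielded: [], turnCount: 0, outputTokens: 0, compactBoundariesSeen: 0,
}
const incoming: EngineMessage[] = [
  { type: 'user', text: 'list files' },
  { type: 'stream_event', outputTokens: 12 },
  { type: 'assistant', text: 'Here they are.' },
  { type: 'tombstone' },
]
for (const m of incoming) handleMessage(loopState, m)
console.log(loopState.messages.length, loopState.turnCount, loopState.outputTokens) // 2 1 12
```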
Message flow state diagram
stateDiagram-v2
[*] --> UserInput: User input
UserInput --> ProcessInput: Process input
ProcessInput --> BuildContext: Build context
BuildContext --> QueryLoop: Enter query loop
QueryLoop --> APICall: Send API request
APICall --> StreamResponse: Streaming response
StreamResponse --> ParseBlock: Parse content_block
ParseBlock --> ToolUse: tool_use block
ParseBlock --> TextBlock: text block
ToolUse --> PermissionCheck: Check permissions
PermissionCheck --> Execute: Allowed
PermissionCheck --> Deny: Denied
Execute --> ToolResult: Return result
Deny --> ToolResult: Return error
ToolResult --> QueryLoop: Continue loop
TextBlock --> YieldMessage: yield message
YieldMessage --> QueryLoop: Continue loop
QueryLoop --> EndTurn: End condition met
EndTurn --> CalculateCost: Compute cost
CalculateCost --> [*]: Return result
5.3 Streaming Response Handling
SSE (Server-Sent Events) handling
Claude Code uses the streaming mode of the Anthropic Messages API and processes the response in real time as Server-Sent Events (SSE).
graph TD
subgraph "Streaming response"
API[API Request] --> STREAM[Stream Connection]
STREAM --> EVENT1[event: message_start]
EVENT1 --> EVENT2[event: content_block_start]
EVENT2 --> EVENT3[event: content_block_delta]
EVENT3 --> EVENT4[event: content_block_stop]
EVENT4 --> EVENT5[event: message_delta]
EVENT5 --> EVENT6[event: message_stop]
end
subgraph "Event handling"
EVENT1 --> USAGE[Initialize usage]
EVENT2 --> BLOCK[Start content block]
EVENT3 --> ACCUM[Accumulate content]
EVENT5 --> FINAL[Final usage]
end
Stream event type definitions
type StreamEvent =
  | { type: 'message_start'; message: { usage: Usage } }
  | { type: 'content_block_start'; index: number; content_block: ContentBlock }
  | { type: 'content_block_delta'; index: number; delta: Delta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string | null }; usage: Usage }
  | { type: 'message_stop' }
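As a rough illustration of how the content-block events fit together, the sketch below folds a start/delta/stop sequence into the finished text of each block. `SketchEvent`, `TextDelta`, and `assembleBlocks` are hypothetical names, and only `text_delta` payloads are handled; the event names themselves come from the union above.

```typescript
// Trimmed event shapes modeled on the StreamEvent union above;
// only text deltas are handled in this sketch.
type TextDelta = { type: 'text_delta'; text: string }
type SketchEvent =
  | { type: 'content_block_start'; index: number }
  | { type: 'content_block_delta'; index: number; delta: TextDelta }
  | { type: 'content_block_stop'; index: number }

// Fold a sequence of events into the finished text of each content block.
function assembleBlocks(events: SketchEvent[]): string[] {
  const inProgress = new Map<number, string>() // block index -> text so far
  const finished: string[] = []
  for (const ev of events) {
    switch (ev.type) {
      case 'content_block_start':
        inProgress.set(ev.index, '')
        break
      case 'content_block_delta':
        inProgress.set(ev.index, (inProgress.get(ev.index) ?? '') + ev.delta.text)
        break
      case 'content_block_stop':
        finished[ev.index] = inProgress.get(ev.index) ?? ''
        inProgress.delete(ev.index)
        break
    }
  }
  return finished
}

const assembled = assembleBlocks([
  { type: 'content_block_start', index: 0 },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hel' } },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'lo' } },
  { type: 'content_block_stop', index: 0 },
])
console.log(assembled) // [ 'Hello' ]
```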
Usage tracking
// Usage tracking inside QueryEngine
if (message.event.type === 'message_start') {
  // A new message starts: reset, then record the initial usage
  currentMessageUsage = EMPTY_USAGE
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.message.usage)
}
if (message.event.type === 'message_delta') {
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
  if (message.event.delta.stop_reason != null) {
    lastStopReason = message.event.delta.stop_reason
  }
}
if (message.event.type === 'message_stop') {
  // The message is complete: fold its usage into the conversation total
  this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage)
}
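The snippet relies on two helpers with different semantics. Within one message, the usage in `message_delta` is cumulative, so a newer value should replace the older one; across messages, totals are summed. The sketch below shows one plausible way to write both. `updateUsageSketch` and `accumulateUsageSketch` are hypothetical re-implementations for illustration, and the real `updateUsage` and `accumulateUsage` may differ in detail.

```typescript
// Hypothetical re-implementations for illustration; the real helpers may differ.
type UsageCounts = { input_tokens: number; output_tokens: number }

const ZERO_USAGE: UsageCounts = { input_tokens: 0, output_tokens: 0 }

// Prefer the newer value when it is present and non-zero.
const pickNewer = (newer: number | undefined, older: number): number =>
  newer !== undefined && newer > 0 ? newer : older

// Within ONE message: later events carry cumulative counts, so replace, don't add.
function updateUsageSketch(current: UsageCounts, next: Partial<UsageCounts>): UsageCounts {
  return {
    input_tokens: pickNewer(next.input_tokens, current.input_tokens),
    output_tokens: pickNewer(next.output_tokens, current.output_tokens),
  }
}

// ACROSS messages: per-message totals are summed into the session total.
function accumulateUsageSketch(total: UsageCounts, message: UsageCounts): UsageCounts {
  return {
    input_tokens: total.input_tokens + message.input_tokens,
    output_tokens: total.output_tokens + message.output_tokens,
  }
}

// message_start reports the input tokens and a first output count...
let perMessage = updateUsageSketch(ZERO_USAGE, { input_tokens: 25, output_tokens: 1 })
// ...message_delta later reports the cumulative output count.
perMessage = updateUsageSketch(perMessage, { output_tokens: 40 })
// message_stop: fold the finished message into the running total.
const sessionTotal = accumulateUsageSketch({ input_tokens: 100, output_tokens: 60 }, perMessage)
console.log(sessionTotal) // { input_tokens: 125, output_tokens: 100 }
```

Adding instead of replacing inside a single message would double-count output tokens, which is why the two operations are kept separate.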
Token cost tracking (cost-tracker.ts)
// Core functions of cost-tracker.ts
export function addToTotalSessionCost(
  cost: number,
  usage: Usage,
  model: string,
): number {
  const modelUsage = addToTotalModelUsage(cost, usage, model)
  addToTotalCostState(cost, modelUsage, model)
  // Update the counters
  getCostCounter()?.add(cost, { model })
  getTokenCounter()?.add(usage.input_tokens, { model, type: 'input' })
  getTokenCounter()?.add(usage.output_tokens, { model, type: 'output' })
  getTokenCounter()?.add(usage.cache_read_input_tokens ?? 0, { model, type: 'cacheRead' })
  getTokenCounter()?.add(usage.cache_creation_input_tokens ?? 0, { model, type: 'cacheCreation' })
  // Note: totalCost is not declared in this excerpt; it presumably comes from code elided here
  return totalCost
}
// Per-model usage statistics
function addToTotalModelUsage(cost: number, usage: Usage, model: string): ModelUsage {
  const modelUsage = getUsageForModel(model) ?? {
    inputTokens: 0,
    outputTokens: 0,
    cacheReadInputTokens: 0,
    cacheCreationInputTokens: 0,
    webSearchRequests: 0,
    costUSD: 0,
    contextWindow: 0,
    maxOutputTokens: 0,
  }
  modelUsage.inputTokens += usage.input_tokens
  modelUsage.outputTokens += usage.output_tokens
  modelUsage.cacheReadInputTokens += usage.cache_read_input_tokens ?? 0
  modelUsage.cacheCreationInputTokens += usage.cache_creation_input_tokens ?? 0
  modelUsage.webSearchRequests += usage.server_tool_use?.web_search_requests ?? 0
  modelUsage.costUSD += cost
  return modelUsage
}
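Total cost = input-token cost + output-token cost + cache-read cost + cache-creation cost + web-search cost. Cache reads are billed at roughly 10% of the base input price, while cache creation (cache writes) is billed at roughly 125% of it, that is, a 25% premium over ordinary input tokens.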
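A small worked example makes the formula concrete. The sketch assumes the published Anthropic multipliers for prompt caching (0.1x the base input price for cache reads, 1.25x for 5-minute cache writes). The prices per million tokens, the type names, and `estimateCostUSD` are illustrative assumptions, and web-search cost is left out for brevity.

```typescript
// Illustrative prices in USD per million tokens. These numbers are assumptions
// for the example, not a statement of current pricing for any model.
type PriceTable = { inputPerMTok: number; outputPerMTok: number }
type BilledUsage = {
  input_tokens: number
  output_tokens: number
  cache_read_input_tokens?: number
  cache_creation_input_tokens?: number
}

const CACHE_READ_MULTIPLIER = 0.1 // cache reads: 10% of the input price
const CACHE_WRITE_MULTIPLIER = 1.25 // cache writes: 125% of the input price

// Sum the four token-based cost components (web search is omitted here).
function estimateCostUSD(usage: BilledUsage, price: PriceTable): number {
  const perToken = (perMTok: number): number => perMTok / 1_000_000
  const inputPrice = perToken(price.inputPerMTok)
  return (
    usage.input_tokens * inputPrice +
    usage.output_tokens * perToken(price.outputPerMTok) +
    (usage.cache_read_input_tokens ?? 0) * inputPrice * CACHE_READ_MULTIPLIER +
    (usage.cache_creation_input_tokens ?? 0) * inputPrice * CACHE_WRITE_MULTIPLIER
  )
}

const estimatedCost = estimateCostUSD(
  {
    input_tokens: 1000,
    output_tokens: 500,
    cache_read_input_tokens: 10000,
    cache_creation_input_tokens: 2000,
  },
  { inputPerMTok: 3, outputPerMTok: 15 },
)
// 0.003 (input) + 0.0075 (output) + 0.003 (cache read) + 0.0075 (cache write)
console.log(estimatedCost.toFixed(4)) // 0.0210
```

Note how 10,000 cached-read tokens cost the same as 1,000 ordinary input tokens, which is where the savings from prompt caching come from.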
Cost tracking flowchart
flowchart LR
subgraph "API response"
RESP[API Response] --> USAGE[Usage data]
end
subgraph "Cost calculation"
USAGE --> CALC[calculateUSDCost]
CALC --> COST[Cost amount]
end
subgraph "State update"
COST --> STATE[addToTotalCostState]
STATE --> COUNTER[Counter update]
COUNTER --> CONFIG[Save to Config]
end
subgraph "Display"
STATE --> FORMAT[formatTotalCost]
FORMAT --> DISPLAY[Terminal output]
end
5.4 query.ts Helper Functions
query.ts contains the core implementation of the query loop. It handles tool calls, context compaction, and error recovery.
Query loop state
type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined
}
Structure of the main query function
export async function* query(params: QueryParams): AsyncGenerator<...> {
  let state: State = {
    messages: params.messages,
    toolUseContext: params.toolUseContext,
    maxOutputTokensOverride: params.maxOutputTokensOverride,
    autoCompactTracking: undefined,
    turnCount: 1,
    // ...
  }
  // Main loop
  while (true) {
    yield { type: 'stream_request_start' }
    // 1. Context preparation
    let messagesForQuery = [...getMessagesAfterCompactBoundary(messages)]
    messagesForQuery = await applyToolResultBudget(messagesForQuery, ...)
    // 2. Snip
    if (feature('HISTORY_SNIP')) {
      const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
      messagesForQuery = snipResult.messages
    }
    // 3. Microcompact
    const microcompactResult = await deps.microcompact(messagesForQuery, ...)
    // 4. Autocompact
    const { compactionResult } = await deps.autocompact(messagesForQuery, ...)
    // 5. API call
    for await (const message of deps.callModel({ messages, systemPrompt, ... })) {
      // Handle streamed messages
      if (message.type === 'assistant') {
        assistantMessages.push(message)
        // Detect tool_use blocks
        const toolUseBlocks = message.message.content.filter(c => c.type === 'tool_use')
        if (toolUseBlocks.length > 0) {
          needsFollowUp = true
        }
      }
    }
    // 6. Tool execution
    if (needsFollowUp) {
      const toolUpdates = streamingToolExecutor?.getRemainingResults()
        ?? runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
      for await (const update of toolUpdates) {
        if (update.message) yield update.message
      }
    }
    // 7. Continue the loop or finish
    if (!needsFollowUp) {
      return { reason: 'completed' }
    }
    state = { messages: [...messagesForQuery, ...assistantMessages, ...toolResults], ... }
  }
}
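Stripped of compaction and streaming, the loop above reduces to: call the model, run any requested tool, append the result, and repeat until the model answers without a tool call. The sketch below shows only that skeleton. `miniQuery`, `ModelTurn`, the scripted `fakeModel`, and `fakeTool` are all hypothetical; the real `query()` handles many more cases.

```typescript
// Hypothetical shapes: a "model" that returns either text or a tool request.
type ModelTurn =
  | { kind: 'text'; text: string }
  | { kind: 'tool_use'; tool: string; input: string }
type Transcript = string[]
type ModelFn = (transcript: Transcript) => Promise<ModelTurn>
type ToolFn = (tool: string, input: string) => Promise<string>

// Loop: call the model, run any requested tool, feed the result back, repeat.
async function* miniQuery(
  start: Transcript,
  callModel: ModelFn,
  runTool: ToolFn,
  maxTurns = 10,
): AsyncGenerator<string, { reason: string }, unknown> {
  let transcript = [...start]
  for (let turn = 1; turn <= maxTurns; turn++) {
    const reply = await callModel(transcript)
    if (reply.kind === 'text') {
      yield reply.text
      return { reason: 'completed' } // no tool request: the loop ends
    }
    const result = await runTool(reply.tool, reply.input)
    yield `tool_result(${reply.tool}): ${result}`
    // Rebuild the transcript for the next iteration, as the real loop rebuilds state.
    transcript = [...transcript, `tool_use:${reply.tool}`, `tool_result:${result}`]
  }
  return { reason: 'max_turns' }
}

// Scripted fake model: asks for one tool call, then answers.
const fakeModel: ModelFn = async transcript =>
  transcript.some(line => line.startsWith('tool_result:'))
    ? { kind: 'text', text: 'done' }
    : { kind: 'tool_use', tool: 'ls', input: '.' }
const fakeTool: ToolFn = async () => 'a.txt'

// Drain the generator into an array so the yielded items can be inspected.
async function collectQueryOutput(): Promise<string[]> {
  const out: string[] = []
  for await (const item of miniQuery(['user: list files'], fakeModel, fakeTool)) {
    out.push(item)
  }
  return out
}
```

Calling `collectQueryOutput()` yields the tool result first and the final text second, mirroring the two iterations of the loop.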
Error recovery
graph TD
subgraph "Error types"
E1[Prompt Too Long 413]
E2[Max Output Tokens]
E3[Image Size Error]
E4[API Rate Limit]
end
subgraph "Recovery strategies"
R1[Reactive Compact]
R2[Token Escalation]
R3[Media Strip Retry]
R4[Wait and Retry]
end
subgraph "Outcome"
S1[Recovered]
S2[Failed, error returned]
end
E1 --> R1 --> S1
E1 --> R1 --> S2
E2 --> R2 --> S1
E3 --> R3 --> S1
E4 --> R4 --> S1
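The diagram is essentially a lookup from error type to recovery strategy. The sketch below encodes it as a table and adds a try-once guard, loosely inspired by the `hasAttemptedReactiveCompact` flag in the loop state. The string names, `RECOVERY_FOR`, and `planRecovery` are hypothetical, and whether the real code applies such a guard to every strategy is an assumption.

```typescript
// Error kinds and strategy names mirror the diagram; the lookup itself is a
// hypothetical illustration, not code from query.ts.
type ErrorKind = 'prompt_too_long' | 'max_output_tokens' | 'image_size' | 'rate_limit'
type Recovery = 'reactive_compact' | 'token_escalation' | 'media_strip_retry' | 'wait_and_retry'

const RECOVERY_FOR: Record<ErrorKind, Recovery> = {
  prompt_too_long: 'reactive_compact',
  max_output_tokens: 'token_escalation',
  image_size: 'media_strip_retry',
  rate_limit: 'wait_and_retry',
}

// Return the mapped strategy the first time; report failure if it was already tried.
function planRecovery(kind: ErrorKind, alreadyTried: Set<Recovery>): Recovery | 'fail' {
  const strategy = RECOVERY_FOR[kind]
  if (alreadyTried.has(strategy)) return 'fail'
  alreadyTried.add(strategy)
  return strategy
}

const triedStrategies = new Set<Recovery>()
const firstAttempt = planRecovery('prompt_too_long', triedStrategies)
const secondAttempt = planRecovery('prompt_too_long', triedStrategies)
console.log(firstAttempt, secondAttempt) // reactive_compact fail
```

The guard is what prevents an endless compact-and-retry cycle when compaction alone cannot bring the prompt under the limit.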
Context compaction triggers
| Trigger | Compaction method | Description |
|---|---|---|
| Token count exceeds the threshold | Autocompact | Automatically calls the summarization API |
| Snip marker | Snip Compact | Removes content marked as redundant |
| Per request | Microcompact | Cache-editing optimization |
| 413 error | Reactive Compact | Emergency compaction for recovery |
flowchart TB
subgraph "Compaction check order"
CHECK[Check token count] --> SNIP{Snip needed?}
SNIP --> |"Yes"| SNIP_EXEC[Run Snip]
SNIP --> |"No"| MICRO{Microcompact needed?}
SNIP_EXEC --> MICRO
MICRO --> |"Yes"| MICRO_EXEC[Run Microcompact]
MICRO --> |"No"| AUTO{Autocompact needed?}
MICRO_EXEC --> AUTO
AUTO --> |"Yes"| AUTO_EXEC[Run Autocompact]
AUTO --> |"No"| API[Send API request]
AUTO_EXEC --> API
end
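The check order in the flowchart can be read as a pipeline in which each stage runs only if needed and hands its output to the next stage. The sketch below captures just that ordering. The `Stage` type, `runCompactionPipeline`, and the three toy stages with their made-up thresholds are assumptions for illustration and do not reflect what the real Snip, Microcompact, or Autocompact steps do internally.

```typescript
// Hypothetical stage signature: each stage may shrink the message list.
type Stage = {
  name: string
  needed: (msgs: string[]) => boolean
  run: (msgs: string[]) => string[]
}

// Run the stages in order; each one sees the output of the previous stage.
function runCompactionPipeline(
  msgs: string[],
  pipeline: Stage[],
): { messages: string[]; applied: string[] } {
  const applied: string[] = []
  let current = msgs
  for (const stage of pipeline) {
    if (stage.needed(current)) {
      current = stage.run(current)
      applied.push(stage.name)
    }
  }
  return { messages: current, applied }
}

// Toy stages in the order shown in the flowchart. All thresholds are made up.
const toyStages: Stage[] = [
  {
    name: 'snip',
    needed: m => m.some(x => x.startsWith('[snip]')),
    run: m => m.filter(x => !x.startsWith('[snip]')),
  },
  {
    name: 'microcompact',
    needed: m => m.some(x => x.length > 20),
    run: m => m.map(x => x.slice(0, 20)),
  },
  {
    name: 'autocompact',
    needed: m => m.length > 3,
    run: m => ['<summary>', ...m.slice(-2)],
  },
]

const compacted = runCompactionPipeline(
  ['[snip] old output', 'short', 'a very long tool result that goes on', 'x', 'y'],
  toyStages,
)
console.log(compacted.applied) // [ 'snip', 'microcompact', 'autocompact' ]
```

Running the cheap stages first means the expensive summarization step only fires when the lighter ones have not freed enough room.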
The next chapter examines the implementation of the tool system in detail.