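Chapter 5: Query Engine

This chapter examines how Claude Code's query engine is implemented. It covers the message processing flow, streaming response handling, and cost tracking.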

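5.1 QueryEngine.ts: Core Code Walkthrough

QueryEngine is the core component of Claude Code. It manages the entire conversation lifecycle and the message processing loop.

Class structure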

export class QueryEngine {
  private config: QueryEngineConfig
  private mutableMessages: Message[]               // conversation history, grows across turns
  private abortController: AbortController         // used to cancel in-flight work
  private permissionDenials: SDKPermissionDenial[] // tool calls the permission check refused
  private totalUsage: NonNullableUsage             // token usage accumulated over the conversation
  private hasHandledOrphanedPermission = false
  private readFileState: FileStateCache            // cache of files already read
  private discoveredSkillNames = new Set<string>() // cleared at the start of each submitMessage
  private loadedNestedMemoryPaths = new Set<string>()

  constructor(config: QueryEngineConfig) {
    this.config = config
    this.mutableMessages = config.initialMessages ?? []
    this.abortController = config.abortController ?? createAbortController()
    this.permissionDenials = []
    this.readFileState = config.readFileCache
    this.totalUsage = EMPTY_USAGE
  }
}

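Design principle

QueryEngine follows a "one QueryEngine per conversation" principle. Each conversation instance manages its own state, so the message history, file cache, usage statistics, and related data persist correctly across turns.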
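The isolation this principle buys can be seen in a few lines. Below is a minimal sketch built on a hypothetical `MiniEngine` class, which is not the real QueryEngine API. It keeps only a message list and a crude usage counter. Each instance owns its own state, so two conversations never see each other's history.

```typescript
// Hypothetical, simplified stand-in for QueryEngine: one instance per conversation.
type ChatMsg = { role: "user" | "assistant"; text: string };

class MiniEngine {
  // Per-instance state, never shared between conversations.
  private readonly messages: ChatMsg[] = [];
  private outputWords = 0; // crude stand-in for token usage

  submit(prompt: string): ChatMsg {
    this.messages.push({ role: "user", text: prompt });
    // Stand-in for a model call: echo the prompt back.
    const reply: ChatMsg = { role: "assistant", text: `echo: ${prompt}` };
    this.messages.push(reply);
    this.outputWords += reply.text.split(" ").length;
    return reply;
  }

  historyLength(): number {
    return this.messages.length;
  }

  usage(): number {
    return this.outputWords;
  }
}

const first = new MiniEngine();
const second = new MiniEngine();
first.submit("hello");
first.submit("again");
second.submit("hi");
console.log(first.historyLength(), second.historyLength()); // 4 2
```

Because the history lives on the instance rather than in module-level variables, a host process can run several conversations side by side without them interfering.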

Configuration parameters

export type QueryEngineConfig = {
  cwd: string                        // working directory
  tools: Tools                       // available tools
  commands: Command[]                // slash commands
  mcpClients: MCPServerConnection[]  // MCP server connections
  agents: AgentDefinition[]          // agent definitions
  canUseTool: CanUseToolFn           // tool permission check
  getAppState: () => AppState        // reads the app state
  setAppState: (f: (prev: AppState) => AppState) => void // updates the app state from the previous state
  initialMessages?: Message[]        // initial messages
  readFileCache: FileStateCache      // file read cache
  customSystemPrompt?: string        // custom system prompt
  appendSystemPrompt?: string        // text appended to the system prompt
  userSpecifiedModel?: string        // model chosen by the user
  thinkingConfig?: ThinkingConfig    // thinking configuration
  maxTurns?: number                  // maximum number of turns
  maxBudgetUsd?: number              // maximum budget in USD
  jsonSchema?: Record<string, unknown> // schema for structured output
}

Core method: submitMessage

submitMessage is QueryEngine's core method. It is written as an async generator, so it yields messages to the caller as they are produced:

async *submitMessage(
  prompt: string | ContentBlockParam[],
  options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown> {
  // (cwd, canUseTool, tools, etc. come from this.config; that destructuring is elided)

  // 1. Initialize per-call state
  this.discoveredSkillNames.clear()
  setCwd(cwd)

  // 2. Wrap the permission check so that every denial is recorded
  const wrappedCanUseTool = async (tool, input, context, ...) => {
    const result = await canUseTool(...)
    if (result.behavior !== 'allow') {
      this.permissionDenials.push({
        tool_name: sdkCompatToolName(tool.name),
        tool_use_id: toolUseID,
        tool_input: input,
      })
    }
    return result
  }

  // 3. Fetch the system prompt parts
  const { defaultSystemPrompt, userContext, systemContext } =
    await fetchSystemPromptParts({ tools, mainLoopModel, ... })

  // 4. Process the user input (slash commands, attachments)
  const { messagesFromUserInput, shouldQuery, ... } =
    await processUserInput({ input: prompt, mode: 'prompt', ... })

  // 5. Enter the query loop
  for await (const message of query({ messages, systemPrompt, ... })) {
    // Handle every message type: assistant, user, stream_event, attachment, ...
    yield* normalizeMessage(message)
  }

  // 6. Emit the final result
  yield { type: 'result', subtype: 'success', ... }
}
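The permission wrapper in step 2 is a decorator. It delegates to the original check and records every decision that is not `allow`. Below is a minimal sketch that assumes hypothetical, simplified types: a `PermissionResult` with only a `behavior` field, and a check function taking just a tool name, an input, and a tool-use id. The real `CanUseToolFn` takes more parameters.

```typescript
// Hypothetical, simplified types; the real CanUseToolFn takes more parameters.
type PermissionResult = { behavior: "allow" | "deny" | "ask" };
type CheckFn = (toolName: string, input: unknown, toolUseId: string) => Promise<PermissionResult>;
type Denial = { tool_name: string; tool_use_id: string; tool_input: unknown };

// Decorator: same signature as the wrapped check, plus a side effect on `denials`.
function recordDenials(inner: CheckFn, denials: Denial[]): CheckFn {
  return async (toolName, input, toolUseId) => {
    const result = await inner(toolName, input, toolUseId);
    if (result.behavior !== "allow") {
      denials.push({ tool_name: toolName, tool_use_id: toolUseId, tool_input: input });
    }
    return result; // the caller still sees the original decision
  };
}

// Example policy (hypothetical): allow "Read", refuse everything else.
const readOnlyPolicy: CheckFn = async (toolName): Promise<PermissionResult> => ({
  behavior: toolName === "Read" ? "allow" : "deny",
});

const recorded: Denial[] = [];
const guardedCheck = recordDenials(readOnlyPolicy, recorded);
```

Calling `guardedCheck("Bash", ...)` returns the same `deny` the policy produced and also leaves one entry in `recorded`. This is how a wrapper can gather denials for the final result without changing tool-execution behavior.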

5.2 Message Processing Flow

End-to-end message processing sequence diagram

sequenceDiagram
    participant User
    participant REPL as REPL.tsx
    participant QE as QueryEngine
    participant PU as processUserInput
    participant API as API Service
    participant Query as query()
    participant Tool as Tool System

    User->>REPL: Enter prompt
    REPL->>QE: submitMessage(prompt)
    QE->>QE: Initialize state
    QE->>PU: processUserInput()
    PU->>PU: Parse slash commands
    PU->>PU: Process attachments
    PU-->>QE: Return message list

    QE->>QE: Persist user messages
    QE->>QE: Build ProcessUserInputContext
    QE->>QE: Fetch system prompt

    QE->>Query: query(messages, systemPrompt)
    Query->>API: Send API request
    API-->>Query: Streaming response

    loop Message loop
        Query->>Query: Parse content_block
        alt tool_use block
            Query->>Tool: runTools()
            Tool->>Tool: Permission check
            Tool->>Tool: Execute tool
            Tool-->>Query: tool_result
        else text block
            Query->>QE: yield assistant message
        end
    end

    Query-->>QE: Done
    QE->>QE: Compute token usage
    QE-->>REPL: yield result
    REPL->>User: Display result

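Message types

| Type | Description | Handling |
|---|---|---|
| assistant | AI response message | Appended to the message list and yielded to the caller |
| user | User message | Appended to the message list; turnCount++ |
| stream_event | Stream event | Updates usage statistics |
| attachment | Attachment message | Appended to the message list |
| progress | Progress message | Yielded immediately |
| system | System message | Handles compact_boundary |
| tombstone | Removal marker | Skipped |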
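The handling rules in this table amount to a `switch` over the message `type`. The sketch below assumes a hypothetical, heavily simplified `EngineMsg` union; the real message types carry many more fields. It implements only the bookkeeping the table describes. The `compact_boundary` branch is an assumption, namely that history restarts at the boundary, because the table does not say exactly what the handling involves.

```typescript
// Hypothetical, simplified message union mirroring the message-type table.
type EngineMsg =
  | { type: "assistant"; text: string }
  | { type: "user"; text: string }
  | { type: "stream_event"; outputTokens: number }
  | { type: "attachment"; path: string }
  | { type: "progress"; note: string }
  | { type: "system"; subtype: "compact_boundary" | "informational" }
  | { type: "tombstone" };

type Bookkeeping = { messages: EngineMsg[]; turnCount: number; outputTokens: number };

// Applies the per-type rule and yields only what the caller should see.
function* dispatch(incoming: EngineMsg[], state: Bookkeeping): Generator<EngineMsg> {
  for (const m of incoming) {
    switch (m.type) {
      case "assistant":
        state.messages.push(m);
        yield m;
        break;
      case "user":
        state.messages.push(m);
        state.turnCount++;
        break;
      case "stream_event":
        state.outputTokens += m.outputTokens; // usage statistics only, not stored
        break;
      case "attachment":
        state.messages.push(m);
        break;
      case "progress":
        yield m; // surfaced immediately, not stored
        break;
      case "system":
        // Assumption: a compact boundary starts a fresh history at the boundary.
        if (m.subtype === "compact_boundary") state.messages = [m];
        break;
      case "tombstone":
        break; // skipped
    }
  }
}
```

Making `dispatch` a generator keeps the same shape as `submitMessage`. Bookkeeping happens as a side effect, and only caller-visible messages are yielded.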

Message flow state diagram

stateDiagram-v2
    [*] --> UserInput: user input
    UserInput --> ProcessInput: process input
    ProcessInput --> BuildContext: build context

    BuildContext --> QueryLoop: enter query loop
    QueryLoop --> APICall: send API request

    APICall --> StreamResponse: streaming response
    StreamResponse --> ParseBlock: parse content_block

    ParseBlock --> ToolUse: tool_use block
    ParseBlock --> TextBlock: text block

    ToolUse --> PermissionCheck: permission check
    PermissionCheck --> Execute: allowed
    PermissionCheck --> Deny: denied
    Execute --> ToolResult: return result
    Deny --> ToolResult: return error

    ToolResult --> QueryLoop: continue loop
    TextBlock --> YieldMessage: yield message
    YieldMessage --> QueryLoop: continue loop

    QueryLoop --> EndTurn: end condition
    EndTurn --> CalculateCost: calculate cost
    CalculateCost --> [*]: return result

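5.3 Streaming Response Handling

SSE (Server-Sent Events) handling

Claude Code uses the streaming mode of the Anthropic Messages API and processes the data in real time over SSE. Each response arrives as a fixed sequence of events. First comes message_start. Then each content block produces a content_block_start, a series of content_block_delta events, and a content_block_stop. The stream ends with message_delta and finally message_stop.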

graph TD
    subgraph "Streaming response"
        API[API Request] --> STREAM[Stream Connection]
        STREAM --> EVENT1[event: message_start]
        EVENT1 --> EVENT2[event: content_block_start]
        EVENT2 --> EVENT3[event: content_block_delta]
        EVENT3 --> EVENT4[event: content_block_stop]
        EVENT4 --> EVENT5[event: message_delta]
        EVENT5 --> EVENT6[event: message_stop]
    end

    subgraph "Event handling"
        EVENT1 --> USAGE[Initialize usage]
        EVENT2 --> BLOCK[Start content block]
        EVENT3 --> ACCUM[Accumulate content]
        EVENT5 --> FINAL[Final usage]
    end
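On the wire, each SSE event is a group of `field: value` lines, here `event:` and `data:`, terminated by a blank line. The JSON payload is carried in `data`. Below is a minimal parser sketch. It assumes the whole response body is already available as one string, whereas a real client parses incrementally as chunks arrive. It also ignores the `id:` and `retry:` fields. The function name `parseSse` is hypothetical.

```typescript
type SseEvent = { event: string; data: string };

// Parses a complete SSE body. Handles `event:`, multi-line `data:`, and comment
// lines; other fields (id, retry) are ignored in this sketch.
function parseSse(body: string): SseEvent[] {
  const events: SseEvent[] = [];
  let eventName = "message"; // SSE default event type
  let dataLines: string[] = [];

  for (const line of body.split(/\r\n|\r|\n/)) {
    if (line === "") {
      // A blank line dispatches the pending event, if it has any data.
      if (dataLines.length > 0) {
        events.push({ event: eventName, data: dataLines.join("\n") });
      }
      eventName = "message";
      dataLines = [];
      continue;
    }
    if (line.startsWith(":")) continue; // comment line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "event") eventName = value;
    else if (field === "data") dataLines.push(value);
  }
  return events;
}

const sampleBody =
  'event: message_start\ndata: {"type":"message_start"}\n\n' +
  'event: content_block_delta\ndata: {"type":"content_block_delta","index":0}\n\n';
const sampleTypes = parseSse(sampleBody).map((e) => (JSON.parse(e.data) as { type: string }).type);
console.log(sampleTypes.join(", ")); // message_start, content_block_delta
```

Splitting only on the first colon matters here, because the JSON payload itself contains colons.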

Stream event type definitions

type StreamEvent =
  | { type: 'message_start'; message: { usage: Usage } }
  | { type: 'content_block_start'; index: number; content_block: ContentBlock }
  | { type: 'content_block_delta'; index: number; delta: Delta }
  | { type: 'content_block_stop'; index: number }
  // stop_reason can be null, which is why the usage-tracking code checks it against null
  | { type: 'message_delta'; delta: { stop_reason: string | null }; usage: Usage }
  | { type: 'message_stop' }
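The `index` field ties each `content_block_delta` to the block opened by the matching `content_block_start`. The sketch below rebuilds the final text blocks from an event sequence. It assumes a simplified, hypothetical subset of the event shapes: only `text` blocks with `text_delta` deltas. Real streams also carry `tool_use` blocks, whose input arrives as `input_json_delta` fragments that are concatenated in the same way and then parsed as JSON.

```typescript
// Simplified subset of the streaming events (text blocks only).
type TextStreamEvent =
  | { type: "message_start" }
  | { type: "content_block_start"; index: number; content_block: { type: "text"; text: string } }
  | { type: "content_block_delta"; index: number; delta: { type: "text_delta"; text: string } }
  | { type: "content_block_stop"; index: number }
  | { type: "message_delta"; delta: { stop_reason: string | null } }
  | { type: "message_stop" };

type Assembled = { blocks: string[]; stopReason: string | null };

function assemble(events: TextStreamEvent[]): Assembled {
  const blocks: string[] = [];
  let stopReason: string | null = null;
  for (const ev of events) {
    switch (ev.type) {
      case "content_block_start":
        blocks[ev.index] = ev.content_block.text; // usually "" at start
        break;
      case "content_block_delta":
        blocks[ev.index] = (blocks[ev.index] ?? "") + ev.delta.text; // append fragment to its block
        break;
      case "message_delta":
        if (ev.delta.stop_reason != null) stopReason = ev.delta.stop_reason;
        break;
      default:
        break; // message_start, content_block_stop, message_stop: nothing to accumulate here
    }
  }
  return { blocks, stopReason };
}

const demo = assemble([
  { type: "message_start" },
  { type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
  { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hel" } },
  { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "lo" } },
  { type: "content_block_stop", index: 0 },
  { type: "message_delta", delta: { stop_reason: "end_turn" } },
  { type: "message_stop" },
]);
console.log(demo.blocks[0], demo.stopReason); // Hello end_turn
```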

Usage tracking

// Tracking usage in QueryEngine
if (message.event.type === 'message_start') {
  // New API message: start a fresh per-message tally
  currentMessageUsage = EMPTY_USAGE
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.message.usage)
}

if (message.event.type === 'message_delta') {
  // message_delta carries updated usage and, at the end, the stop reason
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
  if (message.event.delta.stop_reason != null) {
    lastStopReason = message.event.delta.stop_reason
  }
}

if (message.event.type === 'message_stop') {
  // Message complete: fold its usage into the conversation total
  this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage)
}
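The split between a per-message update and a per-conversation accumulation matters because the Messages API reports cumulative `output_tokens` in `message_delta`. Adding each event's counts would therefore double-count. The sketch below assumes that `updateUsage` replaces counts with the latest non-zero values within one message, while `accumulateUsage` adds them across messages. The names `mergeWithinMessage` and `sumAcrossMessages` are hypothetical, not the project's functions.

```typescript
// Simplified usage shape (the real Usage type has more fields).
type TokenUsage = {
  input_tokens: number;
  output_tokens: number;
  cache_read_input_tokens: number;
  cache_creation_input_tokens: number;
};

const ZERO_USAGE: TokenUsage = {
  input_tokens: 0,
  output_tokens: 0,
  cache_read_input_tokens: 0,
  cache_creation_input_tokens: 0,
};

// Within one message: later non-zero counts replace earlier ones (message_delta
// reports cumulative output_tokens); missing or zero fields keep the old value.
function mergeWithinMessage(current: TokenUsage, part: Partial<TokenUsage>): TokenUsage {
  const pick = (key: keyof TokenUsage): number => {
    const v = part[key];
    return v !== undefined && v > 0 ? v : current[key];
  };
  return {
    input_tokens: pick("input_tokens"),
    output_tokens: pick("output_tokens"),
    cache_read_input_tokens: pick("cache_read_input_tokens"),
    cache_creation_input_tokens: pick("cache_creation_input_tokens"),
  };
}

// Across messages: counts add up.
function sumAcrossMessages(total: TokenUsage, msg: TokenUsage): TokenUsage {
  return {
    input_tokens: total.input_tokens + msg.input_tokens,
    output_tokens: total.output_tokens + msg.output_tokens,
    cache_read_input_tokens: total.cache_read_input_tokens + msg.cache_read_input_tokens,
    cache_creation_input_tokens: total.cache_creation_input_tokens + msg.cache_creation_input_tokens,
  };
}

// One message: message_start reports input, message_delta reports cumulative output.
let perMessage = mergeWithinMessage(ZERO_USAGE, { input_tokens: 100, output_tokens: 1 });
perMessage = mergeWithinMessage(perMessage, { output_tokens: 40 });
const sessionTotal = sumAcrossMessages(sumAcrossMessages(ZERO_USAGE, perMessage), perMessage);
console.log(perMessage.output_tokens, sessionTotal.input_tokens); // 40 200
```

Both helpers return new objects instead of mutating their arguments. That matters in the excerpt above, which resets `currentMessageUsage` to the shared `EMPTY_USAGE` constant: an in-place update there would silently corrupt the constant for every later message.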

Token cost tracking (cost-tracker.ts)

// Core function in cost-tracker.ts
export function addToTotalSessionCost(
  cost: number,
  usage: Usage,
  model: string,
): number {
  const modelUsage = addToTotalModelUsage(cost, usage, model)
  addToTotalCostState(cost, modelUsage, model)

  // Update the cost and token counters, tagged by model and token type
  getCostCounter()?.add(cost, { model })
  getTokenCounter()?.add(usage.input_tokens, { model, type: 'input' })
  getTokenCounter()?.add(usage.output_tokens, { model, type: 'output' })
  getTokenCounter()?.add(usage.cache_read_input_tokens ?? 0, { model, type: 'cacheRead' })
  getTokenCounter()?.add(usage.cache_creation_input_tokens ?? 0, { model, type: 'cacheCreation' })

  // (how totalCost is computed is not shown in this excerpt)
  return totalCost
}

// Per-model usage statistics: fetch the model's entry (or start from zeros), then accumulate
function addToTotalModelUsage(cost: number, usage: Usage, model: string): ModelUsage {
  const modelUsage = getUsageForModel(model) ?? {
    inputTokens: 0,
    outputTokens: 0,
    cacheReadInputTokens: 0,
    cacheCreationInputTokens: 0,
    webSearchRequests: 0,
    costUSD: 0,
    contextWindow: 0,
    maxOutputTokens: 0,
  }

  modelUsage.inputTokens += usage.input_tokens
  modelUsage.outputTokens += usage.output_tokens
  modelUsage.cacheReadInputTokens += usage.cache_read_input_tokens ?? 0
  modelUsage.cacheCreationInputTokens += usage.cache_creation_input_tokens ?? 0
  modelUsage.webSearchRequests += usage.server_tool_use?.web_search_requests ?? 0
  modelUsage.costUSD += cost

  return modelUsage
}
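`addToTotalModelUsage` follows a get-or-create pattern on a per-model table. In the excerpt, a newly created entry is not stored by this function itself; presumably `addToTotalCostState`, which receives `modelUsage`, persists it. The self-contained sketch below uses a `Map` and stores the new entry explicitly. `ModelTotals`, `recordModelUsage`, and the model names are hypothetical.

```typescript
// Reduced, hypothetical shapes (the real ModelUsage has more fields).
type ModelTotals = { inputTokens: number; outputTokens: number; webSearchRequests: number; costUSD: number };
type ResponseUsage = {
  input_tokens: number;
  output_tokens: number;
  server_tool_use?: { web_search_requests?: number };
};

const usageByModel = new Map<string, ModelTotals>();

// Get-or-create the model's entry, then add this response's counts to it.
function recordModelUsage(model: string, usage: ResponseUsage, cost: number): ModelTotals {
  let totals = usageByModel.get(model);
  if (totals === undefined) {
    totals = { inputTokens: 0, outputTokens: 0, webSearchRequests: 0, costUSD: 0 };
    usageByModel.set(model, totals); // store the new entry so later calls find it
  }
  totals.inputTokens += usage.input_tokens;
  totals.outputTokens += usage.output_tokens;
  totals.webSearchRequests += usage.server_tool_use?.web_search_requests ?? 0;
  totals.costUSD += cost;
  return totals;
}

recordModelUsage("model-a", { input_tokens: 10, output_tokens: 5 }, 0.01);
recordModelUsage("model-a", { input_tokens: 20, output_tokens: 5, server_tool_use: { web_search_requests: 1 } }, 0.02);
recordModelUsage("model-b", { input_tokens: 7, output_tokens: 3 }, 0.005);
```

Keeping totals per model, rather than as a single session sum, is what lets a summary report show which model the money went to when a session mixes models.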

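Cost formula

Total cost = input token cost + output token cost + cache read cost + cache creation (write) cost + web search cost. The token terms are token counts multiplied by per-token prices, while web search is billed per request. Cache reads are billed at roughly 10% of the base input rate. Cache writes are billed at roughly 125% of it, i.e. 25% more than regular input, for the default 5-minute cache.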
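The formula can be written as a small function. This is a sketch with illustrative prices passed in as parameters; check the provider's current price list for real values. In the Messages API usage object, `input_tokens` counts only uncached input, so the five terms do not double-count. The names `estimateCostUSD`, `Pricing`, and `BillableUsage` are hypothetical, and the cache multipliers encode the approximate ratios stated above.

```typescript
// Illustrative prices only; check the provider's current price list for real values.
type Pricing = { inputPerMTok: number; outputPerMTok: number; webSearchPerRequest: number };

type BillableUsage = {
  input_tokens: number; // uncached input tokens
  output_tokens: number;
  cache_read_input_tokens: number;
  cache_creation_input_tokens: number;
  web_search_requests: number;
};

const CACHE_READ_MULTIPLIER = 0.1; // cache reads: ~10% of the base input rate
const CACHE_WRITE_MULTIPLIER = 1.25; // 5-minute cache writes: ~125% of the base input rate

function estimateCostUSD(u: BillableUsage, p: Pricing): number {
  const perToken = (perMTok: number): number => perMTok / 1e6;
  return (
    u.input_tokens * perToken(p.inputPerMTok) +
    u.output_tokens * perToken(p.outputPerMTok) +
    u.cache_read_input_tokens * perToken(p.inputPerMTok * CACHE_READ_MULTIPLIER) +
    u.cache_creation_input_tokens * perToken(p.inputPerMTok * CACHE_WRITE_MULTIPLIER) +
    u.web_search_requests * p.webSearchPerRequest
  );
}

// 2,000 input + 500 output + 10,000 cache-read + 1,000 cache-write tokens and one search,
// at $3 / $15 per million tokens and $0.01 per search:
// 0.006 + 0.0075 + 0.003 + 0.00375 + 0.01 = 0.03025 USD
const exampleCost = estimateCostUSD(
  { input_tokens: 2000, output_tokens: 500, cache_read_input_tokens: 10000, cache_creation_input_tokens: 1000, web_search_requests: 1 },
  { inputPerMTok: 3, outputPerMTok: 15, webSearchPerRequest: 0.01 },
);
```

The worked numbers show why caching pays off: the 10,000 cache-read tokens cost half as much as the 2,000 uncached input tokens.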

Cost tracking flowchart

flowchart LR
    subgraph "API response"
        RESP[API Response] --> USAGE[Usage data]
    end

    subgraph "Cost calculation"
        USAGE --> CALC[calculateUSDCost]
        CALC --> COST[Cost amount]
    end

    subgraph "State update"
        COST --> STATE[addToTotalCostState]
        STATE --> COUNTER[Counter update]
        COUNTER --> CONFIG[Save to config]
    end

    subgraph "Display"
        STATE --> FORMAT[formatTotalCost]
        FORMAT --> DISPLAY[Terminal output]
    end

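5.4 query.ts: The Query Loop and Its Helpers

query.ts implements the core query loop. It handles tool calls, context compaction, error recovery, and related logic.

Query loop state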

type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined
}

Main query function structure

export async function* query(params: QueryParams): AsyncGenerator<...> {
  let state: State = {
    messages: params.messages,
    toolUseContext: params.toolUseContext,
    maxOutputTokensOverride: params.maxOutputTokensOverride,
    autoCompactTracking: undefined,
    turnCount: 1,
    // ...
  }
  // deps (callModel, microcompact, autocompact, ...) are injected dependencies; their setup is elided

  // Main loop: one iteration per model call
  while (true) {
    yield { type: 'stream_request_start' }

    // Per-iteration accumulators
    const assistantMessages: AssistantMessage[] = []
    const toolUseBlocks: ToolUseBlock[] = []
    const toolResults: Message[] = []
    let needsFollowUp = false

    // 1. Context preparation: start from the messages after the last compact boundary
    let messagesForQuery = [...getMessagesAfterCompactBoundary(state.messages)]
    messagesForQuery = await applyToolResultBudget(messagesForQuery, ...)

    // 2. Snip (behind the HISTORY_SNIP feature flag)
    if (feature('HISTORY_SNIP')) {
      const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
      messagesForQuery = snipResult.messages
    }

    // 3. Microcompact
    const microcompactResult = await deps.microcompact(messagesForQuery, ...)

    // 4. Autocompact
    const { compactionResult } = await deps.autocompact(messagesForQuery, ...)

    // 5. API call
    for await (const message of deps.callModel({ messages: messagesForQuery, systemPrompt, ... })) {
      // Handle streamed messages
      if (message.type === 'assistant') {
        assistantMessages.push(message)
        // Collect tool_use blocks; any tool_use means another iteration is needed
        const blocks = message.message.content.filter(c => c.type === 'tool_use')
        if (blocks.length > 0) {
          toolUseBlocks.push(...blocks)
          needsFollowUp = true
        }
      }
    }

    // 6. Tool execution: run the requested tools and collect their results
    if (needsFollowUp) {
      const toolUpdates = streamingToolExecutor?.getRemainingResults()
        ?? runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
      for await (const update of toolUpdates) {
        if (update.message) {
          yield update.message
          toolResults.push(update.message)
        }
      }
    }

    // 7. Continue the loop or finish
    if (!needsFollowUp) {
      return { reason: 'completed' }
    }

    // Next iteration sees this turn's assistant messages and tool results
    state = { messages: [...messagesForQuery, ...assistantMessages, ...toolResults], ... }
  }
}
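Stripped of compaction and streaming details, steps 5 to 7 form the classic agent loop. Call the model, run any tools it requested, append the results, and repeat until a response contains no tool calls. Below is a self-contained sketch with hypothetical, simplified message shapes and stub functions (`agentLoop`, `fakeModel`, `fakeTool` are illustrative names). The `maxTurns` cap stands in for the `maxTurns` option in `QueryEngineConfig`.

```typescript
// Hypothetical, simplified message shapes for a minimal agent loop.
type ToolCall = { id: string; name: string; input: string };
type LoopMsg =
  | { role: "user"; text: string }
  | { role: "assistant"; text: string; toolCalls: ToolCall[] }
  | { role: "tool"; toolCallId: string; output: string };
type AssistantMsg = Extract<LoopMsg, { role: "assistant" }>;

type ModelFn = (history: LoopMsg[]) => Promise<AssistantMsg>;
type ToolFn = (call: ToolCall) => Promise<string>;
type LoopEnd = { reason: "completed" | "max_turns" };

async function* agentLoop(
  initial: LoopMsg[],
  callModel: ModelFn,
  runTool: ToolFn,
  maxTurns = 10,
): AsyncGenerator<LoopMsg, LoopEnd> {
  let messages: LoopMsg[] = [...initial];
  for (let turn = 1; turn <= maxTurns; turn++) {
    const assistant = await callModel(messages);
    yield assistant;
    if (assistant.toolCalls.length === 0) {
      return { reason: "completed" }; // no tool calls: the turn is over
    }
    const toolResults: LoopMsg[] = [];
    for (const call of assistant.toolCalls) {
      const result: LoopMsg = { role: "tool", toolCallId: call.id, output: await runTool(call) };
      toolResults.push(result);
      yield result;
    }
    // The next model call sees the assistant message and its tool results.
    messages = [...messages, assistant, ...toolResults];
  }
  return { reason: "max_turns" };
}

// Stub model: asks for a tool first, then answers once a tool result is present.
const fakeModel: ModelFn = async (history) => {
  const last = history[history.length - 1];
  return last.role === "tool"
    ? { role: "assistant", text: `file says: ${last.output}`, toolCalls: [] }
    : { role: "assistant", text: "", toolCalls: [{ id: "t1", name: "Read", input: "a.txt" }] };
};
const fakeTool: ToolFn = async (call) => `contents of ${call.input}`;
```

Driving `agentLoop([{ role: "user", text: "read a.txt" }], fakeModel, fakeTool)` yields three messages: the tool-requesting assistant message, the tool result, and the final answer. It then returns `{ reason: "completed" }`.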

Error recovery

graph TD
    subgraph "Error types"
        E1[Prompt Too Long 413]
        E2[Max Output Tokens]
        E3[Image Size Error]
        E4[API Rate Limit]
    end

    subgraph "Recovery strategies"
        R1[Reactive Compact]
        R2[Token Escalation]
        R3[Media Strip Retry]
        R4[Wait and Retry]
    end

    subgraph "Outcome"
        S1[Recovered]
        S2[Failed: return error]
    end

    E1 --> R1
    R1 --> S1
    R1 --> S2
    E2 --> R2 --> S1
    E3 --> R3 --> S1
    E4 --> R4 --> S1

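Error recovery limits

Recovery from max_output_tokens is attempted at most 3 times. For prompt_too_long, the error is returned directly if Reactive Compact fails. These limits prevent a failing request from looping forever and consuming resources.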
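These limits amount to bounded retry with separate budgets per error kind. The sketch below models that with hypothetical names and shapes. `callWithRecovery`, `CallOutcome`, and the doubling escalation policy are illustrative, not the project's code. The recovery counter and one-shot flag appear to play the roles of `maxOutputTokensRecoveryCount` and `hasAttemptedReactiveCompact` in the `State` type above.

```typescript
// Hypothetical outcome of one model call.
type CallOutcome =
  | { kind: "ok"; text: string }
  | { kind: "max_output_tokens" }
  | { kind: "prompt_too_long" };

type RecoveryResult =
  | { ok: true; text: string; attempts: number }
  | { ok: false; error: string; attempts: number };

const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT = 3;

async function callWithRecovery(
  call: (maxOutputTokens: number) => Promise<CallOutcome>,
  reactiveCompact: () => Promise<boolean>, // false if compaction could not shrink the prompt
  initialMaxOutputTokens: number,
): Promise<RecoveryResult> {
  let maxOutputTokens = initialMaxOutputTokens;
  let outputRecoveries = 0; // bounded counter for max_output_tokens
  let triedReactiveCompact = false; // one-shot flag for prompt_too_long
  let attempts = 0;
  while (true) {
    attempts++;
    const outcome = await call(maxOutputTokens);
    if (outcome.kind === "ok") {
      return { ok: true, text: outcome.text, attempts };
    }
    if (outcome.kind === "max_output_tokens") {
      if (outputRecoveries >= MAX_OUTPUT_TOKENS_RECOVERY_LIMIT) {
        return { ok: false, error: "max_output_tokens: recovery limit reached", attempts };
      }
      outputRecoveries++;
      maxOutputTokens *= 2; // illustrative escalation policy
      continue;
    }
    // prompt_too_long: at most one reactive compaction, then give up.
    if (triedReactiveCompact) {
      return { ok: false, error: "prompt_too_long after reactive compact", attempts };
    }
    triedReactiveCompact = true;
    if (!(await reactiveCompact())) {
      return { ok: false, error: "prompt_too_long: reactive compact failed", attempts };
    }
  }
}
```

Every branch of the `while (true)` loop either returns or consumes part of a finite budget. Each outcome increments a counter toward its limit, sets a one-shot flag, or ends the loop, so termination is guaranteed even when the model keeps failing.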

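Context compaction triggers

| Trigger | Compaction method | Description |
|---|---|---|
| Token count exceeds a threshold | Autocompact | Automatically calls the API to summarize the conversation |
| Snip markers | Snip Compact | Removes content marked as redundant |
| Every request | Microcompact | Cache-editing optimization |
| 413 error | Reactive Compact | Emergency compaction to recover |

flowchart TB
    subgraph "Compaction check order"
        CHECK[Check token count] --> SNIP{Snip needed?}
        SNIP --> |"Yes"| SNIP_EXEC[Run Snip]
        SNIP --> |"No"| MICRO{Microcompact needed?}
        SNIP_EXEC --> MICRO
        MICRO --> |"Yes"| MICRO_EXEC[Run Microcompact]
        MICRO --> |"No"| AUTO{Autocompact needed?}
        MICRO_EXEC --> AUTO
        AUTO --> |"Yes"| AUTO_EXEC[Run Autocompact]
        AUTO --> |"No"| API[Send API request]
        AUTO_EXEC --> API
    end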
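The check order is a pipeline. Each stage runs only if its predicate fires, and later stages see the output of earlier ones. In this sketch, the stage bodies are illustrative stand-ins and not the real Snip, Microcompact, or Autocompact logic. It assumes a hypothetical `HistoryItem` with a token estimate and a snip flag, an arbitrary autocompact threshold of 100 tokens, and a one-line "summary" in place of the summarization API call.

```typescript
// Hypothetical message shape: token estimate plus a flag for snip-marked content.
type HistoryItem = { text: string; tokens: number; snipMarked?: boolean };

type Stage = {
  name: string;
  needed: (history: HistoryItem[]) => boolean;
  apply: (history: HistoryItem[]) => HistoryItem[];
};

const totalTokens = (h: HistoryItem[]): number => h.reduce((sum, m) => sum + m.tokens, 0);

// Runs the stages in order; each sees the output of the stages before it.
function prepareForRequest(history: HistoryItem[], stages: Stage[]): { history: HistoryItem[]; ran: string[] } {
  const ran: string[] = [];
  let current = history;
  for (const stage of stages) {
    if (stage.needed(current)) {
      current = stage.apply(current);
      ran.push(stage.name);
    }
  }
  return { history: current, ran };
}

const AUTOCOMPACT_THRESHOLD = 100; // illustrative token budget

const stages: Stage[] = [
  {
    name: "snip",
    needed: (h) => h.some((m) => m.snipMarked === true),
    apply: (h) => h.filter((m) => m.snipMarked !== true),
  },
  {
    name: "microcompact",
    // Stand-in: shrink individual oversized items (e.g. bulky tool results).
    needed: (h) => h.some((m) => m.tokens > 50),
    apply: (h) => h.map((m) => (m.tokens > 50 ? { text: "[trimmed]", tokens: 5 } : m)),
  },
  {
    name: "autocompact",
    needed: (h) => totalTokens(h) > AUTOCOMPACT_THRESHOLD,
    // Stand-in for a summarization call: replace the history with one summary item.
    apply: (h) => [{ text: `summary of ${h.length} messages`, tokens: 10 }],
  },
];
```

Running the cheap, local stages first means the expensive summarization step often never triggers. In a history with one snip-marked item and two oversized items, snip and microcompact together can bring the total under the threshold, and autocompact is skipped.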

The next chapter examines the implementation of the tool system in detail.