Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第四章:IPC通信机制

本章深入介绍 VSDB 的 IPC 通信实现,包括消息协议、请求匹配、流式传输和崩溃恢复。

4.1 IPC 概述

IPC(Inter-Process Communication)是双进程架构的核心,负责 Extension Host 和 Worker Process 之间的消息传递。

graph LR
    subgraph "Extension Host"
        A["IpcManager"]
        B["sendRequest()"]
        C["handleMessage()"]
    end
    
    subgraph "Worker Process"
        D["worker.ts"]
        E["handleMessage()"]
        F["send()"]
    end
    
    B -->|"JSON over IPC"| D
    D --> E
    E --> F
    F -->|"JSON over IPC"| A
    A --> C

Node.js child_process IPC

VSDB 使用 child_process.fork() 创建 Worker:

// fork() spawns the child process and opens an IPC channel to it automatically:
// stdin/stdout/stderr are shared with the parent, and the trailing 'ipc'
// entry creates the channel used by send() / on('message').
this.worker = fork(this.workerPath, [], { stdio: ['inherit', 'inherit', 'inherit', 'ipc'] });

IPC 特性:

  • 自动序列化:process.send() 发送时自动进行 JSON 序列化,接收端自动反序列化
  • 事件驱动:通过 process.on('message') 接收消息
  • 双向通信:主进程和子进程都可发送/接收

4.2 消息协议设计

请求类型

/** A request sent from the extension host to the worker process. */
interface WorkerRequest {
  /** UUID generated per request; responses carry the same id so they can be matched. */
  id: string;
  /** Operation the worker should perform. */
  type: RequestType;
  /** Connection the request is aimed at. */
  connectionId: string;
  /** Operation-specific arguments. */
  payload: RequestPayload;
}

/**
 * Operations the host can ask the worker to perform:
 * - `connect` / `disconnect` – open or close a database connection
 * - `query`                  – run a statement and return the full result
 * - `streamQuery`            – run a statement and return rows in chunks
 * - `schema`                 – fetch schema information
 * - `cancel`                 – cancel a running query
 * - `ping`                   – heartbeat probe
 * - `shutdown`               – ask the worker to shut down
 */
type RequestType =
  | 'connect' | 'disconnect'
  | 'query' | 'streamQuery'
  | 'schema' | 'cancel'
  | 'ping' | 'shutdown';

/** Arguments for a WorkerRequest; which fields are set depends on the request type. */
interface RequestPayload {
  /** Connection configuration. */
  config?: DbConnection;
  /** SQL statement text. */
  sql?: string;
  /** Bound query parameters. */
  params?: unknown[];
  /** Kind of schema information requested. */
  schemaType?: SchemaType;
  /** Database name. */
  database?: string;
  /** Table name. */
  table?: string;
}

响应类型

/** A response sent from the worker back to the host. */
interface WorkerResponse {
  /** Id of the request this answers. */
  id: string;
  /** Kind of response. */
  type: ResponseType;
  /** Result payload, when the request produced one. */
  data?: QueryResult | SchemaResult;
  /** Error details, when the request failed. */
  error?: WorkerError;
}

/**
 * Kind of a WorkerResponse:
 * - `result`    – query result
 * - `streamEnd` – end of a streamed transfer
 * - `error`     – the request failed
 * - `pong`      – heartbeat reply
 */
type ResponseType = 'result' | 'streamEnd' | 'error' | 'pong';

/** Error details attached to an `error` response. */
interface WorkerError {
  /** Error code. */
  code: string;
  /** Human-readable message. */
  message: string;
  /** Optional error classification. */
  errorClass?: ErrorClass;
  /** NOTE(review): presumably whether the request may be retried — confirm. */
  retryable?: boolean;
}

流式分块

/** One slice of rows from a streaming query, sent by the worker. */
interface StreamChunk {
  /** Id of the streaming request this chunk belongs to. */
  requestId: string;
  /** Position of this chunk within the stream. */
  chunkIndex: number;
  /** The rows carried by this chunk. */
  rows: Record<string, unknown>[];
  /** Total row count; only present on the last chunk. */
  totalRows?: number;
}

4.3 IpcManager 实现

核心结构

/**
 * Host-side owner of the worker process. It forks the worker, matches
 * responses to requests, monitors heartbeats and restarts the worker after
 * crashes. Only fields are listed here; the methods are shown separately.
 */
class IpcManager {
  private workerPath: string;
  private worker: ChildProcess | null = null;
  /** In-flight requests keyed by request id. */
  private pendingRequests = new Map<string, PendingRequest>();

  // Configuration
  private restartOnCrash: boolean;
  private maxRestartAttempts: number;
  private requestTimeout: number;
  private heartbeatInterval: number;
  private streamChunkTimeout: number;
  private gracefulShutdownTimeout: number;
  // NOTE(review): presumably derived from heartbeatTimeout / heartbeatInterval — confirm.
  private maxMissedHeartbeats: number;
  private callbacks: IpcManagerCallbacks;

  // Heartbeat monitoring
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private missedHeartbeats = 0;

  // Lifecycle / crash-recovery state
  private isShuttingDown = false;
  private crashed = false;
  private restartAttempts = 0;

  // Per-request inactivity timers for streaming queries
  private streamChunkTimers = new Map<string, ReturnType<typeof setTimeout>>();

  // Connection tracking (replayed after a crash)
  private activeConnections = new Map<string, DbConnection>();
}

PendingRequest 结构

/** Book-keeping for a request that has been sent but not yet settled. */
interface PendingRequest {
  /** Settles the caller's promise with the worker's response. */
  resolve: (response: WorkerResponse) => void;
  /** Fails the caller's promise. */
  reject: (error: Error) => void;
  /** Chunks received so far for a streaming request. */
  streamChunks: StreamChunk[];
  /** True for `streamQuery` requests. */
  isStream: boolean;
  /** Overall request timeout. */
  timer: ReturnType<typeof setTimeout>;
}

启动 Worker

/**
 * Fork the worker process unless one is already running, wire up its
 * message, exit and error listeners, and start the heartbeat.
 */
start(): void {
  if (this.worker && !this.worker.killed) {
    return; // already running
  }

  this.isShuttingDown = false;

  // Create the worker; the 'ipc' stdio entry opens the message channel.
  this.worker = fork(this.workerPath, [], {
    stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
  });

  // Every message from the worker goes through handleMessage().
  this.worker.on('message', (message: WorkerResponse | StreamChunk) => {
    this.handleMessage(message);
  });

  this.worker.on('exit', (code, signal) => {
    this.handleWorkerExit(code, signal);
  });

  // ChildProcess emits 'error' when the process cannot be spawned or killed,
  // or when send() fails and no callback was given. An 'error' event with no
  // listener is thrown, which would take down the extension host; log it instead.
  this.worker.on('error', (err) => {
    console.error(`Worker process error: ${err.message}`);
  });

  this.startHeartbeat();
}

发送请求

/**
 * Send a request to the worker and resolve with the response that carries
 * the same (freshly generated) id.
 *
 * Rejects if the worker is not running, if the message cannot be sent, or
 * if no response arrives within `requestTimeout` ms.
 * `connect` and `disconnect` requests also update `activeConnections`, which
 * is what gets replayed after a worker crash.
 */
sendRequest(request: Omit<WorkerRequest, 'id'>): Promise<WorkerResponse> {
  return new Promise((resolve, reject) => {
    const worker = this.worker;
    if (!worker || worker.killed) {
      reject(new Error('Worker process is not running'));
      return;
    }

    const fullRequest: WorkerRequest = { ...request, id: randomUUID() };
    const { id } = fullRequest;

    const pending: PendingRequest = {
      resolve,
      reject,
      streamChunks: [],
      isStream: request.type === 'streamQuery',
      timer: setTimeout(() => {
        this.pendingRequests.delete(id);
        reject(new Error(`Request timed out`));
      }, this.requestTimeout),
    };
    this.pendingRequests.set(id, pending);

    // Track connections for crash recovery.
    if (request.type === 'connect' && request.payload.config) {
      this.activeConnections.set(request.connectionId, request.payload.config);
    } else if (request.type === 'disconnect') {
      // An explicitly closed connection must not be re-opened by crash recovery.
      this.activeConnections.delete(request.connectionId);
    }

    // Use the callback form of send(). A failed send then rejects this request
    // right away and clears its timer. Otherwise it would surface as an 'error'
    // event on the ChildProcess, and the caller would wait for the full timeout.
    worker.send(fullRequest, (err) => {
      if (!err || this.pendingRequests.get(id) !== pending) {
        return; // sent fine, or already settled by a response/timeout
      }
      clearTimeout(pending.timer);
      this.pendingRequests.delete(id);
      reject(err);
    });
  });
}

处理响应

/**
 * Route one message from the worker.
 *
 * Any message counts as proof of life and resets the heartbeat counter.
 * - StreamChunk (recognised by its `requestId` field): goes to handleStreamChunk().
 * - `pong`: heartbeat only; nothing else to do.
 * - Any other response: settles the pending request with the same id. For a
 *   streaming request, `streamEnd` first assembles the buffered chunks into
 *   one result.
 *
 * Responses whose id has no pending request (e.g. it already timed out) are
 * ignored.
 */
private handleMessage(message: WorkerResponse | StreamChunk): void {
  this.missedHeartbeats = 0;

  // Narrow the union first. StreamChunk has no `type` field, so reading
  // `message.type` before this check does not type-check under strict mode.
  if ('requestId' in message) {
    this.handleStreamChunk(message);
    return;
  }

  if (message.type === 'pong') {
    return;
  }

  const pending = this.pendingRequests.get(message.id);
  if (!pending) {
    return; // no matching request (possibly already timed out)
  }

  clearTimeout(pending.timer);

  // The request is settling now, so its per-chunk inactivity timer (streams
  // only) is no longer needed; drop it instead of leaving it scheduled.
  const chunkTimer = this.streamChunkTimers.get(message.id);
  if (chunkTimer !== undefined) {
    clearTimeout(chunkTimer);
    this.streamChunkTimers.delete(message.id);
  }

  if (pending.isStream && message.type === 'streamEnd') {
    this.assembleStreamResponse(pending, message);
  } else {
    this.pendingRequests.delete(message.id);
    pending.resolve(message);
  }
}

流式响应组装

/**
 * Merge the buffered chunks of a finished stream into one QueryResult.
 * The pending request is then resolved with it as a `result` response.
 *
 * - `columns` are the keys of the first row (empty when no rows arrived).
 * - The last chunk that reports `totalRows` wins; a positive value is
 *   exposed as `affectedRows`.
 * - `executionTime` is not measured on this path and is always 0.
 */
private assembleStreamResponse(
  pending: PendingRequest,
  response: WorkerResponse
): void {
  const rows: Record<string, unknown>[] = [];
  let reportedTotal = 0;

  pending.streamChunks.forEach((chunk) => {
    rows.push(...chunk.rows);
    if (chunk.totalRows !== undefined) {
      reportedTotal = chunk.totalRows;
    }
  });

  const columns: string[] = rows.length > 0 ? Object.keys(rows[0]) : [];
  const affected = reportedTotal > 0 ? { affectedRows: reportedTotal } : {};

  const queryResult: QueryResult = {
    columns,
    rows,
    rowCount: rows.length,
    executionTime: 0,
    ...affected,
  };

  this.pendingRequests.delete(response.id);
  pending.resolve({ id: response.id, type: 'result', data: queryResult });
}

4.4 流式传输详解

为什么需要流式传输

graph TD
    A["查询返回 100 万行"] --> B{"传输方式"}
    
    B -->|普通查询| C["单次 IPC 消息"]
    C --> D["消息过大"]
    D --> E["IPC 阻塞"]
    E --> F["UI 卡顿"]
    
    B -->|流式查询| G["分块传输"]
    G --> H["每块 1000 行"]
    H --> I["渐进渲染"]
    I --> J["UI 流畅"]

Worker 端流式实现

/**
 * Worker-side handler for `streamQuery`. It runs the SQL (with its bound
 * params) through the connection's driver and forwards every yielded chunk
 * to the host as a StreamChunk message. The queued job then returns a
 * `streamEnd` response.
 *
 * The request id stays in `activeStreams` while the stream runs. If it
 * disappears (presumably removed by the `cancel` handler — confirm), the
 * loop ends the stream early.
 *
 * @throws Error if the connection id is unknown or the request has no SQL.
 */
async function handleStreamQuery(request: WorkerRequest): Promise<void> {
  const entry = connections.get(request.connectionId);
  // Fail with a descriptive error rather than a TypeError from destructuring
  // `undefined`, or handing `undefined` SQL to the driver.
  if (!entry) {
    throw new Error(`Unknown connection: ${request.connectionId}`);
  }
  const { sql, params } = request.payload;
  if (sql === undefined) {
    throw new Error('streamQuery request has no SQL');
  }
  const { driver } = entry;
  const chunkSize = 1000;

  activeStreams.add(request.id); // register the active stream

  queryQueue.enqueue(request, async () => {
    try {
      // Pass the bound parameters and chunk size through. Without params,
      // placeholders in `sql` would be left unbound.
      const stream = driver.streamQuery(sql, params, chunkSize);
      let chunkIndex = 0;

      for await (const chunk of stream) {
        // Cancelled: returning out of for-await also closes the driver's generator.
        if (!activeStreams.has(request.id)) {
          return { id: request.id, type: 'streamEnd' };
        }

        send({
          requestId: request.id,
          chunkIndex,
          rows: chunk.rows,
          totalRows: chunk.totalRows,
        });

        chunkIndex++;
      }

      // End-of-stream marker
      return { id: request.id, type: 'streamEnd' };
    } finally {
      activeStreams.delete(request.id);
    }
  });
}

Driver 端流式实现

// mysql.ts
/**
 * Execute `sql` on a dedicated connection and yield its rows in chunks of
 * `chunkSize`; the last chunk carries `totalRows`. A non-array result, or
 * an empty result set, yields one empty chunk with `totalRows: 0`, so
 * consumers always see a final chunk.
 *
 * NOTE: `execute()` materialises the whole result set before the first chunk
 * is yielded. Chunking therefore bounds the size of each message sent
 * onward, not this driver's memory use.
 *
 * @throws RangeError if `chunkSize` is below 1 or NaN (the loop would never advance).
 */
async *streamQuery(
  sql: string,
  params?: any[],
  chunkSize = 1000
): AsyncGenerator<StreamChunkData> {
  // Validate before opening a connection, so a bad size cannot leak one.
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunkSize must be >= 1, got ${chunkSize}`);
  }

  const streamConn = await mysql.createConnection(config);

  try {
    const [result] = await streamConn.execute(sql, params);

    if (!Array.isArray(result) || result.length === 0) {
      yield { chunkIndex: 0, rows: [], totalRows: 0 };
      return;
    }

    const allRows = result;
    const totalRows = allRows.length;

    for (let offset = 0; offset < totalRows; offset += chunkSize) {
      const isLast = offset + chunkSize >= totalRows;

      yield {
        chunkIndex: Math.floor(offset / chunkSize),
        rows: allRows.slice(offset, offset + chunkSize),
        ...(isLast ? { totalRows } : {}),
      };
    }
  } finally {
    // Also runs when the consumer stops iterating early (e.g. cancellation).
    await streamConn.end();
  }
}

分块超时控制

// IpcManager: arm the per-chunk inactivity timeout for a streaming request
/**
 * Start (or restart) the inactivity timer for a streaming request. If it
 * fires (no reset within `streamChunkTimeout` ms), the pending request is
 * removed, its overall timer cleared, and it is rejected with STREAM_TIMEOUT.
 */
private setupStreamChunkTimeout(requestId: string): void {
  // Replace rather than orphan an existing timer for this request.
  const previous = this.streamChunkTimers.get(requestId);
  if (previous !== undefined) {
    clearTimeout(previous);
  }

  const timer = setTimeout(() => {
    // Remove our own map entry once fired; previously it stayed in
    // streamChunkTimers forever.
    if (this.streamChunkTimers.get(requestId) === timer) {
      this.streamChunkTimers.delete(requestId);
    }

    const pending = this.pendingRequests.get(requestId);
    if (pending) {
      this.pendingRequests.delete(requestId);
      clearTimeout(pending.timer);
      pending.reject(new Error('STREAM_TIMEOUT'));
    }
  }, this.streamChunkTimeout);

  this.streamChunkTimers.set(requestId, timer);
}

// Called on every received chunk: restart the inactivity window
/**
 * Cancel the current chunk timer for `requestId` (if any) and arm a new one
 * via setupStreamChunkTimeout().
 */
private resetStreamChunkTimer(requestId: string): void {
  // clearTimeout(undefined) is a no-op, so no existence check is needed.
  clearTimeout(this.streamChunkTimers.get(requestId));
  this.setupStreamChunkTimeout(requestId);
}

4.5 心跳机制

心跳设计

sequenceDiagram
    participant Host as IpcManager
    participant Worker as Worker Process
    
    loop 每 15 秒
        Host->>Worker: ping
        Worker-->>Host: pong
        Host->>Host: missedHeartbeats = 0
    end
    
    Note over Host: Worker 未响应
    Host->>Host: missedHeartbeats++
    Host->>Host: missedHeartbeats++
    Host->>Worker: SIGKILL (强制终止)
    Host->>Host: handleWorkerExit()

实现

/**
 * Begin periodic liveness checks against the worker.
 *
 * Every `heartbeatInterval` ms a ping is sent and `missedHeartbeats` goes up
 * by one; handleMessage() resets it whenever the worker sends anything. When
 * the counter reaches `maxMissedHeartbeats`, the worker is treated as hung
 * and passed to handleWorkerHang(). The loop stops itself once there is no
 * live worker.
 */
private startHeartbeat(): void {
  this.stopHeartbeat();
  this.missedHeartbeats = 0;

  const tick = (): void => {
    const target = this.worker;
    if (!target || target.killed) {
      this.stopHeartbeat();
      return;
    }

    try {
      target.send({ type: 'ping', id: 'heartbeat' });
    } catch {
      // The ping could not be sent this tick; do not count it.
      return;
    }

    this.missedHeartbeats += 1;
    const presumedHung = this.missedHeartbeats >= this.maxMissedHeartbeats;
    if (presumedHung) {
      console.error('Worker heartbeat timeout');
      this.handleWorkerHang();
    }
  };

  this.heartbeatTimer = setInterval(tick, this.heartbeatInterval);
}

/** Forcefully terminate a worker that stopped answering heartbeats (SIGKILL). */
private handleWorkerHang(): void {
  const hungWorker = this.worker;
  if (!hungWorker || hungWorker.killed) {
    return;
  }
  hungWorker.kill('SIGKILL');
}

Worker 端心跳响应

// worker.ts — answer the host's heartbeat ping right away
case 'ping': {
  send({ id: 'heartbeat', type: 'pong' });
  break;
}

4.6 崩溃恢复

崩溃检测

// Crash detection: runs whenever the worker process exits.
worker.on('exit', (exitCode, exitSignal) => {
  console.error(`Worker exited: code=${exitCode}, signal=${exitSignal}`);
  this.stopHeartbeat();

  // Capture the ids of tracked connections; they are handed to crash recovery.
  const lostConnectionIds = [...this.activeConnections.keys()];

  // Fail every request that was still waiting on this worker.
  this.rejectAllPending(`Worker exited unexpectedly`);
  this.worker = null;

  const shouldRestart = this.restartOnCrash && !this.isShuttingDown;
  if (shouldRestart) {
    this.handleCrashRecovery(lostConnectionIds);
  }
});

重启策略

/**
 * Schedule a worker restart after an unexpected exit. It uses exponential
 * backoff (500 ms, 1 s, 2 s, 4 s, then capped at 5 s) and makes at most
 * `maxRestartAttempts` attempts.
 *
 * Connections are replayed only after the new worker reports ready, matching
 * the restart sequence: fork → ready → reconnect → onWorkerRestarted.
 *
 * @param lostConnectionIds ids of connections that were open in the dead worker
 */
private handleCrashRecovery(lostConnectionIds: string[]): void {
  if (this.restartAttempts >= this.maxRestartAttempts) {
    // Out of attempts. Previously this returned silently, leaving callers
    // with no signal that the worker and its connections are gone for good.
    console.error(`Worker crashed; giving up after ${this.restartAttempts} restart attempts`);
    this.callbacks.onConnectionsLost?.(lostConnectionIds);
    return;
  }

  this.crashed = true;
  this.restartAttempts++;

  // Exponential backoff, capped at 5 s.
  const delay = Math.min(500 * 2 ** (this.restartAttempts - 1), 5000);

  console.log(`Auto-restarting worker (attempt ${this.restartAttempts})`);

  this.callbacks.onWorkerCrash?.(this.restartAttempts);
  this.callbacks.onConnectionsLost?.(lostConnectionIds);

  setTimeout(() => {
    this.start();
    // Wait for the worker's 'ready' signal before replaying `connect` requests,
    // so they are not sent to a process that has not finished starting.
    void this.waitReady().then((ready) => {
      if (!ready) {
        console.error('Restarted worker did not signal ready in time; reconnecting anyway');
      }
      this.reconnectActiveConnections();
      this.callbacks.onWorkerRestarted?.();
    });
  }, delay);
}

连接恢复

/**
 * Replay a `connect` request for every connection recorded in
 * `activeConnections` (used after the worker has been restarted).
 * Each failure is logged on its own and does not stop the remaining reconnects.
 */
private reconnectActiveConnections(): void {
  this.activeConnections.forEach((config, connectionId) => {
    console.log(`Reconnecting: ${connectionId}`);

    const reconnect = this.sendRequest({ type: 'connect', connectionId, payload: { config } });
    reconnect.catch((err) => {
      console.error(`Failed to reconnect ${connectionId}: ${err.message}`);
    });
  });
}

重启时序

sequenceDiagram
    participant User as 用户
    participant IPC as IpcManager
    participant Worker1 as Worker (旧)
    participant Worker2 as Worker (新)
    
    Worker1-->>IPC: exit (crash)
    IPC->>User: onWorkerCrash callback
    IPC->>IPC: wait (指数退避)
    IPC->>Worker2: fork (新进程)
    Worker2-->>IPC: ready
    IPC->>Worker2: reconnect all activeConnections
    Worker2-->>IPC: connect results
    IPC->>User: onWorkerRestarted callback

4.7 优雅关闭

关闭流程

/**
 * Gracefully stop the worker.
 *
 * 1. Stop the heartbeat and mark the manager as shutting down (no restart).
 * 2. If requests are in flight, send `shutdown`. Then poll every 200 ms,
 *    up to `gracefulShutdownTimeout` ms, for them to drain.
 * 3. Reject whatever is still pending.
 * 4. Send SIGTERM. If the process has not exited within
 *    `gracefulShutdownTimeout` ms, send SIGKILL.
 */
async shutdown(): Promise<void> {
  this.isShuttingDown = true;
  this.stopHeartbeat();

  if (this.pendingRequests.size > 0 && this.worker) {
    try {
      this.worker.send({ type: 'shutdown' });
    } catch {
      // Best effort: if the notice cannot be sent, just drain and terminate.
    }

    const drainStart = Date.now();
    while (
      this.pendingRequests.size > 0 &&
      Date.now() - drainStart < this.gracefulShutdownTimeout
    ) {
      await new Promise<void>((resolve) => setTimeout(resolve, 200));
    }
  }

  this.rejectAllPending('IPCManager shutting down');

  // `ChildProcess.killed` only says a signal was delivered, not that the
  // process is gone. After SIGTERM it is already true, which made the old
  // SIGKILL fallback unreachable. Check the exit status instead.
  const worker = this.worker;
  const hasExited = (): boolean =>
    worker === null || worker.exitCode !== null || worker.signalCode !== null;

  if (worker && !hasExited()) {
    await new Promise<void>((resolve) => {
      const forceKill = setTimeout(() => {
        if (!hasExited()) {
          worker.kill('SIGKILL');
        }
        resolve();
      }, this.gracefulShutdownTimeout);

      // Listen before signalling so the exit cannot be missed.
      worker.once('exit', () => {
        clearTimeout(forceKill);
        resolve();
      });

      worker.kill('SIGTERM');
    });
  }

  this.worker = null;
  this.activeConnections.clear();
}

Worker 端关闭

// worker.ts
let shutdownInFlight: Promise<void> | undefined;

/**
 * Tear down the worker. This stops memory monitoring, disposes the query
 * queue, disconnects every driver (best effort; errors are ignored), and then
 * clears the connection and active-stream registries.
 *
 * Both the IPC 'disconnect' event and SIGTERM trigger this, and the host's
 * shutdown sends SIGTERM. It can therefore be requested more than once;
 * later calls reuse the first call's promise instead of tearing down twice.
 */
async function shutdown(): Promise<void> {
  if (!shutdownInFlight) {
    shutdownInFlight = runShutdown();
  }
  return shutdownInFlight;
}

/** Performs the one-time teardown behind shutdown(). */
async function runShutdown(): Promise<void> {
  stopMemoryMonitoring();
  queryQueue.dispose();

  // Disconnect all drivers concurrently, so one slow driver cannot use up the
  // host's graceful-shutdown window for everyone. The async wrapper turns a
  // synchronous throw into a settled rejection.
  await Promise.allSettled(
    Array.from(connections.values(), async (entry) => entry.driver.disconnect()),
  );

  connections.clear();
  activeStreams.clear();
}

/**
 * Run shutdown() and exit. Exits with 1 if teardown fails, instead of
 * leaving the rejection unhandled.
 */
function shutdownAndExit(): void {
  shutdown().then(
    () => process.exit(0),
    (err: unknown) => {
      console.error('Worker shutdown failed:', err);
      process.exit(1);
    },
  );
}

process.on('disconnect', shutdownAndExit);
process.on('SIGTERM', shutdownAndExit);

4.8 Worker 就绪机制

就绪信号

// worker.ts — announce readiness once the module has loaded.
// process.send only exists when this process was started with an IPC channel (e.g. via fork()).
if (process.send) {
  process.send({ type: 'ready' });
}

等待就绪

// IpcManager.ts
/** True once the current worker has sent its `ready` message. */
private workerReady: boolean = false;
/** Resolves workerReadyPromise; replaced on every start(). */
private workerReadyResolve?: () => void;
/** Settles when the current worker reports ready (awaited by waitReady()). */
private workerReadyPromise?: Promise<void>;

/**
 * Fork the worker (no-op while one is still running) and arm a fresh
 * readiness promise. The worker's `{ type: 'ready' }` message resolves that
 * promise and is consumed here; every other message goes to handleMessage().
 */
start(): void {
  // Do not fork a second worker, or reset the readiness of one that is running.
  if (this.worker && !this.worker.killed) {
    return;
  }

  this.isShuttingDown = false;
  this.workerReady = false;
  this.workerReadyPromise = new Promise<void>((resolve) => {
    this.workerReadyResolve = resolve;
  });

  this.worker = fork(this.workerPath, [], {
    stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
  });

  this.worker.on('message', (message: WorkerResponse | StreamChunk) => {
    // The ready handshake is not part of the WorkerResponse union. Read
    // `type` through a loose structural view instead of `as any`.
    if ((message as { type?: unknown }).type === 'ready') {
      this.workerReady = true;
      this.workerReadyResolve?.();
      return;
    }
    this.handleMessage(message);
  });

  this.worker.on('exit', (code, signal) => {
    this.handleWorkerExit(code, signal);
  });

  // Without an 'error' listener, a failed spawn or kill, or a send() failure
  // with no callback, is thrown as an unhandled EventEmitter error.
  this.worker.on('error', (err) => {
    console.error(`Worker process error: ${err.message}`);
  });

  this.startHeartbeat();
}

/**
 * Wait until the current worker has sent its `ready` message.
 *
 * @param timeoutMs how long to wait before giving up (default 5000 ms)
 * @returns true once the worker is ready. Returns false on timeout, or when
 *   start() has not been called yet (no readiness promise exists).
 */
async waitReady(timeoutMs = 5000): Promise<boolean> {
  if (this.workerReady) return true;

  // Replaces the former non-null assertion, which threw a TypeError if
  // called before start().
  const readyPromise = this.workerReadyPromise;
  if (!readyPromise) return false;

  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<false>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs);
  });

  try {
    return await Promise.race([readyPromise.then(() => true), timeout]);
  } finally {
    // When readiness wins the race, cancel the timer rather than leave it
    // scheduled (and holding the event loop open) for the full timeout.
    clearTimeout(timer);
  }
}

4.9 配置参数

IpcManagerOptions

/** Construction options for IpcManager. Only the worker script path is required. */
interface IpcManagerOptions {
  /** Path of the worker script to fork. */
  workerScriptPath: string;

  /** Restart the worker after a crash (default: true). */
  restartOnCrash?: boolean;
  /** Maximum number of automatic restarts (default: 3). */
  maxRestartAttempts?: number;
  /** Per-request timeout in ms (default: 60000). */
  requestTimeout?: number;
  /** Interval between heartbeat pings in ms (default: 15000). */
  heartbeatInterval?: number;
  /** Heartbeat timeout in ms (default: 30000). */
  heartbeatTimeout?: number;
  /** Maximum wait between stream chunks in ms (default: 30000). */
  streamChunkTimeout?: number;
  /** Time allowed for a graceful shutdown in ms (default: 10000). */
  gracefulShutdownTimeout?: number;

  /** Event callbacks. */
  callbacks?: IpcManagerCallbacks;
}

默认值

// Default IpcManager settings; all durations are in milliseconds.
const DEFAULT_REQUEST_TIMEOUT = 60 * 1000;
const DEFAULT_MAX_RESTART_ATTEMPTS = 3;
const DEFAULT_HEARTBEAT_INTERVAL = 15 * 1000;
const DEFAULT_HEARTBEAT_TIMEOUT = 30 * 1000;
const DEFAULT_STREAM_CHUNK_TIMEOUT = 30 * 1000;
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = 10 * 1000;

4.10 小结

本章介绍了 VSDB 的 IPC 通信机制:

| 特性 | 实现 |
|------|------|
| 消息协议 | WorkerRequest / WorkerResponse / StreamChunk |
| 请求匹配 | UUID + pendingRequests Map |
| 流式传输 | 分块发送 + 组装响应 |
| 心跳机制 | ping/pong + missedHeartbeats 计数 |
| 崩溃恢复 | 重启 + 重连 activeConnections |
| 优雅关闭 | 等待排空 + SIGTERM/SIGKILL |
| Worker 就绪 | ready 信号 + waitReady() |

下一章将深入数据库驱动的实现细节。