第四章:IPC 通信机制
本章深入介绍 VSDB 的 IPC 通信实现,包括消息协议、请求匹配、流式传输、心跳检测、崩溃恢复、优雅关闭以及 Worker 就绪机制。
4.1 IPC 概述
IPC(Inter-Process Communication)是双进程架构的核心,负责 Extension Host 和 Worker Process 之间的消息传递。
graph LR
subgraph "Extension Host"
A["IpcManager"]
B["sendRequest()"]
C["handleMessage()"]
end
subgraph "Worker Process"
D["worker.ts"]
E["handleMessage()"]
F["send()"]
end
B -->|"JSON over IPC"| D
D --> E
E --> F
F -->|"JSON over IPC"| A
A --> C
Node.js child_process IPC
VSDB 使用 child_process.fork() 创建 Worker:
// Spawn the worker with fork(); the fourth stdio entry ('ipc') opens the
// message channel, while stdin/stdout/stderr are inherited from the parent.
const forkArgs: string[] = [];
this.worker = fork(this.workerPath, forkArgs, {
  stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
});
IPC 特性:
- 自动序列化:`process.send()`(主进程中为 `worker.send()`)会自动对消息进行 JSON 序列化
- 事件驱动:通过 `process.on('message')`(主进程中为 `worker.on('message')`)接收消息
- 双向通信:主进程和子进程都可以发送和接收消息
4.2 消息协议设计
请求类型
/** A request sent from the Extension Host to the worker process. */
interface WorkerRequest {
  /** Fresh UUID per request; echoed back by the worker so the response can be matched. */
  id: string;
  /** Operation the worker should perform. */
  type: RequestType;
  /** ID of the database connection the request targets. */
  connectionId: string;
  /** Operation-specific arguments. */
  payload: RequestPayload;
}
/**
 * Operations understood by the worker:
 * - `connect` / `disconnect`: open or close a database connection
 * - `query`: execute a query
 * - `streamQuery`: execute a query and return rows in chunks
 * - `schema`: fetch schema information
 * - `cancel`: cancel a running query
 * - `ping`: heartbeat probe
 * - `shutdown`: stop the worker
 */
type RequestType =
  | 'connect'
  | 'disconnect'
  | 'query'
  | 'streamQuery'
  | 'schema'
  | 'cancel'
  | 'ping'
  | 'shutdown';
/** Arguments of a request; which fields are set depends on the request type. */
interface RequestPayload {
  /** Connection settings, e.g. for `connect`. */
  config?: DbConnection;
  /** SQL text to execute. */
  sql?: string;
  /** Query parameters for `sql`. */
  params?: unknown[];
  /** Kind of schema information requested. */
  schemaType?: SchemaType;
  /** Database name. */
  database?: string;
  /** Table name. */
  table?: string;
}
响应类型
/** A message sent by the worker in reply to a request. */
interface WorkerResponse {
  /** ID of the request being answered. */
  id: string;
  /** Kind of response. */
  type: ResponseType;
  /** Result payload on success. */
  data?: QueryResult | SchemaResult;
  /** Failure details for `error` responses. */
  error?: WorkerError;
}
/**
 * Kinds of worker responses:
 * - `result`: query/schema result
 * - `streamEnd`: the chunked transfer of a stream query has finished
 * - `error`: the request failed
 * - `pong`: heartbeat reply
 */
type ResponseType =
  | 'result'
  | 'streamEnd'
  | 'error'
  | 'pong';
/** Error details carried by an `error` response. */
interface WorkerError {
  /** Error code. */
  code: string;
  /** Human-readable description. */
  message: string;
  /** Optional error classification. */
  errorClass?: ErrorClass;
  /** Optional hint whether the operation may be retried. */
  retryable?: boolean;
}
流式分块
/** One slice of rows belonging to a streaming query. */
interface StreamChunk {
  /** ID of the `streamQuery` request the rows belong to. */
  requestId: string;
  /** Position of this chunk in the stream. */
  chunkIndex: number;
  /** Rows in this chunk. */
  rows: Record<string, unknown>[];
  /** Total row count; only set on the final chunk. */
  totalRows?: number;
}
4.3 IpcManager 实现
核心结构
/**
 * Extension Host side of the IPC channel.
 *
 * Owns the worker process, matches responses to requests by ID, watches the
 * worker with heartbeats and restarts it after a crash. Only the state is shown
 * here; the methods are presented in the following sections.
 */
class IpcManager {
  private workerPath: string;
  private worker: ChildProcess | null = null;
  /** Requests awaiting a response, keyed by request ID. */
  private pendingRequests = new Map<string, PendingRequest>();

  // Configuration (presumably populated from IpcManagerOptions — confirm in the constructor)
  private restartOnCrash: boolean;
  private maxRestartAttempts: number;
  private requestTimeout: number;
  private heartbeatInterval: number;
  /** Unanswered pings tolerated before the worker is killed as hung. */
  private maxMissedHeartbeats: number;
  private streamChunkTimeout: number;
  private gracefulShutdownTimeout: number;
  private callbacks: IpcManagerCallbacks;

  // Lifecycle state
  private isShuttingDown = false;
  private crashed = false;
  private restartAttempts = 0;

  // Heartbeat monitoring
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private missedHeartbeats = 0;

  /** Per-request inactivity timers for streaming requests. */
  private streamChunkTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Connections opened through `connect`; re-opened after a worker crash. */
  private activeConnections = new Map<string, DbConnection>();
}
PendingRequest 结构
/** Book-keeping for one request that is still waiting for its response. */
interface PendingRequest {
  /** Settles the caller's promise with the worker's response. */
  resolve: (response: WorkerResponse) => void;
  /** Rejects the caller's promise (timeout, delivery failure, worker exit, ...). */
  reject: (error: Error) => void;
  /** Chunks received so far for a streaming request. */
  streamChunks: StreamChunk[];
  /** True for `streamQuery` requests. */
  isStream: boolean;
  /** Overall request timeout, cleared once the response arrives. */
  timer: ReturnType<typeof setTimeout>;
}
启动 Worker
/**
 * Spawn the worker process (if no live one exists) and wire up its events.
 *
 * Also resets the shutdown flag so a manager can be started again, and starts
 * the heartbeat loop.
 */
start(): void {
  if (this.worker && !this.worker.killed) {
    return; // already running
  }
  this.isShuttingDown = false;

  // The 'ipc' stdio slot creates the message channel to the child.
  this.worker = fork(this.workerPath, [], {
    stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
  });

  // Responses, stream chunks and heartbeat replies all arrive here.
  this.worker.on('message', (message: WorkerResponse | StreamChunk) => {
    this.handleMessage(message);
  });

  // ChildProcess emits 'error' when the process cannot be spawned or killed,
  // or when a message cannot be sent. EventEmitter throws an 'error' event that
  // has no listener, which would take down the Extension Host — log it instead.
  this.worker.on('error', (err: Error) => {
    console.error(`Worker process error: ${err.message}`);
  });

  // Crash or normal exit: clean up and possibly restart.
  this.worker.on('exit', (code, signal) => {
    this.handleWorkerExit(code, signal);
  });

  this.startHeartbeat();
}
发送请求
/**
 * Send a request to the worker and resolve with the matching response.
 *
 * A fresh UUID is attached as the request ID; `handleMessage` uses it to find
 * this request again. The promise rejects when the worker is not running, when
 * the message cannot be delivered, or after `requestTimeout` ms without a response.
 *
 * `connect` and `disconnect` requests also update `activeConnections`, the set
 * of connections re-opened after a worker crash.
 */
sendRequest(request: Omit<WorkerRequest, 'id'>): Promise<WorkerResponse> {
  return new Promise((resolve, reject) => {
    const worker = this.worker;
    if (!worker || worker.killed) {
      reject(new Error('Worker process is not running'));
      return;
    }

    const fullRequest: WorkerRequest = {
      ...request,
      id: randomUUID(),
    };
    const requestId = fullRequest.id;

    const pending: PendingRequest = {
      resolve,
      reject,
      streamChunks: [],
      isStream: request.type === 'streamQuery',
      timer: setTimeout(() => {
        this.pendingRequests.delete(requestId);
        reject(new Error(`Request timed out`));
      }, this.requestTimeout),
    };
    this.pendingRequests.set(requestId, pending);

    // Keep activeConnections in step with connect/disconnect so crash recovery
    // re-opens exactly the connections that should still be open.
    if (request.type === 'connect' && request.payload.config) {
      this.activeConnections.set(request.connectionId, request.payload.config);
    } else if (request.type === 'disconnect') {
      this.activeConnections.delete(request.connectionId);
    }

    // Fail fast when the message cannot be delivered instead of waiting for
    // the timeout. Ignored if the request already settled some other way.
    const failDelivery = (err: Error): void => {
      if (this.pendingRequests.get(requestId) !== pending) {
        return;
      }
      clearTimeout(pending.timer);
      this.pendingRequests.delete(requestId);
      reject(err);
    };

    try {
      // With a callback, delivery errors are reported here rather than as an
      // 'error' event on the ChildProcess.
      worker.send(fullRequest, (err) => {
        if (err) {
          failDelivery(err);
        }
      });
    } catch (err: unknown) {
      // e.g. a value in the request that cannot be serialized
      failDelivery(err instanceof Error ? err : new Error(String(err)));
    }
  });
}
处理响应
/**
 * Route one message received from the worker.
 *
 * - Any message resets the missed-heartbeat counter.
 * - Stream chunks (identified by `requestId`) go to `handleStreamChunk`.
 * - `pong` replies need no further handling.
 * - Other responses settle the matching pending request; for a streaming
 *   request, `streamEnd` triggers assembly of the accumulated chunks.
 */
private handleMessage(message: WorkerResponse | StreamChunk): void {
  this.missedHeartbeats = 0;

  // Check `requestId` first: StreamChunk has no `type`, so reading
  // `message.type` before this narrowing does not type-check under strict mode.
  if ('requestId' in message) {
    this.handleStreamChunk(message);
    return;
  }
  const response = message;
  if (response.type === 'pong') {
    return;
  }

  const pending = this.pendingRequests.get(response.id);
  if (!pending) {
    return; // no matching request (it may already have timed out)
  }
  clearTimeout(pending.timer);

  // The request settles below, so its per-chunk inactivity timer is no longer
  // needed; otherwise it would linger in streamChunkTimers until it fires.
  const chunkTimer = this.streamChunkTimers.get(response.id);
  if (chunkTimer !== undefined) {
    clearTimeout(chunkTimer);
    this.streamChunkTimers.delete(response.id);
  }

  if (pending.isStream && response.type === 'streamEnd') {
    this.assembleStreamResponse(pending, response);
  } else {
    this.pendingRequests.delete(response.id);
    pending.resolve(response);
  }
}
流式响应组装
/**
 * Combine the chunks collected for a streaming request into a single `result`
 * response and resolve the pending request with it.
 *
 * @param pending  The streaming request; its `streamChunks` hold the rows.
 * @param response The `streamEnd` message that closed the stream.
 */
private assembleStreamResponse(
  pending: PendingRequest,
  response: WorkerResponse
): void {
  const assembledRows: Record<string, unknown>[] = [];
  let totalRows = 0;
  for (const chunk of pending.streamChunks) {
    // Append row by row: `push(...chunk.rows)` passes every row as a call
    // argument and throws RangeError for very large chunks.
    for (const row of chunk.rows) {
      assembledRows.push(row);
    }
    // Only the final chunk carries totalRows; keep the last value seen.
    if (chunk.totalRows !== undefined) {
      totalRows = chunk.totalRows;
    }
  }

  // Column names are taken from the first row; an empty result has none.
  const firstRow = assembledRows[0];
  const columns = firstRow ? Object.keys(firstRow) : [];

  const queryResult: QueryResult = {
    columns,
    rows: assembledRows,
    rowCount: assembledRows.length,
    executionTime: 0, // not measured for streamed results
    // NOTE(review): the driver's totalRows is surfaced as affectedRows — confirm intended.
    ...(totalRows > 0 ? { affectedRows: totalRows } : {}),
  };

  this.pendingRequests.delete(response.id);
  pending.resolve({
    id: response.id,
    type: 'result',
    data: queryResult,
  });
}
4.4 流式传输详解
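为什么需要流式传输
注:从 4.3 节所示的代码看,IpcManager 会把收到的分块累积在 `PendingRequest.streamChunks` 中,直到收到 `streamEnd` 才由 `assembleStreamResponse()` 一次性组装并 resolve。因此当前实现主要解决的是单条 IPC 消息过大的问题;下图中的“渐进渲染”需要在每个分块到达时就转发给 UI 才能实现。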
graph TD
A["查询返回 100 万行"] --> B{"传输方式"}
B -->|普通查询| C["单次 IPC 消息"]
C --> D["消息过大"]
D --> E["IPC 阻塞"]
E --> F["UI 卡顿"]
B -->|流式查询| G["分块传输"]
G --> H["每块 1000 行"]
H --> I["渐进渲染"]
I --> J["UI 流畅"]
Worker 端流式实现
/**
 * Worker-side handler for `streamQuery`. It runs the query through the driver's
 * async generator and forwards every chunk to the host as a StreamChunk.
 *
 * The request ID is registered in `activeStreams`. If it is removed (e.g. by a
 * cancel), the query is skipped when still queued, or the stream stops at the
 * next chunk boundary. In every case the queued task resolves to a `streamEnd`
 * response.
 *
 * @throws Error if the target connection is unknown or the payload has no SQL.
 */
async function handleStreamQuery(request: WorkerRequest): Promise<void> {
  const entry = connections.get(request.connectionId);
  if (!entry) {
    throw new Error(`Connection not found: ${request.connectionId}`);
  }
  const { sql, params } = request.payload;
  if (!sql) {
    throw new Error('streamQuery requires payload.sql');
  }
  const { driver } = entry;

  activeStreams.add(request.id); // register so a cancel can stop the stream

  queryQueue.enqueue(request, async () => {
    try {
      // Cancelled while waiting in the queue: do not run the query at all.
      if (!activeStreams.has(request.id)) {
        return { id: request.id, type: 'streamEnd' };
      }

      let chunkIndex = 0;
      // Parameters were previously dropped; pass them through to the driver.
      for await (const chunk of driver.streamQuery(sql, params)) {
        if (!activeStreams.has(request.id)) {
          // Leaving a for-await loop early calls the generator's return(),
          // which lets the driver run its cleanup.
          return { id: request.id, type: 'streamEnd' };
        }
        send({
          requestId: request.id,
          chunkIndex,
          rows: chunk.rows,
          totalRows: chunk.totalRows,
        });
        chunkIndex++;
      }

      // End-of-stream marker
      return { id: request.id, type: 'streamEnd' };
    } finally {
      activeStreams.delete(request.id);
    }
  });
}
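Driver 端流式实现
注意:下面的 MySQL 实现先通过 `execute()` 取回完整结果集,再按 `chunkSize` 切块 yield。因此它限制的是单条 IPC 消息的大小,而不是 Worker 进程的内存占用。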
// mysql.ts
/**
 * Execute `sql` on a dedicated connection and yield the result rows in chunks.
 *
 * NOTE: `execute()` returns the complete result set before the first chunk is
 * yielded, so this bounds IPC message size but not worker memory.
 *
 * Always yields at least one chunk; the last chunk carries `totalRows`.
 *
 * @param sql SQL text to execute.
 * @param params Optional statement parameters.
 * @param chunkSize Rows per chunk; must be a positive integer.
 * @throws RangeError if `chunkSize` is not a positive integer.
 */
async *streamQuery(
  sql: string,
  params?: any[],
  chunkSize = 1000
): AsyncGenerator<StreamChunkData> {
  // A non-positive step would make the chunking loop below never advance.
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }
  // NOTE(review): `config` is not defined in this excerpt; presumably the driver's connection settings.
  const streamConn = await mysql.createConnection(config);
  try {
    const [result] = await streamConn.execute(sql, params);
    // A non-row result and an empty row set both produce one empty terminal
    // chunk, so consumers always receive totalRows.
    if (!Array.isArray(result) || result.length === 0) {
      yield { chunkIndex: 0, rows: [], totalRows: 0 };
      return;
    }

    const totalRows = result.length;
    for (let offset = 0, chunkIndex = 0; offset < totalRows; offset += chunkSize, chunkIndex++) {
      const isLast = offset + chunkSize >= totalRows;
      yield {
        chunkIndex,
        rows: result.slice(offset, offset + chunkSize),
        ...(isLast ? { totalRows } : {}),
      };
    }
  } finally {
    await streamConn.end();
  }
}
分块超时控制
// IpcManager: per-chunk inactivity timeout for streaming requests
/**
 * Arm (or re-arm) the inactivity timer for streaming request `requestId`.
 * If it fires before being reset, the pending request is removed and rejected
 * with `STREAM_TIMEOUT`.
 */
private setupStreamChunkTimeout(requestId: string): void {
  // Never leave an earlier timer running unreferenced: it would still fire and
  // could reject the request even though later chunks arrived.
  const previous = this.streamChunkTimers.get(requestId);
  if (previous !== undefined) {
    clearTimeout(previous);
  }

  const timer = setTimeout(() => {
    // The timer has fired; drop its map entry so it does not leak.
    this.streamChunkTimers.delete(requestId);
    const pending = this.pendingRequests.get(requestId);
    if (pending) {
      this.pendingRequests.delete(requestId);
      clearTimeout(pending.timer);
      pending.reject(new Error('STREAM_TIMEOUT'));
    }
  }, this.streamChunkTimeout);
  this.streamChunkTimers.set(requestId, timer);
}
// Called whenever a chunk arrives for a streaming request.
/** Restart the per-chunk inactivity timer for `requestId`. */
private resetStreamChunkTimer(requestId: string): void {
  // clearTimeout(undefined) is a no-op, so no existence check is needed.
  clearTimeout(this.streamChunkTimers.get(requestId));
  this.setupStreamChunkTimeout(requestId);
}
4.5 心跳机制
心跳设计
sequenceDiagram
participant Host as IpcManager
participant Worker as Worker Process
loop 每 15 秒
Host->>Worker: ping
Worker-->>Host: pong
Host->>Host: missedHeartbeats = 0
end
Note over Host: Worker 未响应
Host->>Host: missedHeartbeats++
Host->>Host: missedHeartbeats++
Host->>Worker: SIGKILL (强制终止)
Host->>Host: handleWorkerExit()
实现
/**
 * Start (or restart) the heartbeat loop.
 *
 * Every `heartbeatInterval` ms a `ping` is sent and `missedHeartbeats` is
 * incremented; incoming messages reset the counter. Once the counter reaches
 * `maxMissedHeartbeats`, the worker is treated as hung.
 */
private startHeartbeat(): void {
  this.stopHeartbeat();
  this.missedHeartbeats = 0;

  const tick = (): void => {
    const worker = this.worker;
    if (!worker || worker.killed) {
      this.stopHeartbeat();
      return;
    }

    try {
      worker.send({ type: 'ping', id: 'heartbeat' });
    } catch {
      // Synchronous send failure: skip this round without counting it.
      return;
    }

    this.missedHeartbeats += 1;
    if (this.missedHeartbeats < this.maxMissedHeartbeats) {
      return;
    }
    console.error('Worker heartbeat timeout');
    this.handleWorkerHang();
  };

  this.heartbeatTimer = setInterval(tick, this.heartbeatInterval);
}
/**
 * Force-terminate a worker that stopped answering heartbeats. The resulting
 * process exit is what drives cleanup and recovery.
 */
private handleWorkerHang(): void {
  const worker = this.worker;
  if (!worker || worker.killed) {
    return;
  }
  // SIGKILL cannot be intercepted, unlike SIGTERM which the worker handles in JS.
  worker.kill('SIGKILL');
}
Worker 端心跳响应
// worker.ts
case 'ping': {
  // Heartbeat probe from the host: answer immediately with the fixed heartbeat ID.
  send({ id: 'heartbeat', type: 'pong' });
  break;
}
4.6 崩溃恢复
崩溃检测
worker.on('exit', (code, signal) => {
  console.error(`Worker exited: code=${code}, signal=${signal}`);
  this.stopHeartbeat();

  // IDs of the connections that lived in the dead worker, for recovery.
  const lostConnectionIds = [...this.activeConnections.keys()];

  // Nothing in flight can complete on a dead worker.
  this.rejectAllPending(`Worker exited unexpectedly`);
  this.worker = null;

  // Restart only after a genuine crash, and only when configured to.
  const shouldRestart = !this.isShuttingDown && this.restartOnCrash;
  if (shouldRestart) {
    this.handleCrashRecovery(lostConnectionIds);
  }
});
重启策略
/**
 * Schedule a worker restart after an unexpected exit.
 *
 * Restarts use exponential backoff (500 ms, 1 s, 2 s, ... capped at 5 s) and
 * are attempted at most `maxRestartAttempts` times. After a restart, the
 * connections in `activeConnections` are re-opened.
 *
 * @param lostConnectionIds IDs of the connections that were open when the worker died.
 */
private handleCrashRecovery(lostConnectionIds: string[]): void {
  if (this.restartAttempts >= this.maxRestartAttempts) {
    // Previously silent; make giving up visible in the logs.
    console.error(`Worker crashed; giving up after ${this.restartAttempts} restart attempts`);
    return;
  }

  this.crashed = true;
  this.restartAttempts++;

  // Exponential backoff
  const delay = Math.min(500 * Math.pow(2, this.restartAttempts - 1), 5000);
  console.log(`Auto-restarting worker (attempt ${this.restartAttempts})`);

  this.callbacks.onWorkerCrash?.(this.restartAttempts);
  this.callbacks.onConnectionsLost?.(lostConnectionIds);

  setTimeout(() => {
    // shutdown() may have run during the backoff. start() clears
    // isShuttingDown, so restarting now would resurrect a shut-down manager.
    if (this.isShuttingDown) {
      return;
    }
    this.start();
    // NOTE(review): this does not wait for the worker's `ready` signal.
    this.reconnectActiveConnections();
    this.callbacks.onWorkerRestarted?.();
  }, delay);
}
连接恢复
/**
 * Re-send a `connect` request for every tracked connection, typically after a
 * worker restart. Each failure is logged and does not affect the others.
 */
private reconnectActiveConnections(): void {
  this.activeConnections.forEach((config, connectionId) => {
    console.log(`Reconnecting: ${connectionId}`);
    const reconnect = this.sendRequest({
      type: 'connect',
      connectionId,
      payload: { config },
    });
    reconnect.catch((err) => {
      console.error(`Failed to reconnect ${connectionId}: ${err.message}`);
    });
  });
}
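重启时序
注:4.6 节的 `handleCrashRecovery()` 在调用 `start()` 后立即执行 `reconnectActiveConnections()`,并没有等待下图中的 `ready` 信号;`ready` 信号与 `waitReady()` 见 4.8 节。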
sequenceDiagram
participant User as 用户
participant IPC as IpcManager
participant Worker1 as Worker (旧)
participant Worker2 as Worker (新)
Worker1-->>IPC: exit (crash)
IPC->>User: onWorkerCrash callback
IPC->>IPC: wait (指数退避)
IPC->>Worker2: fork (新进程)
Worker2-->>IPC: ready
IPC->>Worker2: reconnect all activeConnections
Worker2-->>IPC: connect results
IPC->>User: onWorkerRestarted callback
4.7 优雅关闭
关闭流程
/**
 * Shut the worker down gracefully.
 *
 * 1. Stop heartbeats. If requests are still in flight, send `shutdown` and
 *    wait up to `gracefulShutdownTimeout` ms for them to drain.
 * 2. Reject whatever is still pending.
 * 3. Send SIGTERM. If the process has not exited within another
 *    `gracefulShutdownTimeout` ms, send SIGKILL.
 */
async shutdown(): Promise<void> {
  this.isShuttingDown = true;
  this.stopHeartbeat();

  if (this.pendingRequests.size > 0 && this.worker) {
    try {
      // The callback keeps a delivery failure from surfacing as an 'error' event;
      // failing here is acceptable because the worker is being stopped anyway.
      this.worker.send({ type: 'shutdown' }, () => undefined);
    } catch {
      // Best effort: the channel may already be closed.
    }

    const drainStart = Date.now();
    while (
      this.pendingRequests.size > 0 &&
      Date.now() - drainStart < this.gracefulShutdownTimeout
    ) {
      await new Promise<void>((resolve) => setTimeout(resolve, 200));
    }
  }

  this.rejectAllPending('IPCManager shutting down');

  // Keep a local reference: the 'exit' handler installed by start() sets
  // this.worker to null, which would break the force-kill callback below.
  const worker = this.worker;
  const hasExited = (): boolean =>
    worker === null || worker.exitCode !== null || worker.signalCode !== null;

  if (worker && !hasExited()) {
    await new Promise<void>((resolve) => {
      const forceKill = setTimeout(() => {
        // `killed` only records that a signal was delivered, so it is already
        // true after SIGTERM; exitCode/signalCode show whether it actually exited.
        if (!hasExited()) {
          worker.kill('SIGKILL');
        }
        resolve();
      }, this.gracefulShutdownTimeout);

      worker.once('exit', () => {
        clearTimeout(forceKill);
        resolve();
      });
      worker.kill('SIGTERM');
    });
  }

  this.worker = null;
  this.activeConnections.clear();
}
Worker 端关闭
// worker.ts
/** Memoized cleanup run, so every shutdown trigger shares a single cleanup. */
let shutdownPromise: Promise<void> | null = null;

/**
 * Release the worker's resources. This stops memory monitoring, disposes the
 * query queue, disconnects every driver (failures ignored) and clears the
 * connection and stream registries.
 *
 * Idempotent: 'disconnect', SIGTERM and explicit calls can all trigger it;
 * later calls reuse the first run instead of disconnecting drivers twice.
 */
async function shutdown(): Promise<void> {
  if (shutdownPromise === null) {
    shutdownPromise = releaseResources();
  }
  return shutdownPromise;
}

/** The cleanup performed once by `shutdown()`. */
async function releaseResources(): Promise<void> {
  stopMemoryMonitoring();
  queryQueue.dispose();
  // Disconnect all drivers concurrently; one failing driver must not block the
  // rest. The async wrapper also turns a synchronous throw into a rejection.
  await Promise.allSettled(
    Array.from(connections.values(), async (entry) => {
      await entry.driver.disconnect();
    })
  );
  connections.clear();
  activeStreams.clear();
}

/** Run shutdown() and always exit: 0 after a clean shutdown, 1 if cleanup threw. */
function shutdownAndExit(): void {
  void shutdown().then(
    () => process.exit(0),
    (err: unknown) => {
      console.error('Worker shutdown failed:', err);
      process.exit(1);
    }
  );
}

// Either the host closing the IPC channel or SIGTERM ends the worker.
process.on('disconnect', shutdownAndExit);
process.on('SIGTERM', shutdownAndExit);
4.8 Worker 就绪机制
就绪信号
// worker.ts — announce readiness to the host once startup code has run.
// `process.send` is only present when the process was spawned with an IPC channel.
if (process.send != null) {
  process.send({ type: 'ready' });
}
等待就绪
// IpcManager.ts
// Readiness tracking: the worker posts { type: 'ready' } after it starts.
// True once the current worker has sent 'ready'.
private workerReady: boolean = false;
// Resolver for workerReadyPromise, captured when start() creates the promise.
private workerReadyResolve?: () => void;
// Resolves when the current worker reports ready; recreated on every start().
private workerReadyPromise?: Promise<void>;
// Excerpt of start() showing only the readiness handling. `...` elides the fork
// arguments; presumably the remaining setup from the full start() is omitted too.
start(): void {
// Reset readiness before forking the new worker.
this.workerReady = false;
this.workerReadyPromise = new Promise<void>((resolve) => {
this.workerReadyResolve = resolve;
});
this.worker = fork(this.workerPath, ...);
this.worker.on('message', (message) => {
// The 'ready' notification is consumed here and never reaches handleMessage().
if ((message as any).type === 'ready') {
this.workerReady = true;
this.workerReadyResolve?.();
return;
}
this.handleMessage(message);
});
}
/**
 * Wait until the current worker has sent its `ready` message.
 *
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @returns `true` once the worker is ready; `false` if the timeout elapses
 *          first or start() has not been called yet.
 */
async waitReady(timeoutMs = 5000): Promise<boolean> {
  if (this.workerReady) return true;

  const readyPromise = this.workerReadyPromise;
  if (!readyPromise) {
    // start() has not created the ready promise, so there is nothing to wait for.
    return false;
  }

  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<false>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs);
  });

  try {
    return await Promise.race([readyPromise.then(() => true), timeout]);
  } finally {
    // Otherwise the timer keeps running (and keeps the event loop alive) after readiness.
    clearTimeout(timer);
  }
}
4.9 配置参数
IpcManagerOptions
/** Constructor options for IpcManager; each optional field lists its default. */
interface IpcManagerOptions {
  /** Path of the worker script to fork. */
  workerScriptPath: string;
  /** Restart the worker after a crash. Default: `true`. */
  restartOnCrash?: boolean;
  /** Maximum number of automatic restarts. Default: `3`. */
  maxRestartAttempts?: number;
  /** Per-request timeout in ms. Default: `60000`. */
  requestTimeout?: number;
  /** Interval between heartbeat pings in ms. Default: `15000`. */
  heartbeatInterval?: number;
  /** Heartbeat timeout in ms. Default: `30000`. */
  heartbeatTimeout?: number;
  /** Maximum gap between stream chunks in ms. Default: `30000`. */
  streamChunkTimeout?: number;
  /** Time allowed for graceful shutdown in ms. Default: `10000`. */
  gracefulShutdownTimeout?: number;
  /** Lifecycle event callbacks. */
  callbacks?: IpcManagerCallbacks;
}
默认值
// Defaults applied when IpcManagerOptions leaves a field unset (durations in ms).
const DEFAULT_REQUEST_TIMEOUT = 60 * 1000; // 60 s
const DEFAULT_MAX_RESTART_ATTEMPTS = 3;
const DEFAULT_HEARTBEAT_INTERVAL = 15 * 1000; // 15 s
const DEFAULT_HEARTBEAT_TIMEOUT = 30 * 1000; // 30 s
const DEFAULT_STREAM_CHUNK_TIMEOUT = 30 * 1000; // 30 s
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = 10 * 1000; // 10 s
4.10 小结
本章介绍了 VSDB 的 IPC 通信机制:
| 特性 | 实现 |
|---|---|
| 消息协议 | WorkerRequest / WorkerResponse / StreamChunk |
| 请求匹配 | UUID + pendingRequests Map |
| 流式传输 | 分块发送 + 组装响应 |
| 心跳机制 | ping/pong + missedHeartbeats 计数 |
| 崩溃恢复 | 重启 + 重连 activeConnections |
| 优雅关闭 | 等待排空 + SIGTERM/SIGKILL |
| Worker 就绪 | ready 信号 + waitReady() |
下一章将深入数据库驱动的实现细节。