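Chapter 5: Database Driver Implementation

This chapter describes VSDB's database driver abstraction layer and the concrete MySQL and PostgreSQL driver implementations.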

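5.1 Driver Abstraction Layer Design

Driver interface

VSDB does not declare an explicit Driver interface, but every driver follows the same method signatures; only the accessor for the underlying connection differs between drivers:

// Implicit driver interface
interface DatabaseDriver {
  // Connection management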
  connect(config: DbConnection): Promise<void>;
  disconnect(): Promise<void>;
  isConnected(): boolean;
  
  // Query execution
  query(sql: string, params?: unknown[]): Promise<QueryResult>;
  streamQuery(sql: string, params?: unknown[], chunkSize?: number): AsyncGenerator<StreamChunkData>;
  
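  // Access to the underlying connection (used for schema queries).
  // Each driver implements only one of the two accessors, so both are optional.
  getConnection?(): Connection | Pool | null;  // MySQL
  getPool?(): Pool | null;                     // PostgreSQL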
}

Driver selection strategy

// worker.ts
function createDriver(config: DbConnection): DatabaseDriver {
  return config.type === 'mysql' 
    ? new MySqlDriver() 
    : new PostgreSqlDriver();
}

graph TD
    A["DbConnection.type"] --> B{"Database type"}
    B -->|"mysql"| C["MySqlDriver"]
    B -->|"postgresql"| D["PostgreSqlDriver"]
    
    C --> E["mysql2/promise"]
    D --> F["pg"]
    
    E --> G["MySQL connection"]
    F --> H["PostgreSQL Pool"]
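
The ternary in createDriver sends every type other than 'mysql' to the PostgreSQL driver. The following is a minimal sketch of a stricter variant, not VSDB's actual code: the stub classes, the DbType alias, and the name createDriverStrict are assumptions made for illustration.

```typescript
// Hypothetical stand-ins for the real driver classes, reduced to a label.
class MySqlDriver {
  readonly name = 'mysql2/promise';
}
class PostgreSqlDriver {
  readonly name = 'pg';
}

// Assumed shape of DbConnection.type; the real type may differ.
type DbType = 'mysql' | 'postgresql';

function createDriverStrict(type: DbType): MySqlDriver | PostgreSqlDriver {
  switch (type) {
    case 'mysql':
      return new MySqlDriver();
    case 'postgresql':
      return new PostgreSqlDriver();
    default: {
      // Compile-time exhaustiveness check: adding a member to DbType without
      // a matching case turns this assignment into a type error.
      const unreachable: never = type;
      // Runtime guard for values that bypass the type system (e.g. raw JSON).
      throw new Error(`Unsupported database type: ${String(unreachable)}`);
    }
  }
}
```

With this shape, an unexpected value fails immediately instead of being handled by the PostgreSQL driver.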

5.2 MySQL Driver Implementation

The MySqlDriver class

import mysql from 'mysql2/promise';
import type { DbConnection, QueryResult } from '../../shared/types';

export class MySqlDriver {
  private connection: mysql.Connection | null = null;
  
  // Open a connection
  async connect(config: DbConnection): Promise<void> {
    if (this.connection) {
      await this.disconnect();  // close the previous connection first
    }
    }
    
    this.connection = await mysql.createConnection({
      host: config.host,
      port: config.port,
      user: config.username,
      password: config.password,
      database: config.database,
      connectTimeout: 10000,     // 10-second connect timeout
      ...config.options,         // extra options (SSL, charset, etc.); these override the fields above
    });
  }
  
  // Close the connection
  async disconnect(): Promise<void> {
    if (this.connection) {
      try {
        await this.connection.end();
      } catch {
        // The connection may already be closed; ignore the error
      }
      this.connection = null;
    }
  }
  
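  // Check connection state (only reports whether a connection object exists,
  // not whether the server is still reachable)
  isConnected(): boolean {
    return this.connection !== null;
  }
  
  // Get the underlying connection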
  getConnection(): mysql.Connection | null {
    return this.connection;
  }
}

Query execution

async query(sql: string, params?: any[]): Promise<QueryResult> {
  if (!this.connection) {
    throw new Error('MySQL: not connected');
  }
  
  const startTime = Date.now();
  const [result] = await this.connection.execute(sql, params);
  const executionTime = Date.now() - startTime;
  
  // Distinguish SELECT from INSERT/UPDATE/DELETE
  if (result && typeof result === 'object' && 'affectedRows' in result) {
    // INSERT/UPDATE/DELETE return a ResultSetHeader
    return {
      columns: [],
      rows: [],
      rowCount: (result as any).affectedRows,
      affectedRows: (result as any).affectedRows,
      executionTime,
    };
  }
  
  // SELECT returns RowDataPacket[]; column names are taken from the first row,
  // so an empty result yields no columns
  const rows = Array.isArray(result) ? result as Record<string, unknown>[] : [];
  const columns = rows.length > 0 ? Object.keys(rows[0]) : [];
  
  return {
    columns,
    rows,
    rowCount: rows.length,
    executionTime,
  };
}

Streaming query

async *streamQuery(
  sql: string, 
  params?: any[], 
  chunkSize = 1000
): AsyncGenerator<{ chunkIndex: number; rows: Record<string, unknown>[]; totalRows?: number }> {
  if (!this.connection) {
    throw new Error('MySQL: not connected');
  }
  
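  // Open a separate connection so the main connection is not blocked.
  // The config is read from the wrapped connection object; if it is missing,
  // the fallback is an empty config.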
  const streamConn = await mysql.createConnection(
    (this.connection as any).connection?.config || {}
  );
  
  try {
    const [result] = await streamConn.execute(sql, params);
    
    if (!Array.isArray(result)) {
      yield { chunkIndex: 0, rows: [], totalRows: 0 };
      return;
    }
    
    const allRows = result;
    const totalRows = allRows.length;
    let chunkIndex = 0;
    
    // Yield in chunks; the full result set is already buffered in memory here
    for (let offset = 0; offset < allRows.length; offset += chunkSize) {
      const chunk = allRows.slice(offset, offset + chunkSize);
      const isLast = offset + chunkSize >= allRows.length;
      
      yield {
        chunkIndex,
        rows: chunk,
        ...(isLast ? { totalRows } : {}),
      };
      chunkIndex++;
    }
  } finally {
    await streamConn.end();  // always close the streaming connection
  }
}
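
The chunking step above can be isolated from the database call. The sketch below is an illustration under stated assumptions, not the driver's code: the names Row, Chunk, and chunkRows are hypothetical, and unlike the driver it also emits one empty chunk for an empty result so that consumers always receive totalRows.

```typescript
type Row = Record<string, unknown>;

interface Chunk {
  chunkIndex: number;
  rows: Row[];
  totalRows?: number; // present only on the final chunk
}

// Split an already-buffered result set into fixed-size chunks.
function* chunkRows(allRows: Row[], chunkSize = 1000): Generator<Chunk> {
  if (chunkSize <= 0) {
    throw new RangeError('chunkSize must be positive');
  }
  if (allRows.length === 0) {
    // Deviation from the driver code: report an explicit empty final chunk.
    yield { chunkIndex: 0, rows: [], totalRows: 0 };
    return;
  }
  let chunkIndex = 0;
  for (let offset = 0; offset < allRows.length; offset += chunkSize) {
    const isLast = offset + chunkSize >= allRows.length;
    yield {
      chunkIndex,
      rows: allRows.slice(offset, offset + chunkSize),
      ...(isLast ? { totalRows: allRows.length } : {}),
    };
    chunkIndex++;
  }
}
```

Because the rows are sliced from an in-memory array, this pattern bounds the size of each message sent to the consumer, not the memory used by the worker itself.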

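mysql2 features

Why VSDB chose mysql2:

| Feature | mysql2 | mysql (legacy) |
|---------|--------|----------------|
| Promise support | ✓ (native) | ✗ (needs wrapping) |
| Prepared statements |  |  |
| Streaming queries |  |  |
| Performance | Faster | Slower |
| Maintenance status | Active | Discontinued |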

5.3 PostgreSQL Driver Implementation

The PostgreSqlDriver class

import { Pool } from 'pg';
import type { DbConnection, QueryResult } from '../../shared/types';

export class PostgreSqlDriver {
  private pool: Pool | null = null;
  
  async connect(config: DbConnection): Promise<void> {
    if (this.pool) {
      await this.disconnect();
    }
    
    // PostgreSQL uses a connection pool
    this.pool = new Pool({
      host: config.host,
      port: config.port,
      user: config.username,
      password: config.password,
      database: config.database || 'postgres',
      max: 5,                   // maximum number of connections
      connectionTimeoutMillis: 10000,
      ...config.options,
    });
    
    // Verify the connection
    try {
      await this.pool.query('SELECT 1');
    } catch (err) {
      await this.pool.end();  // wait for the pool to shut down before rethrowing
      this.pool = null;
      throw err;
    }
  }
  
  async disconnect(): Promise<void> {
    if (this.pool) {
      await this.pool.end();
      this.pool = null;
    }
  }
  
  isConnected(): boolean {
    return this.pool !== null;
  }
  
  getPool(): Pool | null {
    return this.pool;
  }
}

Query execution

async query(sql: string, params?: any[]): Promise<QueryResult> {
  if (!this.pool) {
    throw new Error('PostgreSQL: not connected');
  }
  
  const startTime = Date.now();
  const result = await this.pool.query(sql, params);
  const executionTime = Date.now() - startTime;
  
  // The PostgreSQL result format differs from MySQL's
  if (result.command !== 'SELECT') {
    // INSERT/UPDATE/DELETE (rows from a RETURNING clause are discarded here)
    return {
      columns: [],
      rows: [],
      rowCount: result.rowCount || 0,
      affectedRows: result.rowCount || 0,
      executionTime,
    };
  }
  
  // SELECT
  const rows = result.rows as Record<string, unknown>[];
  const columns = result.fields?.map(f => f.name) || 
                  (rows.length > 0 ? Object.keys(rows[0]) : []);
  
  return {
    columns,
    rows,
    rowCount: rows.length,
    executionTime,
  };
}

Streaming query

async *streamQuery(
  sql: string, 
  params?: any[], 
  chunkSize = 1000
): AsyncGenerator<StreamChunkData> {
  if (!this.pool) {
    throw new Error('PostgreSQL: not connected');
  }
  
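  // Check out a dedicated client for the streaming query
  const client = await this.pool.connect();
  
  // Use a cursor for true streaming. Cursor comes from the separate
  // pg-cursor package: import Cursor from 'pg-cursor';
  const cursor = client.query(new Cursor(sql, params));
  
  try {
    let chunkIndex = 0;
    
    while (true) {
      const rows = await cursor.read(chunkSize);
      if (rows.length === 0) break;
      
      // totalRows is attached only to a short final chunk; if the row count is
      // an exact multiple of chunkSize, no chunk carries it
      yield {
        chunkIndex,
        rows: rows as Record<string, unknown>[],
        ...(rows.length < chunkSize ? { totalRows: chunkIndex * chunkSize + rows.length } : {}),
      };
      
      chunkIndex++;
    }
  } finally {
    try {
      await cursor.close();  // close the cursor even if the consumer stops early
    } finally {
      client.release();      // return the client to the pool
    }
  }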
}
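
In the cursor loop, totalRows is reported only when the final chunk is shorter than chunkSize. One way to always mark the final chunk is to read one chunk ahead. The sketch below assumes a hypothetical ChunkReader interface whose read method resolves with an empty array at the end of the result set, which is the contract the loop above relies on; the names streamWithLookahead and arrayReader are illustrative only.

```typescript
type Row = Record<string, unknown>;

// Hypothetical minimal reader: resolves with up to `count` rows, and with an
// empty array once the result set is exhausted.
interface ChunkReader {
  read(count: number): Promise<Row[]>;
}

interface StreamChunk {
  chunkIndex: number;
  rows: Row[];
  totalRows?: number;
}

async function* streamWithLookahead(
  reader: ChunkReader,
  chunkSize = 1000,
): AsyncGenerator<StreamChunk> {
  let current = await reader.read(chunkSize);
  let chunkIndex = 0;
  let seen = 0;

  while (current.length > 0) {
    // Read one chunk ahead so we know whether `current` is the last one.
    const next = await reader.read(chunkSize);
    seen += current.length;
    yield {
      chunkIndex,
      rows: current,
      ...(next.length === 0 ? { totalRows: seen } : {}),
    };
    chunkIndex++;
    current = next;
  }
}

// In-memory reader used only to exercise the sketch.
function arrayReader(rows: Row[]): ChunkReader {
  let offset = 0;
  return {
    async read(count: number): Promise<Row[]> {
      const slice = rows.slice(offset, offset + count);
      offset += slice.length;
      return slice;
    },
  };
}
```

The cost of this design is that two chunks are held in memory at a time instead of one.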

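pg library features

Why VSDB chose pg:

| Feature | pg | Other libraries |
|---------|----|-----------------|
| Pure JavaScript |  | Partly C++ |
| Connection pool | ✓ (built in) | Needs extra configuration |
| Promise support |  |  |
| Cursor streaming |  | Limited |
| TypeScript types | ✓ (@types/pg) | Varies |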

5.4 Connection Pool Strategy

MySQL vs PostgreSQL

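graph TD
    subgraph "MySQL connection strategy"
        M1["Single connection"]
        M2["Reused for every query"]
        M3["Streaming queries use a separate connection"]
    end
    
    subgraph "PostgreSQL connection strategy"
        P1["Connection pool (max=5)"]
        P2["Queries acquire a client from the pool automatically"]
        P3["Streaming queries use a dedicated client"]
    end

| Database | Connection model | Rationale |
|----------|------------------|-----------|
| MySQL | Single connection | mysql2 does not pool unless createPool is used; a single connection is simple and sufficient |
| PostgreSQL | Connection pool | pg ships a built-in pool, and PostgreSQL connections are expensive to establish |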

Pool configuration

// Default PostgreSQL pool settings
const poolConfig = {
  max: 5,                        // maximum number of connections
  connectionTimeoutMillis: 10000, // connect timeout (ms)
  idleTimeoutMillis: 10000,      // idle timeout (ms)
};
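
Both drivers spread config.options after their own defaults, so user-supplied options take precedence. A minimal sketch of that merge order follows; the PoolOptions interface and the buildPoolOptions helper are hypothetical names, and only the three settings shown in this section are modelled.

```typescript
// Assumed subset of the pool options discussed in this section.
interface PoolOptions {
  max: number;
  connectionTimeoutMillis: number;
  idleTimeoutMillis: number;
}

const DEFAULT_POOL_OPTIONS: PoolOptions = {
  max: 5,
  connectionTimeoutMillis: 10000,
  idleTimeoutMillis: 10000,
};

// Later spreads win, so overrides replace the defaults key by key.
function buildPoolOptions(overrides: Partial<PoolOptions> = {}): PoolOptions {
  return { ...DEFAULT_POOL_OPTIONS, ...overrides };
}

const tuned = buildPoolOptions({ max: 10 });
console.log(tuned.max, tuned.connectionTimeoutMillis);
```

Keys that are not overridden keep their default values.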

5.5 Schema Inspector

MySqlSchema implementation

export class MySqlSchema {
  async getDatabases(conn: mysql.Connection): Promise<SchemaResult> {
    const [rows] = await conn.execute('SHOW DATABASES');
    return {
      type: 'databases',
      data: rows.map(r => ({ name: r.Database })),
    };
  }
  
  async getTables(conn: mysql.Connection, database: string): Promise<SchemaResult> {
    const [rows] = await conn.execute(
      `SHOW TABLES FROM ${mysql.escapeId(database)}`
    );
    const key = `Tables_in_${database}`;
    return {
      type: 'tables',
      data: rows.map(r => ({ name: r[key] })),
    };
  }
  
  async getColumns(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [rows] = await conn.execute(
      `SHOW COLUMNS FROM ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    return {
      type: 'columns',
      data: rows.map(r => ({
        name: r.Field,
        type: r.Type,
        nullable: r.Null === 'YES',
        defaultValue: r.Default,
        isPrimaryKey: r.Key === 'PRI',
        isAutoIncrement: r.Extra?.includes('auto_increment'),
      })),
    };
  }
  
  async getIndexes(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [rows] = await conn.execute(
      `SHOW INDEX FROM ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    // Merge the multiple rows that belong to the same index
    const indexMap = new Map();
    for (const r of rows) {
      if (!indexMap.has(r.Key_name)) {
        indexMap.set(r.Key_name, {
          name: r.Key_name,
          columns: [],
          isUnique: !r.Non_unique,
          isPrimary: r.Key_name === 'PRIMARY',
        });
      }
      indexMap.get(r.Key_name).columns.push(r.Column_name);
    }
    return {
      type: 'indexes',
      data: Array.from(indexMap.values()),
    };
  }
  
  async getConstraints(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [rows] = await conn.execute(
      `SELECT * FROM information_schema.TABLE_CONSTRAINTS 
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?`,
      [database, table]
    );
    return {
      type: 'constraints',
      data: rows.map(r => ({
        name: r.CONSTRAINT_NAME,
        type: r.CONSTRAINT_TYPE,
      })),
    };
  }
  
  async getDDL(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [rows] = await conn.execute(
      `SHOW CREATE TABLE ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    return {
      type: 'ddl',
      data: [{ ddl: rows[0]['Create Table'] }],
    };
  }
}
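
SHOW INDEX returns one row per indexed column, which is why getIndexes merges rows by Key_name. The sketch below isolates that grouping step so it can be tried without a database. The ShowIndexRow and IndexInfo interfaces and the groupIndexRows helper are assumptions for illustration, and the sort by Seq_in_index is an addition that the method above does not perform.

```typescript
// Assumed subset of the columns returned by SHOW INDEX.
interface ShowIndexRow {
  Key_name: string;
  Column_name: string;
  Non_unique: number; // 0 = unique, 1 = non-unique
  Seq_in_index: number; // 1-based position of the column within the index
}

interface IndexInfo {
  name: string;
  columns: string[];
  isUnique: boolean;
  isPrimary: boolean;
}

function groupIndexRows(rows: ShowIndexRow[]): IndexInfo[] {
  const byName = new Map<string, IndexInfo>();
  // Sort a copy by position so composite index columns come out in key order.
  const ordered = [...rows].sort((a, b) => a.Seq_in_index - b.Seq_in_index);
  for (const row of ordered) {
    let info = byName.get(row.Key_name);
    if (!info) {
      info = {
        name: row.Key_name,
        columns: [],
        isUnique: row.Non_unique === 0,
        isPrimary: row.Key_name === 'PRIMARY',
      };
      byName.set(row.Key_name, info);
    }
    info.columns.push(row.Column_name);
  }
  return Array.from(byName.values());
}
```

A composite index on (name, email) therefore becomes a single entry with two columns, regardless of the order in which its rows arrive.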

PgSchema implementation

export class PgSchema {
  async getDatabases(pool: Pool): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT datname FROM pg_database WHERE datistemplate = false`
    );
    return {
      type: 'databases',
      data: result.rows.map(r => ({ name: r.datname })),
    };
  }
  
  async getTables(pool: Pool, schema: string): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT table_name FROM information_schema.tables 
       WHERE table_schema = $1 AND table_type = 'BASE TABLE'`,
      [schema]
    );
    return {
      type: 'tables',
      data: result.rows.map(r => ({ name: r.table_name, schema })),
    };
  }
  
  async getColumns(pool: Pool, schema: string, table: string): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT column_name, data_type, is_nullable, column_default,
              CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_pk
       FROM information_schema.columns c
       LEFT JOIN (
         SELECT kcu.column_name
         FROM information_schema.table_constraints tc
         JOIN information_schema.key_column_usage kcu 
           ON tc.constraint_name = kcu.constraint_name
         WHERE tc.constraint_type = 'PRIMARY KEY'
           AND tc.table_schema = $1 AND tc.table_name = $2
       ) pk ON c.column_name = pk.column_name
       WHERE c.table_schema = $1 AND c.table_name = $2`,
      [schema, table]
    );
    return {
      type: 'columns',
      data: result.rows.map(r => ({
        name: r.column_name,
        type: r.data_type,
        nullable: r.is_nullable === 'YES',
        defaultValue: r.column_default,
        isPrimaryKey: r.is_pk,
        isAutoIncrement: false,  // PostgreSQL has no AUTO_INCREMENT attribute; serial and identity columns are not detected here
      })),
    };
  }
  
  async getDDL(pool: Pool, schema: string, table: string): Promise<SchemaResult> {
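    // PostgreSQL has no direct command for table DDL, so it is assembled by hand.
    // A real implementation needs more logic (constraints, defaults, column order).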
    const result = await pool.query(
      `SELECT 
        'CREATE TABLE ' || quote_ident($2) || ' (' ||
        string_agg(
          quote_ident(column_name) || ' ' || data_type ||
          CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END,
          ', '
        ) || ')' as ddl
       FROM information_schema.columns
       WHERE table_schema = $1 AND table_name = $2`,
      [schema, table]
    );
    return {
      type: 'ddl',
      data: [{ ddl: result.rows[0]?.ddl || '' }],
    };
  }
}
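
getColumns hard-codes isAutoIncrement to false. PostgreSQL serial columns carry a nextval(...) default, and identity columns are flagged by is_identity in information_schema.columns (PostgreSQL 10 and later), so the flag could be derived if the query also selected those fields. The following is a sketch under that assumption; PgColumnRow and isAutoGenerated are hypothetical names and not part of PgSchema.

```typescript
// Assumed subset of a row from information_schema.columns.
interface PgColumnRow {
  column_default: string | null;
  is_identity?: 'YES' | 'NO'; // available in PostgreSQL 10 and later
}

function isAutoGenerated(row: PgColumnRow): boolean {
  // GENERATED ... AS IDENTITY columns
  if (row.is_identity === 'YES') {
    return true;
  }
  // serial/bigserial columns have a default such as
  // nextval('users_id_seq'::regclass)
  if (row.column_default === null) {
    return false;
  }
  return /^nextval\(/i.test(row.column_default.trim());
}
```

This heuristic treats any column whose default starts with nextval( as auto-generated, which also matches columns manually wired to a sequence.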

5.6 Query Queue

QueryQueue implementation

class QueryQueue {
  private queue = new Map<string, PendingQuery>();
  private runningCount = 0;
  private maxConcurrency = 5;
  private defaultTimeout = 30000;
  
  enqueue(
    request: WorkerRequest, 
    handler: () => Promise<WorkerResponse>
  ): Promise<WorkerResponse> {
    return new Promise((resolve, reject) => {
      const pending: PendingQuery = {
        request,
        handler,
        resolve,
        reject,
        timer: setTimeout(() => {
          this.queue.delete(request.id);
          reject(new Error('Query timeout'));
        }, this.defaultTimeout),
      };
      
      this.queue.set(request.id, pending);
      this.processQueue();
    });
  }
  
  private processQueue(): void {
    while (this.runningCount < this.maxConcurrency && this.queue.size > 0) {
      const [id, pending] = Array.from(this.queue.entries())[0];
      this.queue.delete(id);
      this.runningCount++;
      
      pending.handler()
        .then(response => {
          clearTimeout(pending.timer);
          pending.resolve(response);
          this.runningCount--;
          this.processQueue();
        })
        .catch(error => {
          clearTimeout(pending.timer);
          pending.reject(error);
          this.runningCount--;
          this.processQueue();
        });
    }
  }
  
  cancel(requestId: string): void {
    const pending = this.queue.get(requestId);
    if (pending) {
      clearTimeout(pending.timer);
      pending.reject(new Error('Query cancelled'));
      this.queue.delete(requestId);
    }
  }
  
  dispose(): void {
    for (const [id, pending] of this.queue) {
      clearTimeout(pending.timer);
      pending.reject(new Error('QueryQueue disposed'));
    }
    this.queue.clear();
  }
}

Concurrency control diagram

graph TD
    Q["Request queue"] --> C{"Concurrency check"}
    C -->|"runningCount < max"| E["Run query"]
    C -->|"runningCount >= max"| W["Wait"]
    
    E --> R["runningCount++"]
    R --> D["Done"]
    D --> Dec["runningCount--"]
    Dec --> C
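
The check-run-release cycle in the diagram can be reduced to a small limiter. The sketch below is a simplified illustration rather than QueryQueue itself: it has no timeouts or cancellation, and the class name ConcurrencyLimiter and its run method are assumptions.

```typescript
class ConcurrencyLimiter {
  private readonly max: number;
  private running = 0;
  private readonly waiting: Array<() => void> = [];

  constructor(max: number) {
    this.max = max;
  }

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.running >= this.max) {
      // All slots are busy: park until a finishing task hands over its slot.
      await new Promise<void>(resolve => this.waiting.push(resolve));
    } else {
      this.running++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // hand the slot to the next waiter; running stays unchanged
      } else {
        this.running--; // nobody is waiting, so free the slot
      }
    }
  }
}
```

Handing the slot directly to the next waiter, instead of decrementing and re-checking, keeps the number of running tasks from ever exceeding the limit.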

5.7 Error Handling

Error classification

type QueryErrorClass = 'connection' | 'syntax' | 'timeout' | 'permission' | 'unknown';

function classifyQueryError(error: unknown): ClassifiedError {
  const message = error instanceof Error ? error.message : String(error);
  
  // Connection errors (retryable)
  if (
    message.includes('ECONNREFUSED') ||
    message.includes('ETIMEDOUT') ||
    message.includes('PROTOCOL_CONNECTION_LOST')
  ) {
    return { errorClass: 'connection', retryable: true };
  }
  
  // Syntax errors (not retryable)
  if (
    message.includes('syntax error') ||
    message.includes('ER_PARSE_ERROR') ||
    message.includes('42601')  // PostgreSQL syntax error code
  ) {
    return { errorClass: 'syntax', retryable: false };
  }
  
  // Timeout errors (retryable)
  if (message.includes('timeout') || message.includes('timed out')) {
    return { errorClass: 'timeout', retryable: true };
  }
  
  // Permission errors (not retryable)
  if (
    message.includes('Access denied') ||
    message.includes('permission denied') ||
    message.includes('42501')  // PostgreSQL permission denied
  ) {
    return { errorClass: 'permission', retryable: false };
  }
  
  return { errorClass: 'unknown', retryable: false };
}
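
The retryable flag is only useful if a caller acts on it. The sketch below shows one possible retry policy built on the classification result. It is an assumption made for illustration, not something the chapter describes: the helpers shouldRetry and retryDelayMs, the attempt limit, and the backoff constants are all hypothetical.

```typescript
interface ClassifiedError {
  errorClass: 'connection' | 'syntax' | 'timeout' | 'permission' | 'unknown';
  retryable: boolean;
}

// Retry only retryable errors, and only while attempts remain.
// `attempt` is 1-based: 1 means the first try has just failed.
function shouldRetry(
  classified: ClassifiedError,
  attempt: number,
  maxAttempts = 3,
): boolean {
  return classified.retryable && attempt < maxAttempts;
}

// Exponential backoff: baseMs, 2 * baseMs, 4 * baseMs, ... capped at capMs.
function retryDelayMs(attempt: number, baseMs = 200, capMs = 5000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt - 1));
}
```

A caller would check shouldRetry after each failure and wait retryDelayMs(attempt) before the next try, so syntax and permission errors surface immediately while connection drops get a few spaced-out attempts.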

Error response format

function sendError(requestId: string, code: string, message: string, error?: unknown): void {
  const classification = error ? classifyQueryError(error) : undefined;
  
  send({
    id: requestId,
    type: 'error',
    error: {
      code,
      message,
      ...(classification ? {
        errorClass: classification.errorClass,
        retryable: classification.retryable,
      } : {}),
    },
  });
}

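5.8 Summary

This chapter covered VSDB's database driver implementation:

| Driver | Library | Connection model | Characteristics |
|--------|---------|------------------|-----------------|
| MySqlDriver | mysql2/promise | Single connection | Native Promise API, fast |
| PostgreSqlDriver | pg | Connection pool | Built-in pool, cursor-based streaming |

Key design decisions:

  • MySQL single connection: simple and sufficient; streaming queries use a separate connection
  • PostgreSQL connection pool: connections are expensive to establish, so pooling improves performance
  • Schema Inspector: mostly SHOW commands for MySQL, information_schema queries for PostgreSQL
  • QueryQueue: concurrency control plus timeout management
  • Error classification: separates retryable from non-retryable errors

The next chapter covers the design and implementation of the scan engine.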