第五章:数据库驱动实现
本章介绍 VSDB 的数据库驱动抽象层和 MySQL/PostgreSQL 驱动的具体实现。
5.1 驱动抽象层设计
驱动接口
VSDB 没有显式定义 Driver 接口,但各驱动遵循基本一致的方法签名(访问底层连接的方法除外:MySQL 驱动提供 getConnection(),PostgreSQL 驱动提供 getPool()):
// Implicit driver contract: VSDB declares no Driver interface; both drivers follow these
// signatures by convention.
interface DatabaseDriver {
  // Connection lifecycle
  connect(config: DbConnection): Promise<void>;
  disconnect(): Promise<void>;
  isConnected(): boolean;
  // Query execution
  query(sql: string, params?: unknown[]): Promise<QueryResult>;
  streamQuery(sql: string, params?: unknown[], chunkSize?: number): AsyncGenerator<StreamChunkData>;
  // Raw handle access for schema inspection. Each driver implements only one of these:
  // MySqlDriver has getConnection(), PostgreSqlDriver has getPool(). Both are therefore
  // optional, and callers must check which one is present before calling it.
  getConnection?(): Connection | Pool | null;
  getPool?(): Pool | null;
}
驱动选择策略
// worker.ts
/**
 * Chooses the driver implementation for a connection.
 *
 * Only the literal type 'mysql' selects MySqlDriver; every other value of
 * `config.type` falls through to PostgreSqlDriver.
 */
function createDriver(config: DbConnection): DatabaseDriver {
  if (config.type === 'mysql') {
    return new MySqlDriver();
  }
  return new PostgreSqlDriver();
}
graph TD
A["DbConnection.type"] --> B{"数据库类型"}
B -->|"mysql"| C["MySqlDriver"]
B -->|"postgresql"| D["PostgreSqlDriver"]
C --> E["mysql2/promise"]
D --> F["pg"]
E --> G["MySQL 连接"]
F --> H["PostgreSQL Pool"]
5.2 MySQL 驱动实现
MySqlDriver 类
import mysql from 'mysql2/promise';
import type { DbConnection, QueryResult } from '../../shared/types';
/**
 * MySQL driver that keeps exactly one mysql2 connection open at a time.
 */
export class MySqlDriver {
  /** The live connection, or null while disconnected. */
  private connection: mysql.Connection | null = null;

  /**
   * Opens a connection for `config`, first closing any connection already held.
   * Entries in `config.options` are spread last, so they override the defaults listed
   * before them (e.g. SSL or charset settings).
   */
  async connect(config: DbConnection): Promise<void> {
    if (this.connection !== null) {
      // Replace rather than leak the previous connection.
      await this.disconnect();
    }
    const connectionOptions = {
      host: config.host,
      port: config.port,
      user: config.username,
      password: config.password,
      database: config.database,
      connectTimeout: 10000, // 10-second connect timeout
      ...config.options,
    };
    this.connection = await mysql.createConnection(connectionOptions);
  }

  /**
   * Closes the current connection if there is one. Errors from `end()` are ignored
   * (the connection may already be closed); the driver always ends up disconnected.
   */
  async disconnect(): Promise<void> {
    const current = this.connection;
    if (current === null) {
      return;
    }
    try {
      await current.end();
    } catch {
      // Already closed or broken: nothing further to clean up.
    }
    this.connection = null;
  }

  /** True while a connection object is held (the connection itself is not probed). */
  isConnected(): boolean {
    return this.connection !== null;
  }

  /** Exposes the raw connection, e.g. for schema inspection; null when disconnected. */
  getConnection(): mysql.Connection | null {
    return this.connection;
  }
}
查询执行
/**
 * Executes one statement on the shared connection and normalizes the result.
 *
 * @param sql - SQL text; placeholders are bound from `params` by mysql2's `execute`.
 * @param params - Values for the placeholders.
 * @returns For results carrying `affectedRows` (INSERT/UPDATE/DELETE), empty
 *   rows/columns plus the affected count; otherwise the rows and their column names.
 * @throws Error('MySQL: not connected') when there is no open connection.
 */
async query(sql: string, params?: any[]): Promise<QueryResult> {
  if (!this.connection) {
    throw new Error('MySQL: not connected');
  }
  const startTime = Date.now();
  const [result, fields] = await this.connection.execute(sql, params);
  const executionTime = Date.now() - startTime;

  // INSERT/UPDATE/DELETE come back as a ResultSetHeader carrying affectedRows.
  if (result && typeof result === 'object' && 'affectedRows' in result) {
    const { affectedRows } = result as { affectedRows: number };
    return {
      columns: [],
      rows: [],
      rowCount: affectedRows,
      affectedRows,
      executionTime,
    };
  }

  // SELECT returns an array of row objects.
  const rows = Array.isArray(result) ? (result as Record<string, unknown>[]) : [];
  const firstRow = rows[0];
  // Prefer the field metadata: it names the columns even when no rows matched.
  // Fall back to the first row's keys only if no field list came back.
  const columns =
    Array.isArray(fields) && fields.length > 0
      ? fields.map((field) => field.name)
      : firstRow
        ? Object.keys(firstRow)
        : [];
  return {
    columns,
    rows,
    rowCount: rows.length,
    executionTime,
  };
}
流式查询
/**
 * Yields the result of `sql` in chunks of at most `chunkSize` rows.
 *
 * The statement runs on a second, short-lived connection opened with the same config,
 * so the shared connection stays free. NOTE: `execute` returns the complete result
 * set before chunking starts, so this bounds message size, not memory use.
 *
 * Only the final chunk carries `totalRows`. A statement that returns no rows (or no
 * row array at all) yields a single empty chunk with `totalRows: 0`.
 *
 * @throws Error('MySQL: not connected') when there is no open connection.
 * @throws Error when the connection's config cannot be recovered for the side connection.
 * @throws RangeError when `chunkSize` is not a positive integer.
 */
async *streamQuery(
  sql: string,
  params?: any[],
  chunkSize = 1000
): AsyncGenerator<{ chunkIndex: number; rows: Record<string, unknown>[]; totalRows?: number }> {
  if (!this.connection) {
    throw new Error('MySQL: not connected');
  }
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    // With 0 (or a negative value) the chunking loop below would never terminate.
    throw new RangeError(`MySQL: chunkSize must be a positive integer, got ${chunkSize}`);
  }
  // The promise wrapper exposes the underlying connection on `.connection`; its `.config`
  // holds the options used to open it.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- internal mysql2 field, not in the public typings
  const baseConfig = (this.connection as any).connection?.config;
  if (!baseConfig) {
    // Falling back to {} would silently connect with default options, i.e. not to the
    // server this driver is actually connected to.
    throw new Error('MySQL: cannot determine connection config for streaming');
  }
  // Separate connection so a long stream does not block the shared one.
  const streamConn = await mysql.createConnection(baseConfig);
  try {
    const [result] = await streamConn.execute(sql, params);
    const allRows = Array.isArray(result) ? (result as Record<string, unknown>[]) : [];
    if (allRows.length === 0) {
      // Non-row results and empty result sets both report one empty final chunk, so the
      // consumer always receives totalRows.
      yield { chunkIndex: 0, rows: [], totalRows: 0 };
      return;
    }
    const totalRows = allRows.length;
    for (let offset = 0, chunkIndex = 0; offset < totalRows; offset += chunkSize, chunkIndex++) {
      const isLast = offset + chunkSize >= totalRows;
      yield {
        chunkIndex,
        rows: allRows.slice(offset, offset + chunkSize),
        ...(isLast ? { totalRows } : {}),
      };
    }
  } finally {
    // Always close the side connection, even when the consumer stops early. A failure to
    // close must not replace the error (if any) that got us here.
    await streamConn.end().catch(() => undefined);
  }
}
mysql2 特性
VSDB 选择 mysql2 的原因:
| 特性 | mysql2 | mysql (旧版) |
|---|---|---|
| Promise 支持 | ✓(原生) | ✗(需包装) |
| Prepare Statement | ✓ | ✓ |
| 流式查询 | ✓ | ✓ |
| 性能 | 更快 | 较慢 |
| 维护状态 | 活跃 | 已停止 |
5.3 PostgreSQL 驱动实现
PostgreSqlDriver 类
import { Pool } from 'pg';
import type { DbConnection, QueryResult } from '../../shared/types';
/**
 * PostgreSQL driver backed by a small `pg` connection pool.
 */
export class PostgreSqlDriver {
  /** Live pool, or null while disconnected. */
  private pool: Pool | null = null;

  /**
   * Creates a pool for `config` and verifies it with `SELECT 1`, replacing any pool
   * already held. Entries in `config.options` are spread last and override the defaults.
   * The pool is only recorded once the probe succeeds. On failure it is shut down and
   * the probe error is rethrown.
   */
  async connect(config: DbConnection): Promise<void> {
    if (this.pool) {
      await this.disconnect();
    }
    // PostgreSQL connections are pooled.
    const pool = new Pool({
      host: config.host,
      port: config.port,
      user: config.username,
      password: config.password,
      database: config.database || 'postgres',
      max: 5, // maximum number of pooled connections
      connectionTimeoutMillis: 10000,
      ...config.options,
    });
    // pg reports background failures of idle clients as an 'error' event on the pool.
    // An EventEmitter 'error' with no listener is thrown, which would take the worker down.
    pool.on('error', (err) => {
      console.error('PostgreSQL: idle client error:', err.message);
    });
    try {
      await pool.query('SELECT 1');
    } catch (err) {
      // Best-effort cleanup; the probe failure is the error worth reporting.
      await pool.end().catch(() => undefined);
      throw err;
    }
    this.pool = pool;
  }

  /**
   * Shuts the pool down. The field is cleared before `end()` runs, so a rejected
   * `end()` still leaves the driver disconnected. The rejection itself propagates.
   */
  async disconnect(): Promise<void> {
    const pool = this.pool;
    if (!pool) {
      return;
    }
    this.pool = null;
    await pool.end();
  }

  /** True while a pool is held (the server itself is not probed). */
  isConnected(): boolean {
    return this.pool !== null;
  }

  /** Exposes the raw pool, e.g. for schema inspection; null when disconnected. */
  getPool(): Pool | null {
    return this.pool;
  }
}
查询执行
/**
 * Executes one statement through the pool and normalizes the result.
 *
 * A statement is treated as row-returning when its command tag is SELECT or the server
 * sent a row description (non-empty `fields`). This covers EXPLAIN, SHOW and
 * INSERT/UPDATE/DELETE ... RETURNING, whose command tag is not SELECT.
 *
 * @param sql - SQL text with $1, $2, ... placeholders.
 * @param params - Values for the placeholders.
 * @throws Error('PostgreSQL: not connected') when there is no pool.
 */
async query(sql: string, params?: any[]): Promise<QueryResult> {
  if (!this.pool) {
    throw new Error('PostgreSQL: not connected');
  }
  const startTime = Date.now();
  const result = await this.pool.query(sql, params);
  const executionTime = Date.now() - startTime;

  const fields = result.fields ?? [];
  const returnsRows = result.command === 'SELECT' || fields.length > 0;

  if (!returnsRows) {
    // INSERT/UPDATE/DELETE without RETURNING, DDL, etc.
    return {
      columns: [],
      rows: [],
      rowCount: result.rowCount ?? 0,
      affectedRows: result.rowCount ?? 0,
      executionTime,
    };
  }

  const rows = result.rows as Record<string, unknown>[];
  return {
    columns: fields.map((f) => f.name),
    rows,
    rowCount: rows.length,
    // A row-returning write (… RETURNING) still reports how many rows it touched.
    ...(result.command !== 'SELECT' && result.rowCount != null
      ? { affectedRows: result.rowCount }
      : {}),
    executionTime,
  };
}
流式查询
/**
 * Streams a query through a server-side cursor, `chunkSize` rows at a time.
 *
 * A dedicated client is checked out of the pool for the lifetime of the cursor.
 * Exactly one chunk carries `totalRows`, namely the last one. This holds even when the
 * row count is an exact multiple of `chunkSize`. A query with no rows yields a single
 * empty chunk with `totalRows: 0`.
 *
 * NOTE(review): `Cursor` is presumably the default export of the `pg-cursor` package;
 * only `Pool` appears among this driver's imports. Confirm the import exists.
 *
 * @throws Error('PostgreSQL: not connected') when there is no pool.
 * @throws RangeError when `chunkSize` is not a positive integer.
 */
async *streamQuery(
  sql: string,
  params?: any[],
  chunkSize = 1000
): AsyncGenerator<StreamChunkData> {
  if (!this.pool) {
    throw new Error('PostgreSQL: not connected');
  }
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError(`PostgreSQL: chunkSize must be a positive integer, got ${chunkSize}`);
  }
  const client = await this.pool.connect();
  let cursor: InstanceType<typeof Cursor> | undefined;
  // Set when the client may be left in an unknown state; it is then destroyed
  // instead of being returned to the pool.
  let broken = false;
  try {
    const active = client.query(new Cursor(sql, params));
    cursor = active;
    let chunkIndex = 0;
    let seen = 0;
    let current = await active.read(chunkSize);
    while (true) {
      // Read one chunk ahead. A short chunk is certainly the last; a full chunk is the
      // last only when the following read comes back empty.
      const next = current.length < chunkSize ? [] : await active.read(chunkSize);
      seen += current.length;
      const isLast = next.length === 0;
      yield {
        chunkIndex,
        rows: current as Record<string, unknown>[],
        ...(isLast ? { totalRows: seen } : {}),
      };
      if (isLast) {
        break;
      }
      current = next;
      chunkIndex++;
    }
  } catch (err) {
    broken = true;
    throw err;
  } finally {
    if (cursor && !broken) {
      // Close the cursor so a consumer that stops early does not leave the query open
      // on a client that is about to go back into the pool.
      try {
        await cursor.close();
      } catch {
        broken = true;
      }
    }
    // release(true) tells pg to destroy the client rather than pool it.
    client.release(broken);
  }
}
pg 库特性
VSDB 选择 pg 的原因:
| 特性 | pg | 其他库 |
|---|---|---|
| 纯 JavaScript | ✓ | 部分 C++ |
| 连接池 | ✓(内置) | 需额外配置 |
| Promise 支持 | ✓ | ✓ |
| Cursor 流式 | ✓(配合 pg-cursor 包) | 有限 |
| TypeScript 类型 | ✓(@types/pg) | 变化 |
5.4 连接池策略
MySQL vs PostgreSQL
graph TD
subgraph "MySQL 连接策略"
M1["单一连接"]
M2["每次查询复用"]
M3["流式查询使用独立连接"]
end
subgraph "PostgreSQL 连接策略"
P1["连接池(max=5)"]
P2["查询自动从池获取"]
P3["流式查询使用独立客户端"]
end
| 数据库 | 连接模型 | 原因 |
|---|---|---|
| MySQL | 单一连接 | 使用 mysql2 的 createConnection 而非 createPool,实现简单,对单用户工具够用 |
| PostgreSQL | 连接池 | pg 内置池,PostgreSQL 连接开销大 |
连接池配置
// Default pg pool settings for PostgreSQL (all durations in milliseconds).
const poolConfig = {
  max: 5, // upper bound on pooled connections
  connectionTimeoutMillis: 10 * 1000, // pg `connectionTimeoutMillis`: wait limit when connecting
  idleTimeoutMillis: 10 * 1000, // pg `idleTimeoutMillis`: how long a client may sit idle
};
5.5 Schema Inspector
MySqlSchema 实现
/**
 * MySQL schema inspector. Answers metadata requests with SHOW statements and an
 * information_schema query on a caller-supplied connection.
 */
export class MySqlSchema {
  /**
   * Coerces the first element of mysql2's `execute()` tuple to a row list.
   * Anything that is not an array becomes an empty list instead of crashing `.map`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- rows are keyed by server-chosen column labels
  private static toRows(result: unknown): Record<string, any>[] {
    return Array.isArray(result) ? (result as Record<string, any>[]) : [];
  }

  /** Lists databases visible to the connected user. */
  async getDatabases(conn: mysql.Connection): Promise<SchemaResult> {
    const [result] = await conn.execute('SHOW DATABASES');
    return {
      type: 'databases',
      data: MySqlSchema.toRows(result).map((r) => ({ name: r.Database })),
    };
  }

  /** Lists tables in `database`. */
  async getTables(conn: mysql.Connection, database: string): Promise<SchemaResult> {
    const [result] = await conn.execute(
      `SHOW TABLES FROM ${mysql.escapeId(database)}`
    );
    return {
      type: 'tables',
      // The single result column is labelled Tables_in_<db>. Its spelling may not match
      // `database` exactly (e.g. letter case), so read the first column rather than
      // rebuilding the label.
      data: MySqlSchema.toRows(result).map((r) => ({ name: Object.values(r)[0] })),
    };
  }

  /** Describes the columns of `database`.`table`. */
  async getColumns(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [result] = await conn.execute(
      `SHOW COLUMNS FROM ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    return {
      type: 'columns',
      data: MySqlSchema.toRows(result).map((r) => ({
        name: r.Field,
        type: r.Type,
        nullable: r.Null === 'YES',
        defaultValue: r.Default,
        isPrimaryKey: r.Key === 'PRI',
        // Always a boolean, even when Extra is missing.
        isAutoIncrement: String(r.Extra ?? '').includes('auto_increment'),
      })),
    };
  }

  /** Lists indexes of `database`.`table`, merging the one-row-per-column output per index. */
  async getIndexes(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [result] = await conn.execute(
      `SHOW INDEX FROM ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    const indexMap = new Map<
      string,
      { name: string; columns: string[]; isUnique: boolean; isPrimary: boolean }
    >();
    for (const r of MySqlSchema.toRows(result)) {
      let index = indexMap.get(r.Key_name);
      if (!index) {
        index = {
          name: r.Key_name,
          columns: [],
          // Non_unique is 0 for unique indexes; compare numerically so a string '0'
          // is not mistaken for "non-unique".
          isUnique: Number(r.Non_unique) === 0,
          isPrimary: r.Key_name === 'PRIMARY',
        };
        indexMap.set(r.Key_name, index);
      }
      index.columns.push(r.Column_name);
    }
    return {
      type: 'indexes',
      data: Array.from(indexMap.values()),
    };
  }

  /** Lists constraint names and types for `database`.`table` from information_schema. */
  async getConstraints(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [result] = await conn.execute(
      `SELECT * FROM information_schema.TABLE_CONSTRAINTS
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?`,
      [database, table]
    );
    return {
      type: 'constraints',
      data: MySqlSchema.toRows(result).map((r) => ({
        name: r.CONSTRAINT_NAME,
        type: r.CONSTRAINT_TYPE,
      })),
    };
  }

  /** Returns the server-generated DDL for `database`.`table` ('' if none came back). */
  async getDDL(conn: mysql.Connection, database: string, table: string): Promise<SchemaResult> {
    const [result] = await conn.execute(
      `SHOW CREATE TABLE ${mysql.escapeId(database)}.${mysql.escapeId(table)}`
    );
    const row = MySqlSchema.toRows(result)[0];
    return {
      type: 'ddl',
      // For a view the statement is reported under 'Create View' instead of 'Create Table'.
      data: [{ ddl: row?.['Create Table'] ?? row?.['Create View'] ?? '' }],
    };
  }
}
PgSchema 实现
/**
 * PostgreSQL schema inspector built on pg_catalog / information_schema queries run
 * through a caller-supplied pool.
 */
export class PgSchema {
  /** Lists non-template databases on the server. */
  async getDatabases(pool: Pool): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT datname FROM pg_database WHERE datistemplate = false`
    );
    return {
      type: 'databases',
      data: result.rows.map((r) => ({ name: r.datname })),
    };
  }

  /** Lists base tables (not views) in `schema`. */
  async getTables(pool: Pool, schema: string): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT table_name FROM information_schema.tables
       WHERE table_schema = $1 AND table_type = 'BASE TABLE'`,
      [schema]
    );
    return {
      type: 'tables',
      data: result.rows.map((r) => ({ name: r.table_name, schema })),
    };
  }

  /**
   * Describes the columns of `schema`.`table` in declaration order.
   *
   * The select list is fully qualified, because the `pk` subquery also exposes
   * `column_name` and an unqualified reference would be ambiguous. The PK join also
   * matches schema and table, so a same-named primary key in another schema cannot
   * mark or duplicate columns here.
   */
  async getColumns(pool: Pool, schema: string, table: string): Promise<SchemaResult> {
    const result = await pool.query(
      `SELECT c.column_name, c.data_type, c.is_nullable, c.column_default, c.is_identity,
              pk.column_name IS NOT NULL AS is_pk
       FROM information_schema.columns c
       LEFT JOIN (
         SELECT kcu.column_name
         FROM information_schema.table_constraints tc
         JOIN information_schema.key_column_usage kcu
           ON tc.constraint_schema = kcu.constraint_schema
          AND tc.constraint_name = kcu.constraint_name
          AND tc.table_schema = kcu.table_schema
          AND tc.table_name = kcu.table_name
         WHERE tc.constraint_type = 'PRIMARY KEY'
           AND tc.table_schema = $1 AND tc.table_name = $2
       ) pk ON c.column_name = pk.column_name
       WHERE c.table_schema = $1 AND c.table_name = $2
       ORDER BY c.ordinal_position`,
      [schema, table]
    );
    return {
      type: 'columns',
      data: result.rows.map((r) => ({
        name: r.column_name,
        type: r.data_type,
        nullable: r.is_nullable === 'YES',
        defaultValue: r.column_default,
        isPrimaryKey: r.is_pk,
        // PostgreSQL has no AUTO_INCREMENT keyword. serial-style columns default to
        // nextval(...), and identity columns report is_identity = 'YES'.
        isAutoIncrement:
          r.is_identity === 'YES' ||
          (typeof r.column_default === 'string' && r.column_default.startsWith('nextval(')),
      })),
    };
  }

  /**
   * Builds an approximate CREATE TABLE statement (columns, types and NOT NULL only).
   * PostgreSQL has no built-in command that returns table DDL. A complete version would
   * also need defaults, constraints and exact type modifiers.
   * Returns '' when the table has no visible columns.
   */
  async getDDL(pool: Pool, schema: string, table: string): Promise<SchemaResult> {
    // $1/$2 are cast to text everywhere so the server deduces one type per parameter.
    const result = await pool.query(
      `SELECT
         'CREATE TABLE ' || quote_ident($1::text) || '.' || quote_ident($2::text) || ' (' ||
         string_agg(
           quote_ident(column_name) || ' ' || data_type ||
           CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END,
           ', ' ORDER BY ordinal_position
         ) || ')' AS ddl
       FROM information_schema.columns
       WHERE table_schema = $1::text AND table_name = $2::text`,
      [schema, table]
    );
    return {
      type: 'ddl',
      data: [{ ddl: result.rows[0]?.ddl || '' }],
    };
  }
}
5.6 查询队列
QueryQueue 实现
/**
 * Runs worker requests with at most `maxConcurrency` handlers in flight; the rest wait
 * in insertion (FIFO) order.
 *
 * Each request's `defaultTimeout` timer starts at enqueue time, so the deadline covers
 * both waiting and execution. A timeout or `cancel()` rejects the caller's promise, but
 * a handler that is already running is not interrupted.
 */
class QueryQueue {
  // Waiting requests keyed by request id; Map iteration order provides FIFO.
  private queue = new Map<string, PendingQuery>();
  // Number of handlers currently executing.
  private runningCount = 0;
  private maxConcurrency = 5;
  private defaultTimeout = 30000;

  /**
   * Queues `handler` for `request` and returns a promise for its response.
   * Rejects with 'Query timeout' if the request has not settled within `defaultTimeout` ms.
   */
  enqueue(
    request: WorkerRequest,
    handler: () => Promise<WorkerResponse>
  ): Promise<WorkerResponse> {
    return new Promise((resolve, reject) => {
      const pending: PendingQuery = {
        request,
        handler,
        resolve,
        reject,
        timer: setTimeout(() => {
          this.queue.delete(request.id);
          reject(new Error('Query timeout'));
        }, this.defaultTimeout),
      };
      this.queue.set(request.id, pending);
      this.processQueue();
    });
  }

  /** Starts waiting requests until the concurrency limit is reached or the queue is empty. */
  private processQueue(): void {
    while (this.runningCount < this.maxConcurrency) {
      // O(1) peek at the oldest waiting entry (no copy of the whole map).
      const next = this.queue.entries().next();
      if (next.done) {
        return;
      }
      const [id, pending] = next.value;
      this.queue.delete(id);
      this.runningCount++;
      // Calling the handler inside a Promise executor turns a synchronous throw into a
      // rejection. The slot is therefore always released and the timer always cleared.
      void new Promise<WorkerResponse>((resolve) => resolve(pending.handler()))
        .then(
          (response) => {
            clearTimeout(pending.timer);
            pending.resolve(response);
          },
          (error: unknown) => {
            clearTimeout(pending.timer);
            pending.reject(error);
          },
        )
        .finally(() => {
          this.runningCount--;
          this.processQueue();
        });
    }
  }

  /** Rejects and removes a request that is still waiting; running requests are unaffected. */
  cancel(requestId: string): void {
    const pending = this.queue.get(requestId);
    if (pending) {
      clearTimeout(pending.timer);
      pending.reject(new Error('Query cancelled'));
      this.queue.delete(requestId);
    }
  }

  /** Rejects every waiting request; handlers already running are left to finish. */
  dispose(): void {
    for (const pending of this.queue.values()) {
      clearTimeout(pending.timer);
      pending.reject(new Error('QueryQueue disposed'));
    }
    this.queue.clear();
  }
}
并发控制示意
graph TD
Q["请求队列"] --> C{"并发检查"}
C -->|"runningCount < max"| E["执行查询"]
C -->|"runningCount >= max"| W["等待"]
E --> R["runningCount++"]
R --> D["完成"]
D --> Dec["runningCount--"]
Dec --> C
5.7 错误处理
错误分类
type QueryErrorClass = 'connection' | 'syntax' | 'timeout' | 'permission' | 'unknown';

/**
 * Buckets a driver error so callers can decide whether a retry makes sense.
 *
 * Several markers below are driver error codes (ER_PARSE_ERROR, PROTOCOL_CONNECTION_LOST,
 * SQLSTATE 42601/42501). Drivers typically expose these on `error.code` rather than in
 * the message, so each marker is matched against both. Rules are checked in order and
 * the first match wins.
 *
 * @param error - Anything thrown by a driver; non-Error values are stringified.
 */
function classifyQueryError(error: unknown): ClassifiedError {
  const message = error instanceof Error ? error.message : String(error);
  const code =
    typeof error === 'object' && error !== null && 'code' in error
      ? String((error as { code: unknown }).code)
      : '';
  // True when any marker equals the error code or occurs in the message.
  const matches = (...markers: string[]): boolean =>
    markers.some((marker) => code === marker || message.includes(marker));

  // Connection errors (retryable)
  if (matches('ECONNREFUSED', 'ETIMEDOUT', 'PROTOCOL_CONNECTION_LOST')) {
    return { errorClass: 'connection', retryable: true };
  }
  // Syntax errors (not retryable); 42601 is PostgreSQL's syntax_error SQLSTATE.
  if (matches('syntax error', 'ER_PARSE_ERROR', '42601')) {
    return { errorClass: 'syntax', retryable: false };
  }
  // Timeouts (retryable); matched on message text only.
  if (message.includes('timeout') || message.includes('timed out')) {
    return { errorClass: 'timeout', retryable: true };
  }
  // Permission errors (not retryable); 42501 is PostgreSQL's insufficient_privilege SQLSTATE.
  if (matches('Access denied', 'permission denied', '42501')) {
    return { errorClass: 'permission', retryable: false };
  }
  return { errorClass: 'unknown', retryable: false };
}
错误响应格式
/**
 * Sends an 'error' response for `requestId`.
 *
 * When a truthy `error` is supplied, its classification (errorClass / retryable) is
 * added to the error payload after `code` and `message`.
 */
function sendError(requestId: string, code: string, message: string, error?: unknown): void {
  const classification = error ? classifyQueryError(error) : null;
  const details =
    classification === null
      ? {}
      : { errorClass: classification.errorClass, retryable: classification.retryable };
  send({
    id: requestId,
    type: 'error',
    error: { code, message, ...details },
  });
}
5.8 小结
本章介绍了 VSDB 的数据库驱动实现:
| 驱动 | 库 | 连接模型 | 特点 |
|------|----|----------|------|
| MySqlDriver | mysql2/promise | 单一连接 | Promise 原生,快速 |
| PostgreSqlDriver | pg | 连接池 | 内置池,Cursor 流式 |
关键设计决策:
- MySQL 单连接:简单够用,流式查询用独立连接(注意:当前实现会先取回完整结果集,再分块发送)
- PostgreSQL 连接池:连接开销大,池化提升性能
- Schema Inspector:MySQL 主要使用 SHOW 命令,PostgreSQL 使用 information_schema / pg_catalog 查询
- QueryQueue:并发控制 + 超时管理
- 错误分类:区分可重试/不可重试错误
下一章将介绍扫描引擎的设计与实现。