第六章:Storage 存储层
Storage 存储层提供状态持久化能力,确保执行过程可恢复。
6.1 存储架构总览
目录结构
.openmatrix/
├── state.json # 全局状态
├── plan.md # AI 生成的计划
├── context.md # Agent Memory
├── tasks-input.json # 任务输入定义
├── tasks/
│ ├── TASK-001/
│ │ ├── task.json # 任务定义
│ │ ├── context.md # 任务上下文
│ │ ├── develop.json # 开发阶段结果
│ │ ├── verify.json # 验证阶段结果
│ │ ├── accept.json # 验收阶段结果
│ │ └── artifacts/ # 输出文件
│ ├── TASK-002/
│ │ └── ...
│ └── ...
├── approvals/
│ ├── APPROVAL-001/
│ │ └── approval.json # 审批记录
│ └── ...
├── meetings/
│ ├── MEETING-001/
│ │ ├── meeting.json # Meeting 定义
│ │ ├── context.md # 阻塞上下文
│ │ └── resolution.md # 解决方案
│ └── ...
└── .lock # 文件锁(运行时)
存储层级结构
graph TD
subgraph "存储层组件"
FS[FileStore<br/>文件操作]
SM[StateManager<br/>状态管理]
end
subgraph "文件结构"
ST[state.json]
PL[plan.md]
CTX[context.md]
TI[tasks-input.json]
TK[tasks/*/task.json]
AP[approvals/*/approval.json]
MT[meetings/*/meeting.json]
LK[.lock]
end
FS --> ST
FS --> PL
FS --> CTX
FS --> TI
FS --> TK
FS --> AP
FS --> MT
SM --> FS
SM --> LK
6.2 FileStore 文件存储
FileStore 提供底层文件操作能力。
核心接口
// storage/file-store.ts
/**
 * Low-level file operations offered by the storage layer.
 */
interface FileStore {
  /** Reads a JSON file. */
  readJson<T>(path: string): T;

  /** Writes a JSON file. */
  writeJson<T>(path: string, data: T): void;

  /** Reads a Markdown file. */
  readMd(path: string): string;

  /** Writes a Markdown file. */
  writeMd(path: string, content: string): void;

  /** Appends content atomically; intended to be safe under concurrent use. */
  atomicAppend(path: string, content: string): void;

  /** Checks whether a file exists. */
  exists(path: string): boolean;

  /** Deletes a file. */
  delete(path: string): void;

  /** Lists the contents of a directory. */
  listDir(path: string): string[];

  /** Creates a directory. */
  createDir(path: string): void;

  /** Removes a directory. */
  removeDir(path: string): void;
}
JSON 文件操作
class FileStoreImpl implements FileStore {
  /**
   * Reads a UTF-8 file and parses it as JSON.
   *
   * @param filePath Location of the JSON file.
   * @returns The parsed value, asserted (not validated) to be `T`.
   * @throws Error `File not found: <path>` when the file does not exist;
   *         whatever `JSON.parse` throws when the content is not valid JSON.
   */
  readJson<T>(filePath: string): T {
    if (!this.exists(filePath)) {
      throw new Error(`File not found: ${filePath}`);
    }
    const content = fs.readFileSync(filePath, 'utf-8');
    return JSON.parse(content) as T;
  }

  /**
   * Serializes `data` as 2-space-indented JSON and writes it to `filePath`,
   * creating the parent directory first when it is missing.
   *
   * The parameter is deliberately NOT called `path`: that name would shadow
   * the `path` module and turn `path.dirname(...)` into a call on a string.
   *
   * @param filePath Destination file.
   * @param data Value to serialize.
   */
  writeJson<T>(filePath: string, data: T): void {
    const content = JSON.stringify(data, null, 2);
    // Make sure the target directory exists before writing.
    const dir = path.dirname(filePath);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    fs.writeFileSync(filePath, content, 'utf-8');
  }
}
Markdown 文件操作
class FileStoreImpl implements FileStore {
  /**
   * Reads a Markdown file as UTF-8 text.
   *
   * @param filePath Location of the Markdown file.
   * @returns The file content, or an empty string when the file does not exist.
   */
  readMd(filePath: string): string {
    if (!this.exists(filePath)) {
      return ''; // A missing file is reported as empty content.
    }
    return fs.readFileSync(filePath, 'utf-8');
  }

  /**
   * Writes `content` to `filePath`, creating the parent directory when needed.
   *
   * The parameter is not named `path` so that the `path` module stays
   * reachable for `path.dirname(...)`.
   *
   * @param filePath Destination file.
   * @param content Markdown text to write.
   */
  writeMd(filePath: string, content: string): void {
    const dir = path.dirname(filePath);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    fs.writeFileSync(filePath, content, 'utf-8');
  }

  /**
   * Appends `content` to `filePath`, preceded by a `---` separator and a
   * bold ISO-8601 timestamp line. The parent directory is created when missing.
   *
   * NOTE(review): this relies on a single append-mode write; whether that is
   * atomic across concurrent writers depends on the platform — confirm.
   *
   * @param filePath File to append to.
   * @param content Text placed after the timestamp header.
   */
  atomicAppend(filePath: string, content: string): void {
    const dir = path.dirname(filePath);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    // Timestamped separator in front of every appended entry.
    const timestamp = new Date().toISOString();
    const fullContent = `\n\n---\n**${timestamp}**\n${content}`;
    fs.appendFileSync(filePath, fullContent, 'utf-8');
  }
}
文件锁机制
class FileStoreImpl implements FileStore {
  private lockPath = '.openmatrix/.lock';
  private lockTimeout = 30000; // 30 seconds

  /**
   * Acquires the lock file, retrying until `lockTimeout` has elapsed.
   *
   * The lock is created with the exclusive `wx` flag, so creation fails with
   * `EEXIST` while another holder's file is present. A lock whose recorded
   * timestamp is older than `lockTimeout` is treated as stale and removed.
   *
   * The method is `async` because it waits between attempts; callers must
   * `await` it before entering the critical section.
   *
   * @throws Error `Failed to acquire lock within timeout` when the lock could
   *         not be obtained in time; any non-`EEXIST` error from the write.
   */
  async acquireLock(): Promise<void> {
    const startTime = Date.now();
    while (Date.now() - startTime < this.lockTimeout) {
      try {
        // Try to create the lock file exclusively.
        const lockContent = JSON.stringify({
          pid: process.pid,
          timestamp: Date.now()
        });
        fs.writeFileSync(this.lockPath, lockContent, { flag: 'wx' });
        return; // Lock acquired.
      } catch (err: unknown) {
        const code =
          typeof err === 'object' && err !== null
            ? (err as { code?: unknown }).code
            : undefined;
        if (code !== 'EEXIST') {
          throw err;
        }
      }

      // The lock file already exists: find out whether it is stale.
      let lockData: { pid: number; timestamp: number } | undefined;
      try {
        lockData = this.readJson<{ pid: number; timestamp: number }>(this.lockPath);
      } catch {
        // The holder released the lock (or has not finished writing it)
        // between our failed create and this read; just retry below.
        lockData = undefined;
      }

      if (lockData !== undefined && Date.now() - lockData.timestamp > this.lockTimeout) {
        // Stale lock: remove it and retry immediately.
        this.delete(this.lockPath);
        continue;
      }

      // Back off briefly before the next attempt.
      await new Promise<void>(resolve => setTimeout(resolve, 100));
    }
    throw new Error('Failed to acquire lock within timeout');
  }

  /**
   * Releases the lock, but only when the lock file records this process's pid.
   * Does nothing when no lock file exists.
   */
  releaseLock(): void {
    if (!this.exists(this.lockPath)) {
      return;
    }
    const lockData = this.readJson<{ pid: number }>(this.lockPath);
    // Only release a lock held by this process.
    if (lockData.pid === process.pid) {
      this.delete(this.lockPath);
    }
  }
}
6.3 StateManager 状态管理
StateManager 管理全局和任务状态。
核心接口
// storage/state-manager.ts
/**
 * Manages the global state and the per-task state.
 */
interface StateManager {
  /** Initializes the storage. */
  initialize(runId: string, input: TaskInput): void;

  /** Returns the global state. */
  getState(): GlobalState;

  /** Updates the global state. */
  updateState(updates: Partial<GlobalState>): void;

  /** Returns a single task. */
  getTask(taskId: string): Task;

  /** Returns all tasks. */
  getAllTasks(): Task[];

  /** Returns the tasks that have the given status. */
  getTasksByStatus(status: TaskStatus): Task[];

  /** Saves a task. */
  saveTask(task: Task): void;

  /** Updates the status of a task. */
  updateTaskStatus(taskId: string, status: TaskStatus): void;

  /** Saves the result of a phase. */
  savePhaseResult(taskId: string, phase: PhaseType, result: any): void;

  /** Returns a meeting. */
  getMeeting(meetingId: string): Meeting;

  /** Returns the meetings that are still pending. */
  getPendingMeetings(): Meeting[];

  /** Saves a meeting. */
  saveMeeting(meeting: Meeting): void;

  /** Returns an approval. */
  getApproval(approvalId: string): Approval;

  /** Returns the approvals that are still pending. */
  getPendingApprovals(): Approval[];

  /** Saves an approval. */
  saveApproval(approval: Approval): void;

  /** Returns the execution statistics. */
  getStatistics(): ExecutionStatistics;

  /** Cleans up the storage. */
  cleanup(): void;
}
GlobalState 结构
/**
 * Shape of the global run state.
 */
interface GlobalState {
  /** Run identifier. */
  runId: string;
  status: ExecutionStatus;

  /** Run configuration. */
  config: {
    quality: QualityLevel;
    mode: ExecutionMode;
    e2eTests: boolean;
    bypassApprovals: boolean;
  };

  /** Aggregated counters. */
  statistics: {
    totalTasks: number;
    completedTasks: number;
    failedTasks: number;
    blockedTasks: number;
    pendingMeetings: number;
    pendingApprovals: number;
  };

  /** Current task, or `null`. */
  currentTask: string | null;

  /** Time information. */
  createdAt: string;
  updatedAt: string;
  completedAt?: string;
}
状态初始化
class StateManagerImpl implements StateManager {
  /**
   * Creates the `.openmatrix` directory layout and writes the initial files:
   * `state.json`, `tasks-input.json`, `context.md` and `plan.md`.
   *
   * Every directory is checked on its own, so a layout where `.openmatrix`
   * exists but `tasks/`, `approvals/` or `meetings/` is missing is repaired
   * rather than left incomplete.
   *
   * NOTE(review): the four files are written unconditionally, so calling this
   * on an existing run overwrites them — confirm that this is intended.
   *
   * @param runId Identifier stored in the initial global state.
   * @param input Task input persisted verbatim to `tasks-input.json`.
   */
  initialize(runId: string, input: TaskInput): void {
    const basePath = '.openmatrix';

    // Create whichever parts of the directory structure are missing.
    const requiredDirs = [
      basePath,
      `${basePath}/tasks`,
      `${basePath}/approvals`,
      `${basePath}/meetings`
    ];
    for (const dir of requiredDirs) {
      if (!this.fileStore.exists(dir)) {
        this.fileStore.createDir(dir);
      }
    }

    // Initial global state; both timestamps share one instant.
    const now = new Date().toISOString();
    const state: GlobalState = {
      runId,
      status: 'running',
      config: {
        quality: 'balanced',
        mode: 'semi-auto',
        e2eTests: false,
        bypassApprovals: false
      },
      statistics: {
        totalTasks: 0,
        completedTasks: 0,
        failedTasks: 0,
        blockedTasks: 0,
        pendingMeetings: 0,
        pendingApprovals: 0
      },
      currentTask: null,
      createdAt: now,
      updatedAt: now
    };
    this.fileStore.writeJson(`${basePath}/state.json`, state);

    // Persist the task input.
    this.fileStore.writeJson(`${basePath}/tasks-input.json`, input);

    // Seed the context and plan files.
    this.fileStore.writeMd(`${basePath}/context.md`, '# Agent Memory\n\n');
    this.fileStore.writeMd(`${basePath}/plan.md`, '# Execution Plan\n\n');
  }
}
任务状态管理
class StateManagerImpl implements StateManager {
  /**
   * Persists a task to `.openmatrix/tasks/<id>/task.json`, creating the task
   * directory and its `artifacts/` sub-directory on first save, then refreshes
   * the global statistics.
   *
   * @param task Task to persist.
   */
  saveTask(task: Task): void {
    const basePath = `.openmatrix/tasks/${task.id}`;
    // Create the task directory on first save.
    if (!this.fileStore.exists(basePath)) {
      this.fileStore.createDir(basePath);
      this.fileStore.createDir(`${basePath}/artifacts`);
    }
    // Persist the task definition.
    this.fileStore.writeJson(`${basePath}/task.json`, task);
    // Refresh the statistics.
    this.updateStatistics();
  }

  /**
   * Moves a task to `status`, records the transition in its status history,
   * saves the task and updates `currentTask` / `updatedAt` in the global state.
   *
   * The previous status is captured BEFORE the task is mutated so that the
   * history entry's `from` holds the old status rather than the new one.
   *
   * @param taskId Task to update.
   * @param status New status.
   */
  updateTaskStatus(taskId: string, status: TaskStatus): void {
    const task = this.getTask(taskId);
    const previousStatus = task.status;
    const now = new Date().toISOString();

    task.status = status;
    task.updatedAt = now;
    // Record the status transition.
    task.statusHistory.push({
      from: previousStatus,
      to: status,
      timestamp: now
    });

    // saveTask() already refreshes the statistics, so no second refresh here.
    this.saveTask(task);

    // Update the global state.
    this.updateState({
      currentTask: status === 'in_progress' ? taskId : null,
      updatedAt: now
    });
  }

  /**
   * Recomputes the statistics from all tasks plus the pending meetings and
   * approvals, and stores them in the global state. Tasks with status
   * `waiting` are counted as blocked.
   */
  updateStatistics(): void {
    const tasks = this.getAllTasks();
    const meetings = this.getPendingMeetings();
    const approvals = this.getPendingApprovals();
    this.updateState({
      statistics: {
        totalTasks: tasks.length,
        completedTasks: tasks.filter(t => t.status === 'completed').length,
        failedTasks: tasks.filter(t => t.status === 'failed').length,
        blockedTasks: tasks.filter(t => t.status === 'waiting').length,
        pendingMeetings: meetings.length,
        pendingApprovals: approvals.length
      }
    });
  }
}
阶段结果保存
class StateManagerImpl implements StateManager {
  /**
   * Writes a phase result to `.openmatrix/tasks/<taskId>/<phase>.json` and
   * appends a matching phase record to the task before saving it.
   *
   * @param taskId Task the result belongs to.
   * @param phase Phase whose result is stored; also used as the file name.
   * @param result Result payload; its `status` is copied into the phase record.
   */
  savePhaseResult(taskId: string, phase: PhaseType, result: any): void {
    // Store the raw result next to the task definition.
    const resultFile = `.openmatrix/tasks/${taskId}/${phase}.json`;
    this.fileStore.writeJson(resultFile, result);

    // Reflect the finished phase on the task itself.
    const owner = this.getTask(taskId);
    const record = {
      type: phase,
      status: result.status,
      completedAt: new Date().toISOString()
    };
    owner.phases.push(record);
    this.saveTask(owner);
  }
}
Meeting 管理
class StateManagerImpl implements StateManager {
  /**
   * Collects every meeting under `.openmatrix/meetings` whose `meeting.json`
   * exists and has status `pending`.
   *
   * @returns The pending meetings in directory-listing order; an empty array
   *          when the meetings directory does not exist.
   */
  getPendingMeetings(): Meeting[] {
    const root = '.openmatrix/meetings';
    if (!this.fileStore.exists(root)) {
      return [];
    }
    return this.fileStore.listDir(root).flatMap((entry): Meeting[] => {
      const file = `${root}/${entry}/meeting.json`;
      if (!this.fileStore.exists(file)) {
        return [];
      }
      const candidate = this.fileStore.readJson<Meeting>(file);
      return candidate.status === 'pending' ? [candidate] : [];
    });
  }

  /**
   * Persists a meeting to `.openmatrix/meetings/<id>/meeting.json`, writes its
   * context to `context.md` when one is set, then refreshes the statistics.
   *
   * @param meeting Meeting to persist.
   */
  saveMeeting(meeting: Meeting): void {
    const meetingDir = `.openmatrix/meetings/${meeting.id}`;
    const dirMissing = !this.fileStore.exists(meetingDir);
    if (dirMissing) {
      this.fileStore.createDir(meetingDir);
    }
    this.fileStore.writeJson(`${meetingDir}/meeting.json`, meeting);
    // The context file is only written when the meeting carries one.
    if (meeting.context) {
      this.fileStore.writeMd(`${meetingDir}/context.md`, meeting.context);
    }
    this.updateStatistics();
  }
}
审批管理
class StateManagerImpl implements StateManager {
  /**
   * Collects every approval under `.openmatrix/approvals` whose
   * `approval.json` exists and has status `pending`.
   *
   * @returns The pending approvals in directory-listing order; an empty array
   *          when the approvals directory does not exist.
   */
  getPendingApprovals(): Approval[] {
    const root = '.openmatrix/approvals';
    const pending: Approval[] = [];
    if (!this.fileStore.exists(root)) {
      return pending;
    }
    for (const entry of this.fileStore.listDir(root)) {
      const file = `${root}/${entry}/approval.json`;
      if (!this.fileStore.exists(file)) {
        continue;
      }
      const candidate = this.fileStore.readJson<Approval>(file);
      if (candidate.status !== 'pending') {
        continue;
      }
      pending.push(candidate);
    }
    return pending;
  }

  /**
   * Persists an approval to `.openmatrix/approvals/<id>/approval.json`,
   * creating its directory when missing, then refreshes the statistics.
   *
   * @param approval Approval to persist.
   */
  saveApproval(approval: Approval): void {
    const approvalDir = `.openmatrix/approvals/${approval.id}`;
    const dirMissing = !this.fileStore.exists(approvalDir);
    if (dirMissing) {
      this.fileStore.createDir(approvalDir);
    }
    this.fileStore.writeJson(`${approvalDir}/approval.json`, approval);
    this.updateStatistics();
  }
}
清理存储
class StateManagerImpl implements StateManager {
  /**
   * Removes the runtime lock file `.openmatrix/.lock` when it exists.
   * Persisted data is left untouched; only runtime artefacts are cleaned up.
   */
  cleanup(): void {
    const lockFile = `${'.openmatrix'}/.lock`;
    if (!this.fileStore.exists(lockFile)) {
      return;
    }
    this.fileStore.delete(lockFile);
    // Optional: further temporary files could be removed here.
  }
}
6.4 数据类型定义
Task 类型
/**
 * A task record.
 */
interface Task {
  /** Identifier, e.g. `TASK-001`. */
  id: string;
  /** Task title. */
  title: string;
  /** Detailed description. */
  description: string;
  /** Current status. */
  status: TaskStatus;

  /** Agent responsible for the task. */
  agentType: AgentType;
  /** Phase execution records. */
  phases: PhaseRecord[];

  /** IDs of the tasks this task depends on. */
  dependencies: string[];
  /** IDs of the tasks that depend on this task. */
  dependents: string[];

  /** Priority (1-10). */
  priority: number;
  /** Estimated complexity. */
  estimatedComplexity: string;

  /** Output file paths. */
  artifacts: string[];

  /** Time information. */
  createdAt: string;
  updatedAt: string;
  completedAt?: string;

  /** Status history. */
  statusHistory: StatusTransition[];

  /** Retry information. */
  retryCount?: number;
  lastError?: ErrorInfo;
}
/**
 * Record of one phase of a task.
 */
interface PhaseRecord {
  type: PhaseType;
  status:
    | 'pending'
    | 'running'
    | 'success'
    | 'failed';
  startedAt?: string;
  completedAt?: string;
  error?: ErrorInfo;
}
/**
 * One entry of a task's status history.
 */
interface StatusTransition {
  /** Status before the transition. */
  from: TaskStatus;
  /** Status after the transition. */
  to: TaskStatus;
  timestamp: string;
  reason?: string;
}
Meeting 类型
/**
 * A meeting record.
 */
interface Meeting {
  /** Identifier, e.g. `MEETING-001`. */
  id: string;
  /** Associated task. */
  taskId: string;
  /** Meeting type. */
  type: MeetingType;
  /** Title. */
  title: string;
  /** Detailed description. */
  description: string;
  /** Blocking context. */
  context?: string;
  /** Questions that need an answer. */
  questions: Question[];
  status:
    | 'pending'
    | 'resolved'
    | 'skipped';
  createdAt: string;
  resolvedAt?: string;
  resolution?: Resolution;
}
/**
 * Kind of meeting:
 * - `information`: information is missing
 * - `decision`: a decision is needed
 * - `approval`: an approval is needed
 * - `dependency`: a dependency problem
 */
type MeetingType = 'information' | 'decision' | 'approval' | 'dependency';
/**
 * A question attached to a meeting.
 */
interface Question {
  id: string;
  text: string;
  type:
    | 'text'
    | 'choice'
    | 'boolean';
  options?: string[];
  required: boolean;
  default?: string;
}
/**
 * Outcome of a meeting.
 */
interface Resolution {
  /**
   * Answers to the meeting's questions.
   * NOTE(review): presumably keyed by question id — confirm. A `Map` is
   * serialized as `{}` by `JSON.stringify`, so verify how this field is
   * persisted.
   */
  answers: Map<string, string>;
  decision?:
    | 'skip'
    | 'retry';
  notes?: string;
}
Approval 类型
/**
 * An approval record.
 */
interface Approval {
  /** Identifier, e.g. `APPROVAL-001`. */
  id: string;
  /** Approval type. */
  type: ApprovalType;
  /** Content under approval. */
  content: ApprovalContent;
  /** Related context. */
  context?: string;
  status:
    | 'pending'
    | 'approved'
    | 'rejected';
  createdAt: string;
  processedAt?: string;
  decision?:
    | 'approve'
    | 'reject';
  feedback?: string;
}
/**
 * Kind of approval:
 * - `plan`: plan approval
 * - `merge`: merge approval
 * - `deploy`: deployment approval
 * - `meeting`: meeting decision
 */
type ApprovalType = 'plan' | 'merge' | 'deploy' | 'meeting';
/**
 * Payload of an approval; the comments group the fields by approval type.
 */
interface ApprovalContent {
  /** Fields for the `plan` type. */
  tasks?: Task[];
  planSummary?: string;

  /** Fields for the `merge` type. */
  files?: string[];
  commitMessage?: string;

  /** Fields for the `deploy` type. */
  environment?: string;
  config?: any;

  /** Field for the `meeting` type. */
  meeting?: Meeting;
}
统计类型
/**
 * Execution statistics.
 */
interface ExecutionStatistics {
  totalTasks: number;
  completedTasks: number;
  failedTasks: number;
  blockedTasks: number;

  /** Per-phase statistics. */
  phases: {
    tdd: { completed: number; failed: number };
    develop: { completed: number; failed: number };
    verify: { completed: number; failed: number };
    accept: { completed: number; failed: number };
  };

  /** Timing statistics. */
  timing: {
    startedAt: string;
    estimatedCompletion?: string;
    actualCompletion?: string;
    totalDurationMs?: number;
  };

  /** Quality statistics. */
  quality: {
    averageCoverage?: number;
    averageReviewScore?: number;
    totalLintErrors?: number;
    securityIssues?: number;
  };
}
6.5 数据一致性保障
写前读验证
class StateManagerImpl {
  /**
   * Applies `updates` to a task under the file lock, using `updatedAt` as an
   * optimistic-concurrency token.
   *
   * `acquireLock()` returns a promise, so it is awaited before the task is
   * read; calling it without `await` would run the critical section without
   * the lock. As a consequence this method is asynchronous and callers should
   * await it.
   *
   * @param taskId Task to update.
   * @param updates Fields to overwrite. `updates.updatedAt` must equal the
   *        stored task's `updatedAt`, otherwise the update is rejected.
   * @throws Error `Task state conflict - concurrent modification` when the
   *         stored `updatedAt` differs from `updates.updatedAt`; whatever
   *         `acquireLock()` rejects with when the lock cannot be obtained.
   */
  async updateTask(taskId: string, updates: Partial<Task>): Promise<void> {
    // Acquire the lock; on failure nothing is held, so nothing is released.
    await this.fileStore.acquireLock();
    try {
      // Read the current state.
      const currentTask = this.getTask(taskId);
      // Verify that the caller saw the latest version.
      if (currentTask.updatedAt !== updates.updatedAt) {
        throw new Error('Task state conflict - concurrent modification');
      }
      // Apply the updates and stamp the new modification time.
      const updatedTask = {
        ...currentTask,
        ...updates,
        updatedAt: new Date().toISOString()
      };
      // Persist.
      this.saveTask(updatedTask);
    } finally {
      // Always release the lock.
      this.fileStore.releaseLock();
    }
  }
}
状态版本控制
/**
 * Global state wrapped with a version number and a checksum.
 */
interface VersionedState {
  /** Version number of the stored state. */
  version: number;
  /** The wrapped global state. */
  state: GlobalState;
  /** Checksum stored alongside the state. */
  checksum: string;
}
class StateManagerImpl {
  /**
   * Merges `updates` into the global state under the file lock and writes the
   * result to `.openmatrix/state.json` with an incremented version and a
   * checksum of the merged state.
   *
   * `acquireLock()` returns a promise, so it is awaited before the state is
   * read; without the `await` the read-modify-write would run unlocked. This
   * method is therefore asynchronous and callers should await it.
   *
   * The merge is shallow: nested objects in `updates` replace the stored ones.
   *
   * @param updates Top-level fields of the global state to overwrite.
   * @throws Whatever `acquireLock()` rejects with when the lock cannot be obtained.
   */
  async updateState(updates: Partial<GlobalState>): Promise<void> {
    await this.fileStore.acquireLock();
    try {
      const current = this.getStateWithVersion();
      // Build the merged state once so the stored state and its checksum
      // are guaranteed to describe the same object.
      const mergedState = { ...current.state, ...updates };
      const newState = {
        version: current.version + 1,
        state: mergedState,
        checksum: this.calculateChecksum(mergedState)
      };
      // Persist.
      this.fileStore.writeJson('.openmatrix/state.json', newState);
    } finally {
      this.fileStore.releaseLock();
    }
  }

  /**
   * Computes the hex-encoded MD5 digest of `JSON.stringify(data)`.
   * Used as an integrity checksum, not as a security measure.
   *
   * @param data Any JSON-serializable value.
   * @returns The digest as a hexadecimal string.
   */
  calculateChecksum(data: unknown): string {
    const content = JSON.stringify(data);
    return crypto.createHash('md5').update(content).digest('hex');
  }
}
并发写入处理
flowchart TD
A[并发写入请求] --> B[获取文件锁]
B --> C{锁获取成功?}
C --> |"成功"| D[读取当前状态]
C --> |"失败"| E[等待重试]
D --> F[验证版本一致性]
F --> G{版本一致?}
G --> |"一致"| H[应用更新]
G --> |"不一致"| I[合并变更<br/>或报错]
H --> J[写入新状态]
J --> K[释放锁]
E --> B
I --> J
K --> L[完成]
6.6 数据恢复机制
状态恢复
class StateManagerImpl {
  /**
   * Loads the global state from `.openmatrix/state.json` and verifies its
   * checksum, falling back to the backup when the file is unreadable or the
   * checksum does not match.
   *
   * @returns The recovered state, or `null` when there is no state file or
   *          the fallback yields nothing.
   */
  recoverState(): GlobalState | null {
    const statePath = '.openmatrix/state.json';
    if (!this.fileStore.exists(statePath)) {
      // No previous state.
      return null;
    }
    try {
      const { state, checksum } = this.fileStore.readJson<VersionedState>(statePath);
      const intact = checksum === this.calculateChecksum(state);
      // A checksum mismatch means the state file is damaged.
      return intact ? state : this.recoverFromBackup();
    } catch {
      // Reading or parsing failed: treat the file as damaged.
      return this.recoverFromBackup();
    }
  }

  /**
   * Reads `.openmatrix/state.json.backup`.
   *
   * @returns The backup content, or `null` when the backup is missing or
   *          cannot be read.
   */
  recoverFromBackup(): GlobalState | null {
    const backupPath = '.openmatrix/state.json.backup';
    if (!this.fileStore.exists(backupPath)) {
      return null;
    }
    try {
      return this.fileStore.readJson<GlobalState>(backupPath);
    } catch {
      return null;
    }
  }
}
任务恢复
class StateManagerImpl {
  /**
   * Loads every readable `task.json` below `.openmatrix/tasks`.
   * A task file that cannot be read is skipped with a console warning.
   *
   * @returns The recovered tasks in directory-listing order; an empty array
   *          when the tasks directory does not exist.
   */
  recoverTasks(): Task[] {
    const root = '.openmatrix/tasks';
    const recovered: Task[] = [];
    if (!this.fileStore.exists(root)) {
      return recovered;
    }
    for (const entry of this.fileStore.listDir(root)) {
      const taskPath = `${root}/${entry}/task.json`;
      if (!this.fileStore.exists(taskPath)) {
        continue;
      }
      try {
        recovered.push(this.fileStore.readJson<Task>(taskPath));
      } catch {
        // A single damaged task file must not abort the recovery.
        console.warn(`Corrupted task file: ${taskPath}`);
      }
    }
    return recovered;
  }
}
6.7 存储监控
存储健康检查
/**
 * Result of a storage health check.
 */
interface StorageHealth {
  /** Overall verdict. */
  status:
    | 'healthy'
    | 'warning'
    | 'critical';
  /** Result of each individual check. */
  checks: {
    stateFile: 'ok' | 'missing' | 'corrupted';
    tasksDir: 'ok' | 'missing';
    meetingsDir: 'ok' | 'missing';
    approvalsDir: 'ok' | 'missing';
  };
  /** Human-readable descriptions of the problems found. */
  issues: string[];
  /** Time of the check. */
  lastChecked: string;
}
class StateManagerImpl {
  /**
   * Checks that `.openmatrix/state.json` exists and can be read, and that the
   * `tasks`, `meetings` and `approvals` directories exist.
   *
   * A missing or unreadable state file makes the result `critical`. A missing
   * directory makes it `warning`, but never lowers an existing `critical`.
   * Each missing directory is recorded under its own key (`tasksDir`,
   * `meetingsDir`, `approvalsDir`).
   *
   * @returns The health report, stamped with the time of the check.
   */
  checkHealth(): StorageHealth {
    const health: StorageHealth = {
      status: 'healthy',
      checks: {
        stateFile: 'ok',
        tasksDir: 'ok',
        meetingsDir: 'ok',
        approvalsDir: 'ok'
      },
      issues: [],
      lastChecked: new Date().toISOString()
    };

    // Check state.json.
    const statePath = '.openmatrix/state.json';
    if (!this.fileStore.exists(statePath)) {
      health.checks.stateFile = 'missing';
      health.issues.push('state.json is missing');
      health.status = 'critical';
    } else {
      try {
        this.fileStore.readJson(statePath);
      } catch {
        health.checks.stateFile = 'corrupted';
        health.issues.push('state.json is corrupted');
        health.status = 'critical';
      }
    }

    // Check the directories; each one maps to its own key in `checks`.
    const dirChecks = [
      ['tasks', 'tasksDir'],
      ['meetings', 'meetingsDir'],
      ['approvals', 'approvalsDir']
    ] as const;
    for (const [dir, checkKey] of dirChecks) {
      if (!this.fileStore.exists(`.openmatrix/${dir}`)) {
        health.checks[checkKey] = 'missing';
        health.issues.push(`${dir} directory is missing`);
        // Escalate only: a critical state-file problem must stay critical.
        if (health.status === 'healthy') {
          health.status = 'warning';
        }
      }
    }
    return health;
  }
}
存储大小监控
class StateManagerImpl {
  /**
   * Reports the size of the storage: the size of `state.json` and of the
   * `tasks` directory, plus their total. The remaining categories
   * (`meetings`, `approvals`, `context`, `plan`) are not measured in this
   * block and stay at 0.
   *
   * @returns Per-category sizes and their sum in bytes.
   */
  getStorageSize(): StorageSizeInfo {
    const root = '.openmatrix';
    const files = {
      state: 0,
      tasks: 0,
      meetings: 0,
      approvals: 0,
      context: 0,
      plan: 0
    };

    // Size of the state file.
    const stateFile = `${root}/state.json`;
    if (this.fileStore.exists(stateFile)) {
      files.state = fs.statSync(stateFile).size;
    }

    // Size of the tasks directory.
    const tasksRoot = `${root}/tasks`;
    if (this.fileStore.exists(tasksRoot)) {
      files.tasks = this.calculateDirSize(tasksRoot);
    }

    // ... remaining categories

    let totalSizeBytes = 0;
    for (const size of Object.values(files)) {
      totalSizeBytes += size;
    }
    return { totalSizeBytes, files };
  }
}
下一章将详细讲解 CLI 命令系统。