Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第六章:Storage 存储层

Storage 存储层提供状态持久化能力,确保执行过程可恢复。

6.1 存储架构总览

目录结构

.openmatrix/
├── state.json              # 全局状态
├── plan.md                 # AI 生成的计划
├── context.md              # Agent Memory
├── tasks-input.json        # 任务输入定义
├── tasks/
│   ├── TASK-001/
│   │   ├── task.json       # 任务定义
│   │   ├── context.md      # 任务上下文
│   │   ├── develop.json    # 开发阶段结果
│   │   ├── verify.json     # 验证阶段结果
│   │   ├── accept.json     # 接受阶段结果
│   │   └── artifacts/      # 输出文件
│   ├── TASK-002/
│   │   └── ...
│   └── ...
├── approvals/
│   ├── APPROVAL-001/
│   │   └── approval.json   # 审批记录
│   └── ...
├── meetings/
│   ├── MEETING-001/
│   │   ├── meeting.json    # Meeting 定义
│   │   ├── context.md      # 阻塞上下文
│   │   └── resolution.md   # 解决方案
│   └── ...
└── .lock                   # 文件锁(运行时)

存储层级结构

graph TD
    subgraph "存储层组件"
        FS[FileStore<br/>文件操作]
        SM[StateManager<br/>状态管理]
    end
    
    subgraph "文件结构"
        ST[state.json]
        PL[plan.md]
        CTX[context.md]
        TI[tasks-input.json]
        TK[tasks/*.json]
        AP[approvals/*.json]
        MT[meetings/*.json]
        LK[.lock]
    end
    
    FS --> ST
    FS --> PL
    FS --> CTX
    FS --> TI
    FS --> TK
    FS --> AP
    FS --> MT
    
    SM --> FS
    SM --> LK

6.2 FileStore 文件存储

FileStore 提供底层文件操作能力。

核心接口

// storage/file-store.ts

interface FileStore {
  /**
   * 读取 JSON 文件
   */
  readJson<T>(path: string): T;
  
  /**
   * 写入 JSON 文件
   */
  writeJson<T>(path: string, data: T): void;
  
  /**
   * 读取 Markdown 文件
   */
  readMd(path: string): string;
  
  /**
   * 写入 Markdown 文件
   */
  writeMd(path: string, content: string): void;
  
  /**
   * 原子追加内容
   * 确保并发安全
   */
  atomicAppend(path: string, content: string): void;
  
  /**
   * 检查文件是否存在
   */
  exists(path: string): boolean;
  
  /**
   * 删除文件
   */
  delete(path: string): void;
  
  /**
   * 列出目录内容
   */
  listDir(path: string): string[];
  
  /**
   * 创建目录
   */
  createDir(path: string): void;
  
  /**
   * 删除目录
   */
  removeDir(path: string): void;
}

JSON 文件操作

class FileStoreImpl implements FileStore {
  readJson<T>(path: string): T {
    if (!this.exists(path)) {
      throw new Error(`File not found: ${path}`);
    }
    
    const content = fs.readFileSync(path, 'utf-8');
    return JSON.parse(content) as T;
  }
  
  writeJson<T>(path: string, data: T): void {
    const content = JSON.stringify(data, null, 2);
    
    // 确保目录存在
    const dir = path.dirname(path);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    
    fs.writeFileSync(path, content, 'utf-8');
  }
}

Markdown 文件操作

class FileStoreImpl implements FileStore {
  readMd(path: string): string {
    if (!this.exists(path)) {
      return '';  // 空文件返回空字符串
    }
    
    return fs.readFileSync(path, 'utf-8');
  }
  
  writeMd(path: string, content: string): void {
    const dir = path.dirname(path);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    
    fs.writeFileSync(path, content, 'utf-8');
  }
  
  atomicAppend(path: string, content: string): void {
    // 使用追加模式确保原子性
    const dir = path.dirname(path);
    if (!this.exists(dir)) {
      this.createDir(dir);
    }
    
    // 添加时间戳分隔
    const timestamp = new Date().toISOString();
    const fullContent = `\n\n---\n**${timestamp}**\n${content}`;
    
    fs.appendFileSync(path, fullContent, 'utf-8');
  }
}

文件锁机制

class FileStoreImpl implements FileStore {
  private lockPath = '.openmatrix/.lock';
  private lockTimeout = 30000;  // 30秒
  
  acquireLock(): Promise<void> {
    const startTime = Date.now();
    
    while (Date.now() - startTime < this.lockTimeout) {
      try {
        // 尝试创建锁文件
        const lockContent = JSON.stringify({
          pid: process.pid,
          timestamp: Date.now()
        });
        
        fs.writeFileSync(this.lockPath, lockContent, { flag: 'wx' });
        return;  // 成功获取锁
      } catch (err) {
        if (err.code === 'EEXIST') {
          // 锁文件已存在,检查是否过期
          const lockData = this.readJson(this.lockPath);
          const lockAge = Date.now() - lockData.timestamp;
          
          if (lockAge > this.lockTimeout) {
            // 锁已过期,强制删除
            this.delete(this.lockPath);
            continue;
          }
          
          // 等待一小段时间后重试
          await new Promise(r => setTimeout(r, 100));
        } else {
          throw err;
        }
      }
    }
    
    throw new Error('Failed to acquire lock within timeout');
  }
  
  releaseLock(): void {
    if (this.exists(this.lockPath)) {
      const lockData = this.readJson(this.lockPath);
      
      // 只释放自己持有的锁
      if (lockData.pid === process.pid) {
        this.delete(this.lockPath);
      }
    }
  }
}

6.3 StateManager 状态管理

StateManager 管理全局和任务状态。

核心接口

// storage/state-manager.ts

interface StateManager {
  /**
   * 初始化存储
   */
  initialize(runId: string, input: TaskInput): void;
  
  /**
   * 获取全局状态
   */
  getState(): GlobalState;
  
  /**
   * 更新全局状态
   */
  updateState(updates: Partial<GlobalState>): void;
  
  /**
   * 获取任务
   */
  getTask(taskId: string): Task;
  
  /**
   * 获取所有任务
   */
  getAllTasks(): Task[];
  
  /**
   * 按状态获取任务
   */
  getTasksByStatus(status: TaskStatus): Task[];
  
  /**
   * 保存任务
   */
  saveTask(task: Task): void;
  
  /**
   * 更新任务状态
   */
  updateTaskStatus(taskId: string, status: TaskStatus): void;
  
  /**
   * 保存阶段结果
   */
  savePhaseResult(taskId: string, phase: PhaseType, result: any): void;
  
  /**
   * 获取 Meeting
   */
  getMeeting(meetingId: string): Meeting;
  
  /**
   * 获取待处理 Meeting
   */
  getPendingMeetings(): Meeting[];
  
  /**
   * 保存 Meeting
   */
  saveMeeting(meeting: Meeting): void;
  
  /**
   * 获取审批
   */
  getApproval(approvalId: string): Approval;
  
  /**
   * 获取待处理审批
   */
  getPendingApprovals(): Approval[];
  
  /**
   * 保存审批
   */
  saveApproval(approval: Approval): void;
  
  /**
   * 获取执行统计
   */
  getStatistics(): ExecutionStatistics;
  
  /**
   * 清理存储
   */
  cleanup(): void;
}

GlobalState 结构

interface GlobalState {
  // 运行标识
  runId: string;
  status: ExecutionStatus;
  
  // 配置
  config: {
    quality: QualityLevel;
    mode: ExecutionMode;
    e2eTests: boolean;
    bypassApprovals: boolean;
  };
  
  // 统计
  statistics: {
    totalTasks: number;
    completedTasks: number;
    failedTasks: number;
    blockedTasks: number;
    pendingMeetings: number;
    pendingApprovals: number;
  };
  
  // 当前任务
  currentTask: string | null;
  
  // 时间信息
  createdAt: string;
  updatedAt: string;
  completedAt?: string;
}

状态初始化

class StateManagerImpl implements StateManager {
  initialize(runId: string, input: TaskInput): void {
    const basePath = '.openmatrix';
    
    // 创建目录结构
    if (!this.fileStore.exists(basePath)) {
      this.fileStore.createDir(basePath);
      this.fileStore.createDir(`${basePath}/tasks`);
      this.fileStore.createDir(`${basePath}/approvals`);
      this.fileStore.createDir(`${basePath}/meetings`);
    }
    
    // 初始化全局状态
    const state: GlobalState = {
      runId,
      status: 'running',
      config: {
        quality: 'balanced',
        mode: 'semi-auto',
        e2eTests: false,
        bypassApprovals: false
      },
      statistics: {
        totalTasks: 0,
        completedTasks: 0,
        failedTasks: 0,
        blockedTasks: 0,
        pendingMeetings: 0,
        pendingApprovals: 0
      },
      currentTask: null,
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString()
    };
    
    this.fileStore.writeJson(`${basePath}/state.json`, state);
    
    // 保存任务输入
    this.fileStore.writeJson(`${basePath}/tasks-input.json`, input);
    
    // 初始化上下文文件
    this.fileStore.writeMd(`${basePath}/context.md`, '# Agent Memory\n\n');
    this.fileStore.writeMd(`${basePath}/plan.md`, '# Execution Plan\n\n');
  }
}

任务状态管理

class StateManagerImpl implements StateManager {
  saveTask(task: Task): void {
    const basePath = `.openmatrix/tasks/${task.id}`;
    
    // 创建任务目录
    if (!this.fileStore.exists(basePath)) {
      this.fileStore.createDir(basePath);
      this.fileStore.createDir(`${basePath}/artifacts`);
    }
    
    // 保存任务定义
    this.fileStore.writeJson(`${basePath}/task.json`, task);
    
    // 更新统计
    this.updateStatistics();
  }
  
  updateTaskStatus(taskId: string, status: TaskStatus): void {
    const task = this.getTask(taskId);
    task.status = status;
    task.updatedAt = new Date().toISOString();
    
    // 记录状态历史
    task.statusHistory.push({
      from: task.status,
      to: status,
      timestamp: new Date().toISOString()
    });
    
    this.saveTask(task);
    
    // 更新全局状态
    this.updateState({
      currentTask: status === 'in_progress' ? taskId : null,
      updatedAt: new Date().toISOString()
    });
    
    // 更新统计
    this.updateStatistics();
  }
  
  updateStatistics(): void {
    const tasks = this.getAllTasks();
    const meetings = this.getPendingMeetings();
    const approvals = this.getPendingApprovals();
    
    this.updateState({
      statistics: {
        totalTasks: tasks.length,
        completedTasks: tasks.filter(t => t.status === 'completed').length,
        failedTasks: tasks.filter(t => t.status === 'failed').length,
        blockedTasks: tasks.filter(t => t.status === 'waiting').length,
        pendingMeetings: meetings.length,
        pendingApprovals: approvals.length
      }
    });
  }
}

阶段结果保存

class StateManagerImpl implements StateManager {
  savePhaseResult(taskId: string, phase: PhaseType, result: any): void {
    const basePath = `.openmatrix/tasks/${taskId}`;
    
    // 保存阶段结果文件
    this.fileStore.writeJson(`${basePath}/${phase}.json`, result);
    
    // 更新任务的状态
    const task = this.getTask(taskId);
    task.phases.push({
      type: phase,
      status: result.status,
      completedAt: new Date().toISOString()
    });
    
    this.saveTask(task);
  }
}

Meeting 管理

class StateManagerImpl implements StateManager {
  getPendingMeetings(): Meeting[] {
    const basePath = '.openmatrix/meetings';
    
    if (!this.fileStore.exists(basePath)) {
      return [];
    }
    
    const meetingDirs = this.fileStore.listDir(basePath);
    const meetings: Meeting[] = [];
    
    for (const dir of meetingDirs) {
      const meetingPath = `${basePath}/${dir}/meeting.json`;
      if (this.fileStore.exists(meetingPath)) {
        const meeting = this.fileStore.readJson<Meeting>(meetingPath);
        if (meeting.status === 'pending') {
          meetings.push(meeting);
        }
      }
    }
    
    return meetings;
  }
  
  saveMeeting(meeting: Meeting): void {
    const basePath = `.openmatrix/meetings/${meeting.id}`;
    
    if (!this.fileStore.exists(basePath)) {
      this.fileStore.createDir(basePath);
    }
    
    this.fileStore.writeJson(`${basePath}/meeting.json`, meeting);
    
    // 如果有上下文,保存上下文
    if (meeting.context) {
      this.fileStore.writeMd(`${basePath}/context.md`, meeting.context);
    }
    
    this.updateStatistics();
  }
}

审批管理

class StateManagerImpl implements StateManager {
  getPendingApprovals(): Approval[] {
    const basePath = '.openmatrix/approvals';
    
    if (!this.fileStore.exists(basePath)) {
      return [];
    }
    
    const approvalDirs = this.fileStore.listDir(basePath);
    const approvals: Approval[] = [];
    
    for (const dir of approvalDirs) {
      const approvalPath = `${basePath}/${dir}/approval.json`;
      if (this.fileStore.exists(approvalPath)) {
        const approval = this.fileStore.readJson<Approval>(approvalPath);
        if (approval.status === 'pending') {
          approvals.push(approval);
        }
      }
    }
    
    return approvals;
  }
  
  saveApproval(approval: Approval): void {
    const basePath = `.openmatrix/approvals/${approval.id}`;
    
    if (!this.fileStore.exists(basePath)) {
      this.fileStore.createDir(basePath);
    }
    
    this.fileStore.writeJson(`${basePath}/approval.json`, approval);
    
    this.updateStatistics();
  }
}

清理存储

class StateManagerImpl implements StateManager {
  cleanup(): void {
    const basePath = '.openmatrix';
    
    // 删除锁文件
    if (this.fileStore.exists(`${basePath}/.lock`)) {
      this.fileStore.delete(`${basePath}/.lock`);
    }
    
    // 可选:清理临时文件
    // 注意:不删除实际数据,只清理运行时临时文件
  }
}

6.4 数据类型定义

Task 类型

interface Task {
  id: string;                    // TASK-001
  title: string;                 // 任务标题
  description: string;           // 详细描述
  status: TaskStatus;            // 当前状态
  
  // Agent 相关
  agentType: AgentType;          // 负责的 Agent
  phases: PhaseRecord[];         // 阶段执行记录
  
  // 依赖关系
  dependencies: string[];        // 依赖的任务 ID
  dependents: string[];          // 被依赖的任务 ID
  
  // 优先级和复杂度
  priority: number;              // 优先级 (1-10)
  estimatedComplexity: string;   // 预估复杂度
  
  // 输出
  artifacts: string[];           // 输出文件路径
  
  // 时间信息
  createdAt: string;
  updatedAt: string;
  completedAt?: string;
  
  // 状态历史
  statusHistory: StatusTransition[];
  
  // 重试信息
  retryCount?: number;
  lastError?: ErrorInfo;
}

interface PhaseRecord {
  type: PhaseType;
  status: 'pending' | 'running' | 'success' | 'failed';
  startedAt?: string;
  completedAt?: string;
  error?: ErrorInfo;
}

interface StatusTransition {
  from: TaskStatus;
  to: TaskStatus;
  timestamp: string;
  reason?: string;
}

Meeting 类型

interface Meeting {
  id: string;                    // MEETING-001
  taskId: string;                // 关联任务
  type: MeetingType;             // 类型
  
  title: string;                 // 标题
  description: string;           // 详细描述
  context?: string;              // 阻塞上下文
  
  questions: Question[];         // 需要回答的问题
  
  status: 'pending' | 'resolved' | 'skipped';
  
  createdAt: string;
  resolvedAt?: string;
  
  resolution?: Resolution;
}

type MeetingType = 
  | 'information'    // 缺少信息
  | 'decision'       // 需要决策
  | 'approval'       // 需要审批
  | 'dependency';    // 依赖问题

interface Question {
  id: string;
  text: string;
  type: 'text' | 'choice' | 'boolean';
  options?: string[];
  required: boolean;
  default?: string;
}

interface Resolution {
  answers: Map<string, string>;
  decision?: 'skip' | 'retry';
  notes?: string;
}

Approval 类型

interface Approval {
  id: string;                    // APPROVAL-001
  type: ApprovalType;            // 审批类型
  
  content: ApprovalContent;      // 审批内容
  context?: string;              // 相关上下文
  
  status: 'pending' | 'approved' | 'rejected';
  
  createdAt: string;
  processedAt?: string;
  
  decision?: 'approve' | 'reject';
  feedback?: string;
}

type ApprovalType = 
  | 'plan'         // 计划审批
  | 'merge'        // 合并审批
  | 'deploy'       // 部署审批
  | 'meeting';     // Meeting 决策

interface ApprovalContent {
  // plan 类型
  tasks?: Task[];
  planSummary?: string;
  
  // merge 类型
  files?: string[];
  commitMessage?: string;
  
  // deploy 类型
  environment?: string;
  config?: any;
  
  // meeting 类型
  meeting?: Meeting;
}

统计类型

interface ExecutionStatistics {
  totalTasks: number;
  completedTasks: number;
  failedTasks: number;
  blockedTasks: number;
  
  // 阶段统计
  phases: {
    tdd: { completed: number; failed: number };
    develop: { completed: number; failed: number };
    verify: { completed: number; failed: number };
    accept: { completed: number; failed: number };
  };
  
  // 时间统计
  timing: {
    startedAt: string;
    estimatedCompletion?: string;
    actualCompletion?: string;
    totalDurationMs?: number;
  };
  
  // 质量统计
  quality: {
    averageCoverage?: number;
    averageReviewScore?: number;
    totalLintErrors?: number;
    securityIssues?: number;
  };
}

6.5 数据一致性保障

写前读验证

class StateManagerImpl {
  updateTask(taskId: string, updates: Partial<Task>): void {
    // 获取锁
    this.fileStore.acquireLock();
    
    try {
      // 读取当前状态
      const currentTask = this.getTask(taskId);
      
      // 验证状态一致性
      if (currentTask.updatedAt !== updates.updatedAt) {
        throw new Error('Task state conflict - concurrent modification');
      }
      
      // 应用更新
      const updatedTask = { ...currentTask, ...updates };
      updatedTask.updatedAt = new Date().toISOString();
      
      // 保存
      this.saveTask(updatedTask);
    } finally {
      // 释放锁
      this.fileStore.releaseLock();
    }
  }
}

状态版本控制

interface VersionedState {
  version: number;
  state: GlobalState;
  checksum: string;
}

class StateManagerImpl {
  updateState(updates: Partial<GlobalState>): void {
    this.fileStore.acquireLock();
    
    try {
      const current = this.getStateWithVersion();
      
      // 验证版本
      const newVersion = current.version + 1;
      
      // 创建新状态
      const newState = {
        version: newVersion,
        state: { ...current.state, ...updates },
        checksum: this.calculateChecksum({ ...current.state, ...updates })
      };
      
      // 保存
      this.fileStore.writeJson('.openmatrix/state.json', newState);
    } finally {
      this.fileStore.releaseLock();
    }
  }
  
  calculateChecksum(data: any): string {
    const content = JSON.stringify(data);
    return crypto.createHash('md5').update(content).digest('hex');
  }
}

并发写入处理

flowchart TD
    A[并发写入请求] --> B[获取文件锁]
    B --> C{锁获取成功?}
    
    C --> |"成功"| D[读取当前状态]
    C --> |"失败"| E[等待重试]
    
    D --> F[验证版本一致性]
    F --> G{版本一致?}
    
    G --> |"一致"| H[应用更新]
    G --> |"不一致"| I[合并变更<br/>或报错]
    
    H --> J[写入新状态]
    J --> K[释放锁]
    
    E --> B
    I --> J
    
    K --> L[完成]

6.6 数据恢复机制

状态恢复

class StateManagerImpl {
  recoverState(): GlobalState | null {
    const statePath = '.openmatrix/state.json';
    
    if (!this.fileStore.exists(statePath)) {
      return null;  // 无历史状态
    }
    
    try {
      const versioned = this.fileStore.readJson<VersionedState>(statePath);
      
      // 验证 checksum
      const expectedChecksum = this.calculateChecksum(versioned.state);
      if (versioned.checksum !== expectedChecksum) {
        // 状态文件损坏,尝试恢复
        return this.recoverFromBackup();
      }
      
      return versioned.state;
    } catch (err) {
      // JSON 解析失败,文件损坏
      return this.recoverFromBackup();
    }
  }
  
  recoverFromBackup(): GlobalState | null {
    const backupPath = '.openmatrix/state.json.backup';
    
    if (this.fileStore.exists(backupPath)) {
      try {
        return this.fileStore.readJson<GlobalState>(backupPath);
      } catch {
        return null;
      }
    }
    
    return null;
  }
}

任务恢复

class StateManagerImpl {
  recoverTasks(): Task[] {
    const basePath = '.openmatrix/tasks';
    
    if (!this.fileStore.exists(basePath)) {
      return [];
    }
    
    const tasks: Task[] = [];
    const taskDirs = this.fileStore.listDir(basePath);
    
    for (const dir of taskDirs) {
      const taskPath = `${basePath}/${dir}/task.json`;
      
      if (this.fileStore.exists(taskPath)) {
        try {
          const task = this.fileStore.readJson<Task>(taskPath);
          tasks.push(task);
        } catch {
          // 单个任务文件损坏,跳过
          console.warn(`Corrupted task file: ${taskPath}`);
        }
      }
    }
    
    return tasks;
  }
}

6.7 存储监控

存储健康检查

interface StorageHealth {
  status: 'healthy' | 'warning' | 'critical';
  checks: {
    stateFile: 'ok' | 'missing' | 'corrupted';
    tasksDir: 'ok' | 'missing';
    meetingsDir: 'ok' | 'missing';
    approvalsDir: 'ok' | 'missing';
  };
  issues: string[];
  lastChecked: string;
}

class StateManagerImpl {
  checkHealth(): StorageHealth {
    const health: StorageHealth = {
      status: 'healthy',
      checks: {
        stateFile: 'ok',
        tasksDir: 'ok',
        meetingsDir: 'ok',
        approvalsDir: 'ok'
      },
      issues: [],
      lastChecked: new Date().toISOString()
    };
    
    // 检查 state.json
    const statePath = '.openmatrix/state.json';
    if (!this.fileStore.exists(statePath)) {
      health.checks.stateFile = 'missing';
      health.issues.push('state.json is missing');
      health.status = 'critical';
    } else {
      try {
        this.fileStore.readJson(statePath);
      } catch {
        health.checks.stateFile = 'corrupted';
        health.issues.push('state.json is corrupted');
        health.status = 'critical';
      }
    }
    
    // 检查目录
    const dirs = ['tasks', 'meetings', 'approvals'];
    for (const dir of dirs) {
      const dirPath = `.openmatrix/${dir}`;
      if (!this.fileStore.exists(dirPath)) {
        health.checks[`dir`] = 'missing';
        health.issues.push(`${dir} directory is missing`);
        health.status = 'warning';
      }
    }
    
    return health;
  }
}

存储大小监控

class StateManagerImpl {
  getStorageSize(): StorageSizeInfo {
    const basePath = '.openmatrix';
    
    const sizeInfo = {
      totalSizeBytes: 0,
      files: {
        state: 0,
        tasks: 0,
        meetings: 0,
        approvals: 0,
        context: 0,
        plan: 0
      }
    };
    
    // 计算各部分大小
    const statePath = `${basePath}/state.json`;
    if (this.fileStore.exists(statePath)) {
      sizeInfo.files.state = fs.statSync(statePath).size;
    }
    
    const tasksDir = `${basePath}/tasks`;
    if (this.fileStore.exists(tasksDir)) {
      sizeInfo.files.tasks = this.calculateDirSize(tasksDir);
    }
    
    // ... 其他部分
    
    sizeInfo.totalSizeBytes = Object.values(sizeInfo.files)
      .reduce((a, b) => a + b, 0);
    
    return sizeInfo;
  }
}

下一章将详细讲解 CLI 命令系统