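Chapter 5: Harness Architecture Design

This chapter turns the methodology into a runnable system architecture.

Architecture Design Principles

The FIRST Principles (Adapted)

The FIRST principles of traditional testing need to be adapted to the AI context:

| Principle | Traditional meaning | Adapted meaning for AI |
| --- | --- | --- |
| Fast | Tests run quickly | Evaluation is optimized for throughput (batching, caching) |
| Independent | Tests are independent | Evaluation cases are independent, with no dependencies between them |
| Repeatable | Results are reproducible | Control the temperature and record the full context |
| Self-validating | Pass/fail is determined automatically | Automatic scoring with threshold-based verdicts |
| Timely | Tests are written in a timely manner | Tests are updated in step with prompt iterations |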

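Additions to FIRST for AI

  • Deterministic as possible: make runs as deterministic as you can (temperature=0 reduces variance, though it does not always remove it)
  • Traceable: every step of the pipeline can be traced end to end
  • Comparable: results can be compared across versions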
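
One way to make runs traceable and comparable is to record the settings that shaped each run and derive a stable key from them. The sketch below is only an illustration: `RunContext`, its field names, and the 12-character fingerprint are assumptions, not part of the harness described in this chapter.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class RunContext:
    """Settings that determine one evaluation run (hypothetical field names)."""
    model: str
    prompt_version: str
    dataset_version: str
    temperature: float = 0.0  # deterministic as possible

    def fingerprint(self) -> str:
        """Short, stable hash of the settings; usable as a traceability key."""
        canonical = json.dumps(asdict(self), sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


baseline = RunContext("model-a", "prompt-v3", "golden_set_v1.2")
rerun = RunContext("model-a", "prompt-v3", "golden_set_v1.2")
candidate = RunContext("model-a", "prompt-v4", "golden_set_v1.2")

# Identical settings give the same fingerprint; any change gives a new one
print(baseline.fingerprint() == rerun.fingerprint())      # True
print(baseline.fingerprint() == candidate.fingerprint())  # False
```

Storing the fingerprint next to each result makes it easy to tell whether two score sets came from the same setup before comparing them.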

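Designing for Extensibility

graph TB
    A[Extensibility requirements] --> B[Pluggable evaluators]
    A --> C[Diverse data sources]
    A --> D[Multiple model backends]
    A --> E[Customizable reports]
    
    B --> B1[Plugin architecture]
    C --> C1[Adapter pattern]
    D --> D1[Unified interface]
    E --> E1[Templated reports]

Plugin design example: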

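# Evaluator plugin interface
from abc import ABC, abstractmethod
from typing import Optional

class EvaluatorPlugin(ABC):
    """Base class for evaluator plugins."""
    
    @abstractmethod
    def name(self) -> str:
        """Plugin name."""
        pass
    
    @abstractmethod
    def evaluate(self, output: str, reference: Optional[str] = None) -> float:
        """Score the output; returns a value in the range 0-1."""
        pass
    
    @abstractmethod
    def config_schema(self) -> dict:
        """Schema describing the plugin's configuration."""
        pass

# A concrete plugin
class SemanticSimilarityEvaluator(EvaluatorPlugin):
    def name(self) -> str:
        return "semantic_similarity"
    
    def evaluate(self, output: str, reference: Optional[str] = None) -> float:
        if not reference:
            return 0.0
        # Compare embeddings. get_embedding and cosine_similarity are helpers
        # assumed to be defined elsewhere (the full implementation later in
        # this chapter shows one way to write them).
        return cosine_similarity(
            get_embedding(output),
            get_embedding(reference)
        )
    
    def config_schema(self) -> dict:
        return {
            "model": {"type": "string", "default": "text-embedding-3-small"},
            "threshold": {"type": "float", "default": 0.8}
        }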
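
To make "pluggable" concrete, here is a minimal sketch of a name-based registry, so that a config file can select evaluators by string. The `EVALUATORS` dict, the `register` decorator, and the two toy scoring functions are hypothetical; the chapter itself does not prescribe a registry mechanism.

```python
from typing import Callable, Dict, List

# Hypothetical registry mapping a plugin name to a scoring function
ScoreFn = Callable[[str, str], float]
EVALUATORS: Dict[str, ScoreFn] = {}


def register(name: str) -> Callable[[ScoreFn], ScoreFn]:
    """Decorator that registers a scoring function under `name`."""
    def decorator(fn: ScoreFn) -> ScoreFn:
        if name in EVALUATORS:
            raise ValueError(f"duplicate evaluator: {name}")
        EVALUATORS[name] = fn
        return fn
    return decorator


@register("exact_match")
def exact_match(output: str, reference: str) -> float:
    return 1.0 if output.strip() == reference.strip() else 0.0


@register("contains_reference")
def contains_reference(output: str, reference: str) -> float:
    return 1.0 if reference.strip() in output else 0.0


def run_evaluators(names: List[str], output: str, reference: str) -> Dict[str, float]:
    """Look up each requested evaluator by name and collect its score."""
    return {name: EVALUATORS[name](output, reference) for name in names}


scores = run_evaluators(["exact_match", "contains_reference"], "Paris, France", "Paris")
print(scores)  # {'exact_match': 0.0, 'contains_reference': 1.0}
```

Adding a new evaluator then means writing one decorated function, with no change to the code that runs evaluations.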

Overall Architecture Blueprint

Core Architecture Diagram

graph TB
    subgraph "Data layer"
        A1[Golden Set Store]
        A2[Result Store]
        A3[Version Registry]
    end
    
    subgraph "Execution layer"
        B1[Test Runner]
        B2[Model Caller]
        B3[Evaluator Engine]
    end
    
    subgraph "Analysis layer"
        C1[Score Aggregator]
        C2[Comparator]
        C3[Reporter]
    end
    
    subgraph "Interface layer"
        D1[CLI]
        D2[API]
        D3[Web UI]
    end
    
    D1 --> B1
    D2 --> B1
    D3 --> C3
    
    A1 --> B1
    B1 --> B2 --> B3 --> C1
    C1 --> C2 --> C3
    C1 --> A2
    A3 --> A1
    A3 --> A2

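Layer Responsibilities

| Layer | Responsibility | Key components |
| --- | --- | --- |
| Interface layer | User interaction | CLI, API, Web UI |
| Execution layer | Running evaluations | Runner, Caller, Evaluator |
| Analysis layer | Result processing | Aggregator, Comparator, Reporter |
| Data layer | Data storage | Golden Set, Result Store, Version Registry |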

Core Component Design

Test Runner (Execution Engine)

graph TB
    subgraph TestRunner["Test Runner"]
        A[load_cases]
        B[execute]
        C[parallel]
        D[retry]
    end
    
    subgraph ExecutionPipeline["Execution Pipeline"]
        E[Case -> Model]
        F[-> Evaluator]
        G[-> Result]
    end
    
    A --> E
    B --> E

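Core features:

| Feature | Description | Design notes |
| --- | --- | --- |
| Data loading | Load test cases | Support multiple formats (JSON/YAML/CSV) |
| Parallel execution | Process cases in parallel batches | Limit concurrency to avoid rate limiting |
| Retry on failure | Retry after network failures | Record the number of retries |
| Progress tracking | Report execution progress | Real-time status updates |
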
# Test Runner core logic (sketch). RunnerConfig, TestCase, Result, ModelCaller,
# EvaluatorEngine, ProgressTracker and chunk are assumed to be defined elsewhere.
import asyncio
from typing import List

class TestRunner:
    def __init__(self, config: RunnerConfig):
        self.config = config
        self.model_caller = ModelCaller(config.model)
        self.evaluator = EvaluatorEngine(config.evaluators)
    
    async def execute(self, test_cases: List[TestCase]) -> List[Result]:
        results = []
        
        # Run each batch concurrently, with progress tracking
        with ProgressTracker(len(test_cases)) as progress:
            for batch in chunk(test_cases, self.config.batch_size):
                batch_results = await asyncio.gather(
                    *[self._execute_single(case) for case in batch]
                )
                results.extend(batch_results)
                progress.update(len(batch))
        
        return results
    
    async def _execute_single(self, case: TestCase) -> Result:
        # Pipeline for a single case: call the model, then score the response
        try:
            response = await self.model_caller.call(case.input)
            scores = await self.evaluator.evaluate(response, case.reference)
            return Result(case_id=case.id, response=response, scores=scores)
        except Exception as e:
            return Result(case_id=case.id, error=str(e))
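
The design note "limit concurrency to avoid rate limiting" can also be met with a semaphore instead of fixed batches, so a slow case does not hold up the rest of its batch. The following is a minimal sketch under that assumption; `run_bounded` and `FakeModel` are hypothetical names, and `FakeModel` merely stands in for a real model call.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    inputs: List[str],
    call: Callable[[str], Awaitable[str]],
    max_concurrent: int = 5,
) -> List[str]:
    """Run `call` over all inputs with at most `max_concurrent` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item: str) -> str:
        async with semaphore:
            return await call(item)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(guarded(item) for item in inputs))


class FakeModel:
    """Stand-in for a model backend; records the peak number of parallel calls."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def __call__(self, prompt: str) -> str:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulate network latency
        self.in_flight -= 1
        return prompt.upper()


model = FakeModel()
outputs = asyncio.run(run_bounded([f"q{i}" for i in range(10)], model, max_concurrent=3))
print(outputs[:3], "peak concurrency:", model.peak)
```

The semaphore caps parallel calls at `max_concurrent` regardless of how many cases are queued.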

Evaluator Engine

graph TB
    A[Evaluation request] --> B[Evaluator Engine]
    
    B --> C1[Semantic Evaluator]
    B --> C2[Exact Match Evaluator]
    B --> C3[G-Eval Evaluator]
    B --> C4[Rule Evaluator]
    
    C1 --> D[Score Aggregator]
    C2 --> D
    C3 --> D
    C4 --> D
    
    D --> E[Overall score]

Evaluator types:

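| Type | How it works | Use cases | Pros | Cons |
| --- | --- | --- | --- | --- |
| Exact Match | Exact string matching | Classification, extraction | Simple and reliable | Unsuitable for generative tasks |
| Semantic Sim | Embedding similarity | Generative tasks | Captures meaning | Requires an embedding model |
| G-Eval | GPT-4 scoring | Open-ended generation | Flexible and comprehensive | High cost |
| Rule-based | Rule checks | Safety, format | Deterministic and reliable | Rules must be defined in advance |
| Task-based | Task completion verification | Agents | End-to-end verification | Complex to execute |
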
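# G-Eval implementation example (simplified)
import json
from typing import Optional

class GEvalEvaluator(EvaluatorPlugin):
    """
    Uses GPT-4 as the judge.
    Reference: https://arxiv.org/abs/2303.16634
    """
    
    # Braces of the JSON example are doubled so str.format() keeps them literal
    PROMPT_TEMPLATE = """
    Please assess the quality of the following AI output.
    
    Input: {input}
    Output: {output}
    
    Evaluation dimensions:
    1. Relevance: does it answer the question? (1-10)
    2. Accuracy: is the content correct? (1-10)
    3. Coherence: is the reasoning easy to follow? (1-10)
    4. Fluency: does the language read naturally? (1-10)
    
    Return the scores as JSON:
    {{"relevance": X, "accuracy": X, "coherence": X, "fluency": X}}
    """
    
    def name(self) -> str:
        return "g_eval"
    
    def config_schema(self) -> dict:
        return {}
    
    def evaluate(self, output: str, input: Optional[str] = None, reference: Optional[str] = None) -> dict:
        prompt = self.PROMPT_TEMPLATE.format(input=input, output=output)
        response = call_gpt4(prompt)  # call_gpt4 is assumed to be defined elsewhere
        scores = json.loads(response)
        
        # Weighted average, rescaled from the 1-10 range to 0-1
        weights = {"relevance": 0.3, "accuracy": 0.3, "coherence": 0.2, "fluency": 0.2}
        overall = sum(scores[k] * w for k, w in weights.items()) / 10
        
        return {"overall": overall, "details": scores}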
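
A worked example may help check the weighting arithmetic. The sketch below reuses the same four weights; the judge reply is made up for illustration, and `overall_score` is a hypothetical helper rather than part of the evaluator above.

```python
from typing import Dict

WEIGHTS = {"relevance": 0.3, "accuracy": 0.3, "coherence": 0.2, "fluency": 0.2}


def overall_score(scores: Dict[str, float]) -> float:
    """Weighted average of 1-10 dimension scores, rescaled to 0-1."""
    missing = WEIGHTS.keys() - scores.keys()
    if missing:
        raise KeyError(f"missing dimensions: {sorted(missing)}")
    return sum(scores[k] * w for k, w in WEIGHTS.items()) / 10


# Made-up judge reply: (8*0.3 + 9*0.3 + 7*0.2 + 8*0.2) / 10 = 8.1 / 10
judge_reply = {"relevance": 8, "accuracy": 9, "coherence": 7, "fluency": 8}
print(round(overall_score(judge_reply), 2))  # 0.81
```

Iterating over the weights rather than over the reply means an extra key in the judge's JSON is ignored, while a missing dimension fails loudly instead of silently lowering the score.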

Model Caller

A unified model-calling interface that supports multiple backends:

graph TB
    A[Model Caller] --> B[OpenAI API]
    A --> C[Local Model]
    
    A --> D[call]
    A --> E[batch_call]
    A --> F[cache]
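
# Unified model-calling interface
import asyncio
import hashlib
import json
from abc import ABC, abstractmethod
from typing import List

from openai import AsyncOpenAI

class ModelCaller(ABC):
    @abstractmethod
    async def call(self, prompt: str, **kwargs) -> str:
        pass
    
    @abstractmethod
    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]:
        pass

# OpenAI implementation
class OpenAICaller(ModelCaller):
    def __init__(self, model: str, api_key: str):
        # The async client is needed because call() awaits the request
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        self.cache: dict = {}  # in-memory cache for identical requests
    
    async def call(self, prompt: str, **kwargs) -> str:
        # Check the cache; the key is a stable hash of model, prompt and parameters
        payload = json.dumps([self.model, prompt, kwargs], sort_keys=True, default=str)
        cache_key = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Call the API
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        
        result = response.choices[0].message.content
        self.cache[cache_key] = result
        return result
    
    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]:
        # Issue the calls concurrently; results keep the order of `prompts`
        return list(await asyncio.gather(*(self.call(p, **kwargs) for p in prompts)))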
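
Because callers share one interface, a second backend can be swapped in without touching the runner. As an illustration, the sketch below defines an offline backend that returns canned replies, which is handy for testing the harness itself. `CannedCaller` is a hypothetical class, and the interface is repeated here only so the example is self-contained.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List


class ModelCaller(ABC):
    """Same two-method interface as above, repeated for self-containment."""

    @abstractmethod
    async def call(self, prompt: str, **kwargs) -> str: ...

    @abstractmethod
    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]: ...


class CannedCaller(ModelCaller):
    """Hypothetical offline backend: looks replies up in a dict, no network."""

    def __init__(self, replies: Dict[str, str], default: str = "") -> None:
        self.replies = replies
        self.default = default
        self.calls = 0  # number of prompts served, useful in assertions

    async def call(self, prompt: str, **kwargs) -> str:
        self.calls += 1
        return self.replies.get(prompt, self.default)

    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]:
        return [await self.call(p, **kwargs) for p in prompts]


caller = CannedCaller({"2+2?": "4"}, default="unknown")
answers = asyncio.run(caller.batch_call(["2+2?", "capital of X?"]))
print(answers)  # ['4', 'unknown']
```

With a backend like this, runner and evaluator logic can be exercised deterministically and at no API cost.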

Result Store

graph TB
    A[Evaluation results] --> B[Result Store]
    
    B --> C1[JSON files]
    B --> C2[Database]
    B --> C3[Time-series store]
    
    C1 --> D1[Simple queries]
    C2 --> D2[Complex analysis]
    C3 --> D3[Trend analysis]

Storage schema:

{
  "eval_id": "eval_2024_01_15_001",
  "timestamp": "2024-01-15T10:30:00Z",
  "config": {
    "dataset": "golden_set_v1.2",
    "model": "gpt-4-turbo",
    "evaluators": ["semantic_sim", "g_eval"]
  },
  "results": [
    {
      "case_id": "case_001",
      "input": "User question...",
      "output": "Model output...",
      "scores": {
        "semantic_sim": 0.85,
        "g_eval": {"overall": 0.82, "details": {...}}
      },
      "latency_ms": 1200
    }
  ],
  "summary": {
    "total": 100,
    "pass": 85,
    "fail": 15,
    "avg_score": 0.83,
    "std_score": 0.12
  }
}
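
The `summary` object can be derived from the per-case scores. The sketch below shows one possible computation; the 0.75 pass threshold, the rounding to two decimals, and the use of the population standard deviation are all assumptions, since the schema does not say how `std_score` is defined.

```python
import statistics
from typing import Dict, List


def summarize(scores: List[float], threshold: float = 0.75) -> Dict[str, float]:
    """Build the summary fields of the storage schema from overall scores."""
    passed = sum(1 for s in scores if s >= threshold)
    return {
        "total": len(scores),
        "pass": passed,
        "fail": len(scores) - passed,
        "avg_score": round(statistics.fmean(scores), 2) if scores else 0.0,
        # Population standard deviation (assumption); 0.0 when undefined
        "std_score": round(statistics.pstdev(scores), 2) if len(scores) > 1 else 0.0,
    }


summary = summarize([0.9, 0.8, 0.7, 0.6])
print(summary)
```

Here two of the four cases reach the threshold, the mean is 0.75, and the spread rounds to 0.11.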

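Pipeline Orchestration Design

Evaluation Pipeline

graph LR
    A[Load config] --> B[Load data]
    B --> C[Run evaluation]
    C --> D[Store results]
    D --> E[Analysis report]
    E --> F[Visualization]
    
    subgraph "Configuration phase"
        A --> A1[Evaluator config]
        A --> A2[Model config]
        A --> A3[Dataset version]
    end
    
    subgraph "Execution phase"
        C --> C1[Parallel calls]
        C --> C2[Score computation]
        C --> C3[Failure handling]
    end

Configuration-Driven Design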

# eval_config.yaml
evaluation:
  name: "customer_service_eval"
  version: "1.0"
  
  dataset:
    name: "golden_set_v1.2"
    path: "datasets/golden_set_v1.2.json"
    checksum: "abc123..."
    
  model:
    backend: "openai"
    name: "gpt-4-turbo"
    temperature: 0.3
    max_tokens: 500
    
  evaluators:
    - name: "semantic_similarity"
      weight: 0.4
      threshold: 0.8
      config:
        embedding_model: "text-embedding-3-small"
        
    - name: "g_eval"
      weight: 0.4
      threshold: 0.7
      config:
        criteria: ["relevance", "accuracy", "coherence"]
        
    - name: "safety_check"
      weight: 0.2
      threshold: 1.0
      config:
        rules: ["no_harmful", "no_pii"]
    
  execution:
    parallel: true
    batch_size: 10
    retry_count: 3
    timeout_sec: 30
    
  output:
    path: "results/"
    format: "json"
    include_raw: true
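
A configuration-driven design benefits from validating the config before any model call is made. The sketch below checks the `evaluators` section, written as the list of dicts that a YAML loader would produce. `validate_evaluators` and the specific rules (unique names, weights summing to 1, thresholds within 0-1) are assumptions for illustration.

```python
import math
from typing import Dict, List


def validate_evaluators(evaluators: List[Dict]) -> None:
    """Raise ValueError if names repeat, weights do not sum to 1,
    or a threshold lies outside [0, 1]."""
    names = [e["name"] for e in evaluators]
    if len(names) != len(set(names)):
        raise ValueError("duplicate evaluator names")
    total = sum(e["weight"] for e in evaluators)
    if not math.isclose(total, 1.0, abs_tol=1e-9):
        raise ValueError(f"weights sum to {total}, expected 1.0")
    for e in evaluators:
        if not 0.0 <= e["threshold"] <= 1.0:
            raise ValueError(f"threshold out of range for {e['name']}")


# The evaluators from eval_config.yaml, as a parsed structure
evaluators = [
    {"name": "semantic_similarity", "weight": 0.4, "threshold": 0.8},
    {"name": "g_eval", "weight": 0.4, "threshold": 0.7},
    {"name": "safety_check", "weight": 0.2, "threshold": 1.0},
]
validate_evaluators(evaluators)  # passes silently: 0.4 + 0.4 + 0.2 = 1.0
```

Failing fast on a malformed config is far cheaper than discovering the problem after a full evaluation run.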

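Monitoring and Alerting Integration

Real-Time Monitoring Architecture

graph LR
    A[Evaluation run] --> B[Metrics collection]
    B --> C[Monitoring platform]
    
    A --> A1[Score recording]
    B --> B1[Statistics computation]
    C --> C1[Alert triggering]

Monitoring metrics:

| Category | Metrics | Alert threshold |
| --- | --- | --- |
| Quality | Average score, pass rate | 10% below baseline |
| Stability | Score variance, failure rate | Variance > 0.15 |
| Performance | Response time, token usage | Timeout rate > 5% |
| Cost | API call spend | Over budget |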
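
The thresholds in the table translate directly into a check that runs after each evaluation. This is a minimal sketch: `RunMetrics` and `check_alerts` are hypothetical names, and "10% below baseline" is interpreted here as a relative drop, which is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RunMetrics:
    avg_score: float
    score_variance: float
    timeout_rate: float
    cost_usd: float


def check_alerts(current: RunMetrics, baseline_avg_score: float, budget_usd: float) -> List[str]:
    """Return the metric categories whose alert threshold is crossed."""
    alerts = []
    # Assumption: "10% below baseline" is read as a relative drop
    if current.avg_score < baseline_avg_score * 0.90:
        alerts.append("quality")
    if current.score_variance > 0.15:
        alerts.append("stability")
    if current.timeout_rate > 0.05:
        alerts.append("performance")
    if current.cost_usd > budget_usd:
        alerts.append("cost")
    return alerts


metrics = RunMetrics(avg_score=0.70, score_variance=0.05, timeout_rate=0.08, cost_usd=12.0)
print(check_alerts(metrics, baseline_avg_score=0.83, budget_usd=20.0))
# ['quality', 'performance']
```

In this example the score of 0.70 is below 0.83 × 0.9 = 0.747 and the timeout rate exceeds 5%, so two categories fire.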

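Alert Design Principles

  1. Tiered alerts: P0 (handle immediately) / P1 (handle the same day) / P2 (review within the week)
  2. Alert convergence: aggregate related alerts intelligently to avoid alert storms
  3. Complete context: every alert carries diagnostic information
  4. Automated response: alerts can automatically trigger degradation strategies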
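
Tiering and convergence can be combined in a small gate that decides whether an alert is actually sent. The sketch below suppresses repeats of the same alert key within a time window and always lets P0 through; both the `AlertManager` shape and the "P0 is never suppressed" rule are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class AlertManager:
    """Suppresses repeats of the same alert key within `window_sec` seconds."""
    window_sec: float = 300.0
    suppressed: int = 0
    _last_sent: Dict[str, float] = field(default_factory=dict)

    def should_send(self, key: str, severity: str, now: float) -> bool:
        last = self._last_sent.get(key)
        # P0 always goes out; lower tiers are rate-limited per key
        if severity != "P0" and last is not None and now - last < self.window_sec:
            self.suppressed += 1
            return False
        self._last_sent[key] = now
        return True


manager = AlertManager(window_sec=300)
decisions = [
    manager.should_send("pass_rate_drop", "P1", now=0),    # first occurrence
    manager.should_send("pass_rate_drop", "P1", now=60),   # repeat inside window
    manager.should_send("pass_rate_drop", "P1", now=400),  # window has elapsed
    manager.should_send("pass_rate_drop", "P0", now=410),  # escalated to P0
]
print(decisions, "suppressed:", manager.suppressed)
# [True, False, True, True] suppressed: 1
```

Passing `now` explicitly keeps the logic testable without waiting on a real clock.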

Hands-On: A Complete Harness Implementation

Project Structure

ai_harness/
├── config/
│   ├── eval_config.yaml          # Evaluation config
│   ├── model_config.yaml         # Model config
│   └── monitoring_config.yaml    # Monitoring config
├── datasets/
│   ├── golden_set_v1.0.yaml      # Golden Set
│   ├── boundary_set_v1.0.yaml    # Boundary Set
│   └── adversarial_set_v1.0.yaml # Adversarial Set
├── src/
│   ├── core/
│   │   ├── runner.py             # Test Runner
│   │   ├── evaluator.py          # Evaluator Engine
│   │   ├── caller.py             # Model Caller
│   │   └── result_store.py       # Result Store
│   ├── evaluators/
│   │   ├── semantic_sim.py       # Semantic similarity
│   │   ├── g_eval.py             # G-Eval
│   │   ├── rule_checker.py       # Rule checks
│   │   └── task_checker.py       # Task completion
│   ├── monitoring/
│   │   ├── collector.py          # Metrics collection
│   │   ├── aggregator.py         # Metrics aggregation
│   │   ├── alert_manager.py      # Alert management
│   │   └── dashboard.py          # Dashboard
│   └── utils/
│       ├── cache.py              # Caching
│       ├── logger.py             # Logging
│       └── config_loader.py      # Config loading
├── tests/
│   ├── test_runner.py
│   ├── test_evaluators.py
│   └── test_integration.py
├── reports/
│   └── templates/
│       ├── summary.html
│       └── detail.html
├── cli.py                        # Command-line entry point
└── api.py                        # REST API entry point

Test Runner: Full Implementation

# src/core/runner.py
import asyncio
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
import yaml
import json
from pathlib import Path

@dataclass
class RunnerConfig:
    """Runner configuration."""
    batch_size: int = 10
    max_concurrent: int = 5
    timeout_seconds: int = 30
    retry_count: int = 3
    cache_enabled: bool = True
    save_intermediate: bool = True

@dataclass
class TestCase:
    """A single test case."""
    id: str
    input: str
    reference: Optional[str] = None
    category: Optional[str] = None
    metadata: Dict = field(default_factory=dict)

@dataclass
class ExecutionResult:
    """Result of executing one case."""
    case_id: str
    output: str
    scores: Dict[str, float]
    overall_score: float
    passed: bool
    latency_ms: float
    retry_count: int = 0
    error: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)

class TestRunner:
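    """
    Test execution engine.
    
    Core responsibilities:
    1. Load test cases
    2. Run model calls in parallel
    3. Score outputs with the evaluators
    4. Aggregate and store results
    """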
    
    def __init__(
        self,
        config: RunnerConfig,
        model_caller,
        evaluator_engine,
        result_store
    ):
        self.config = config
        self.model_caller = model_caller
        self.evaluator = evaluator_engine
        self.store = result_store
        self.cache = {} if config.cache_enabled else None
        
    async def run(
        self,
        dataset_path: str,
        eval_config: Dict
    ) -> Dict:
        """
        Run the full evaluation pipeline.
        """
        # 1. Load the test data
        test_cases = self._load_dataset(dataset_path)
        
        # 2. Run the evaluation
        results = await self._execute_batch(test_cases, eval_config)
        
        # 3. Aggregate the results
        summary = self._aggregate_results(results)
        
        # 4. Store the results
        eval_id = self._generate_eval_id()
        self.store.save(eval_id, results, summary)
        
        # 5. Generate the report
        report = self._generate_report(eval_id, results, summary)
        
        return {
            "eval_id": eval_id,
            "results": results,
            "summary": summary,
            "report": report,
        }
    
    def _load_dataset(self, path: str) -> List[TestCase]:
        """Load the test dataset."""
        with open(path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        
        cases = []
        for case_data in data.get('cases', []):
            cases.append(TestCase(
                id=case_data['id'],
                input=case_data['input'],
                reference=case_data.get('reference'),
                category=case_data.get('category'),
                metadata=case_data.get('metadata', {}),
            ))
        
        return cases
    
    async def _execute_batch(
        self,
        cases: List[TestCase],
        eval_config: Dict
    ) -> List[ExecutionResult]:
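        """Execute the evaluation in batches."""
        results = []
        
        # Run the cases of each batch concurrently
        for batch in self._chunk(cases, self.config.batch_size):
            batch_results = await asyncio.gather(
                *[self._execute_single(case, eval_config) for case in batch],
                return_exceptions=True
            )
            
            # gather preserves order, so each result can be paired with its case
            for case, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    # Record the failure against the case that raised it
                    results.append(ExecutionResult(
                        case_id=case.id,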
                        output="",
                        scores={},
                        overall_score=0.0,
                        passed=False,
                        latency_ms=0,
                        error=str(result),
                    ))
                else:
                    results.append(result)
        
        return results
    
    async def _execute_single(
        self,
        case: TestCase,
        eval_config: Dict
    ) -> ExecutionResult:
        """Execute a single case."""
        start_time = datetime.now()
        retry_count = 0
        error = None
        
        # Check the cache
        cache_key = f"{case.id}:{case.input}"
        if self.cache is not None and cache_key in self.cache:
            cached = self.cache[cache_key]
            return ExecutionResult(
                case_id=case.id,
                output=cached['output'],
                scores=cached['scores'],
                overall_score=cached['overall_score'],
                passed=cached['passed'],
                latency_ms=0,
                retry_count=0,
            )
        
        # Call the model, retrying on failure
        output = None
        for attempt in range(self.config.retry_count):
            try:
                output = await asyncio.wait_for(
                    self.model_caller.call(case.input),
                    timeout=self.config.timeout_seconds
                )
                break
            except asyncio.TimeoutError:
                retry_count += 1
                error = "timeout"
            except Exception as e:
                retry_count += 1
                error = str(e)
        
        if output is None:
            return ExecutionResult(
                case_id=case.id,
                output="",
                scores={},
                overall_score=0.0,
                passed=False,
                latency_ms=(datetime.now() - start_time).total_seconds() * 1000,
                retry_count=retry_count,
                error=error,
            )
        
        # Score the output
        scores = await self.evaluator.evaluate(
            output=output,
            reference=case.reference,
            input=case.input,
            config=eval_config,
        )
        
        # Compute the overall score
        overall = self._compute_overall(scores, eval_config.get('weights', {}))
        threshold = eval_config.get('threshold', 0.75)
        passed = overall >= threshold
        
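        # Cache the result. Compare against None: an empty dict is falsy,
        # so `if self.cache:` would never store the first entry.
        if self.cache is not None: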
            self.cache[cache_key] = {
                'output': output,
                'scores': scores,
                'overall_score': overall,
                'passed': passed,
            }
        
        return ExecutionResult(
            case_id=case.id,
            output=output,
            scores=scores,
            overall_score=overall,
            passed=passed,
            latency_ms=(datetime.now() - start_time).total_seconds() * 1000,
            retry_count=retry_count,
        )
    
    def _chunk(self, items: List, size: int) -> List[List]:
        """Split items into chunks of at most `size` elements."""
        return [items[i:i+size] for i in range(0, len(items), size)]
    
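    def _compute_overall(self, scores: Dict, weights: Dict) -> float:
        """Compute the overall score as a (weighted) average of the numeric scores."""
        # The engine may add "<name>_error" string entries; skip non-numeric values
        scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
        if not weights:
            return sum(scores.values()) / len(scores) if scores else 0.0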
        
        total = 0.0
        total_weight = 0.0
        for key, weight in weights.items():
            if key in scores:
                total += scores[key] * weight
                total_weight += weight
        
        return total / total_weight if total_weight > 0 else 0.0
    
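    def _aggregate_results(self, results: List[ExecutionResult]) -> Dict:
        """Aggregate summary statistics over all results."""
        total = len(results)
        passed = sum(1 for r in results if r.passed)
        
        # Score statistics cover every case that ran without error; a genuine
        # score of 0 must count, so filter on the error field, not on the value
        scores = [r.overall_score for r in results if r.error is None]
        latencies = [r.latency_ms for r in results]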
        
        return {
            "total": total,
            "passed": passed,
            "pass_rate": passed / total if total > 0 else 0,
            "avg_score": sum(scores) / len(scores) if scores else 0,
            "min_score": min(scores) if scores else 0,
            "max_score": max(scores) if scores else 0,
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency_ms": self._percentile(latencies, 95),
            "error_count": sum(1 for r in results if r.error),
        }
    
    def _percentile(self, values: List, p: int) -> float:
        """Return the p-th percentile using a simple index-based estimate."""
        if not values:
            return 0
        sorted_vals = sorted(values)
        idx = int(len(sorted_vals) * p / 100)
        return sorted_vals[min(idx, len(sorted_vals) - 1)]
    
    def _generate_eval_id(self) -> str:
        """Generate an evaluation ID from the current time."""
        return f"eval_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    
    def _generate_report(self, eval_id: str, results: List, summary: Dict) -> Dict:
        """Generate the report."""
        return {
            "eval_id": eval_id,
            "summary": summary,
            "failed_cases": [
                {"case_id": r.case_id, "score": r.overall_score, "error": r.error}
                for r in results if not r.passed
            ],
            "top_cases": sorted(
                [{"case_id": r.case_id, "score": r.overall_score} for r in results],
                key=lambda x: x["score"],
                reverse=True
            )[:5],
            "timestamp": datetime.now().isoformat(),  # string, so the report is JSON-serializable
        }
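
The retry loop above retries immediately after a failure. When failures come from rate limiting, waiting longer between attempts usually works better. The sketch below shows exponential backoff with jitter as one alternative; `call_with_backoff`, its parameters, and the `Flaky` stand-in are hypothetical and not part of the runner.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple


async def call_with_backoff(
    call: Callable[[], Awaitable[str]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    timeout: float = 30.0,
) -> Tuple[str, int]:
    """Return (output, retries). Re-raises the last error when attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(call(), timeout=timeout), attempt
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter: about base, 2*base, 4*base, ...
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


class Flaky:
    """Stand-in for a model call that fails a fixed number of times first."""

    def __init__(self, failures: int) -> None:
        self.remaining = failures

    async def __call__(self) -> str:
        if self.remaining > 0:
            self.remaining -= 1
            raise ConnectionError("transient failure")
        return "ok"


output, retries = asyncio.run(call_with_backoff(Flaky(2), base_delay=0.01))
print(output, retries)  # ok 2
```

The returned retry count can be stored in `ExecutionResult.retry_count`, which keeps the "record the number of retries" requirement intact.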

Evaluator Engine: Implementation

# src/core/evaluator.py
from typing import Dict, List, Optional
from abc import ABC, abstractmethod
import asyncio

class EvaluatorPlugin(ABC):
    """Base class for evaluator plugins."""
    
    @abstractmethod
    def name(self) -> str:
        """Plugin name."""
        pass
    
    @abstractmethod
    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        input: Optional[str] = None,
        **kwargs
    ) -> float:
        """Score the output; returns a value in the range 0-1."""
        pass
    
    @classmethod
    def from_config(cls, config: Dict) -> 'EvaluatorPlugin':
        """Create an instance from a config dict."""
        return cls(**config)

class EvaluatorEngine:
    """
    Evaluation engine.
    Runs any combination of evaluators.
    """
    
    def __init__(self, evaluators: List[EvaluatorPlugin]):
        self.evaluators = {e.name(): e for e in evaluators}
    
    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        input: Optional[str] = None,
        config: Optional[Dict] = None
    ) -> Dict[str, float]:
        """
        Run every selected evaluator and collect the scores by name.
        """
        config = config or {}
        evaluator_names = config.get('evaluators', list(self.evaluators.keys()))
        
        results = {}
        
        for name in evaluator_names:
            if name not in self.evaluators:
                continue
            
            evaluator = self.evaluators[name]
            evaluator_config = config.get('evaluator_config', {}).get(name, {})
            
            try:
                score = await evaluator.evaluate(
                    output=output,
                    reference=reference,
                    input=input,
                    **evaluator_config
                )
                results[name] = score
            except Exception as e:
                results[name] = 0.0
                results[f"{name}_error"] = str(e)
        
        return results

# Concrete evaluator implementations

class SemanticSimilarityEvaluator(EvaluatorPlugin):
    """Semantic similarity evaluator."""
    
    def __init__(self, model: str = "text-embedding-3-small", threshold: float = 0.8):
        self.model = model
        self.threshold = threshold
        # Async embedding client, so requests do not block the event loop
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()
    
    def name(self) -> str:
        return "semantic_similarity"
    
    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        **kwargs
    ) -> float:
        if not reference:
            return 0.0
        
        # Fetch the embeddings
        output_emb = (await self.client.embeddings.create(
            input=output, model=self.model
        )).data[0].embedding
        
        ref_emb = (await self.client.embeddings.create(
            input=reference, model=self.model
        )).data[0].embedding
        
        # Compute the cosine similarity
        similarity = self._cosine_similarity(output_emb, ref_emb)
        
        return similarity
    
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Compute the cosine similarity of two vectors."""
        import math
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x*x for x in a))
        norm_b = math.sqrt(sum(y*y for y in b))
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

class RuleCheckerEvaluator(EvaluatorPlugin):
    """Rule-based checker."""
    
    def __init__(self, rules: List[Dict]):
        self.rules = rules
    
    def name(self) -> str:
        return "rule_checker"
    
    async def evaluate(self, output: str, **kwargs) -> float:
        """
        Check the output against every rule.
        Returns 1.0 if all rules pass and 0.0 if any rule is violated.
        """
        violations = []
        
        for rule in self.rules:
            rule_type = rule.get('type')
            
            if rule_type == 'contains':
                # Forbidden substrings (case-insensitive)
                forbidden = rule.get('forbidden', [])
                for item in forbidden:
                    if item.lower() in output.lower():
                        violations.append(f"contains: {item}")
            
            elif rule_type == 'regex':
                # Regular-expression check: the pattern must match when
                # should_match is true and must not match when it is false
                pattern = rule.get('pattern')
                import re
                should_match = rule.get('should_match', True)
                matched = re.search(pattern, output) is not None
                if matched and not should_match:
                    violations.append(f"regex_match: {pattern}")
                elif not matched and should_match:
                    violations.append(f"regex_no_match: {pattern}")
            
            elif rule_type == 'length':
                # Length check
                min_len = rule.get('min', 0)
                max_len = rule.get('max', float('inf'))
                if len(output) < min_len or len(output) > max_len:
                    violations.append(f"length: {len(output)}")
            
            elif rule_type == 'format':
                # Format check
                expected_format = rule.get('format')
                if expected_format == 'json':
                    import json
                    try:
                        json.loads(output)
                    except ValueError:  # json.JSONDecodeError is a subclass
                        violations.append("invalid_json")
        
        return 0.0 if violations else 1.0
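
The evaluator table also lists Exact Match, which has no implementation above. A minimal sketch might look like the following. It mirrors the `name()`/`evaluate()` shape of `EvaluatorPlugin` but does not inherit from it, so that the example stays self-contained; the normalization steps (NFKC, lowercasing, whitespace collapsing) are an assumption about what "exact" should tolerate.

```python
import asyncio
import unicodedata
from typing import Optional


def normalize(text: str) -> str:
    """Unicode-normalize, lowercase, and collapse runs of whitespace."""
    text = unicodedata.normalize("NFKC", text)
    return " ".join(text.lower().split())


class ExactMatchEvaluator:
    """Hypothetical plugin with the same method shape as EvaluatorPlugin."""

    def __init__(self, normalize_text: bool = True) -> None:
        self.normalize_text = normalize_text

    def name(self) -> str:
        return "exact_match"

    async def evaluate(self, output: str, reference: Optional[str] = None, **kwargs) -> float:
        if reference is None:
            return 0.0
        if self.normalize_text:
            output, reference = normalize(output), normalize(reference)
        return 1.0 if output == reference else 0.0


lenient = ExactMatchEvaluator()
strict = ExactMatchEvaluator(normalize_text=False)
print(asyncio.run(lenient.evaluate("  Positive\n", reference="positive")))  # 1.0
print(asyncio.run(strict.evaluate("  Positive\n", reference="positive")))   # 0.0
```

For classification labels the lenient mode is usually what is wanted, while the strict mode suits outputs where casing or spacing carries meaning.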

Result Store: Implementation

# src/core/result_store.py
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List

class ResultStore:
    """
    Result storage.
    Supports JSON files and, optionally, a database.
    """
    
    def __init__(self, storage_path: str = "results/", use_db: bool = False):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.use_db = use_db
        
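        if use_db:
            # Initialize the database connection (_init_db and _save_to_db
            # depend on the chosen database and are not shown here)
            self._init_db()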
    
    def save(self, eval_id: str, results: List, summary: Dict):
        """Save the results of one evaluation."""
        data = {
            "eval_id": eval_id,
            "timestamp": datetime.now().isoformat(),
            "summary": summary,
            "results": [
                {
                    "case_id": r.case_id,
                    "output": r.output,
                    "scores": r.scores,
                    "overall_score": r.overall_score,
                    "passed": r.passed,
                    "latency_ms": r.latency_ms,
                    "retry_count": r.retry_count,
                    "error": r.error,
                }
                for r in results
            ],
        }
        
        # Write to a file
        file_path = self.storage_path / f"{eval_id}.json"
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        if self.use_db:
            self._save_to_db(eval_id, data)
    
    def load(self, eval_id: str) -> Dict:
        """Load the results of one evaluation; returns None if it does not exist."""
        file_path = self.storage_path / f"{eval_id}.json"
        
        if file_path.exists():
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        
        return None
    
    def list_evals(self, limit: int = 20) -> List[Dict]:
        """List past evaluations, newest first."""
        files = sorted(
            self.storage_path.glob("eval_*.json"),
            key=lambda f: f.stat().st_mtime,
            reverse=True
        )[:limit]
        
        evals = []
        for f in files:
            with open(f, 'r', encoding='utf-8') as fp:
                data = json.load(fp)
                evals.append({
                    "eval_id": data["eval_id"],
                    "timestamp": data["timestamp"],
                    "pass_rate": data["summary"]["pass_rate"],
                    "avg_score": data["summary"]["avg_score"],
                })
        
        return evals
    
    def compare(self, eval_id_a: str, eval_id_b: str) -> Dict:
        """Compare the results of two evaluations."""
        data_a = self.load(eval_id_a)
        data_b = self.load(eval_id_b)
        
        if not data_a or not data_b:
            return {"error": "evaluation result not found"}
        
        return {
            "eval_a": {
                "id": eval_id_a,
                "pass_rate": data_a["summary"]["pass_rate"],
                "avg_score": data_a["summary"]["avg_score"],
            },
            "eval_b": {
                "id": eval_id_b,
                "pass_rate": data_b["summary"]["pass_rate"],
                "avg_score": data_b["summary"]["avg_score"],
            },
            "delta": {
                "pass_rate": data_b["summary"]["pass_rate"] - data_a["summary"]["pass_rate"],
                "avg_score": data_b["summary"]["avg_score"] - data_a["summary"]["avg_score"],
            },
        }
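
The `delta` returned by `compare` can feed a simple regression gate, for instance in CI. The sketch below flags any metric that dropped by more than an allowed tolerance; `regression_verdict` is a hypothetical helper and the numbers are made up.

```python
from typing import Dict, List


def regression_verdict(delta: Dict[str, float], tolerance: Dict[str, float]) -> List[str]:
    """Return the metrics whose drop from run A to run B exceeds the tolerance."""
    return [
        metric
        for metric, allowed_drop in tolerance.items()
        if delta.get(metric, 0.0) < -allowed_drop
    ]


# Same shape as the "delta" entry returned by ResultStore.compare (values made up)
delta = {"pass_rate": -0.06, "avg_score": 0.01}
regressions = regression_verdict(delta, {"pass_rate": 0.02, "avg_score": 0.02})
print(regressions)  # ['pass_rate']
```

A non-empty list can then fail the build or raise a P1 alert, depending on how strict the team wants the gate to be.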

The CLI Tool

# cli.py
import argparse
import asyncio
import yaml
from pathlib import Path

def main():
    parser = argparse.ArgumentParser(description='AI Harness Evaluation Tool')
    
    subparsers = parser.add_subparsers(dest='command')
    
    # run command
    run_parser = subparsers.add_parser('run', help='Run an evaluation')
    run_parser.add_argument('--dataset', required=True, help='Path to the dataset')
    run_parser.add_argument('--config', default='config/eval_config.yaml', help='Evaluation config file')
    run_parser.add_argument('--output', default='results/', help='Directory for results')
    
    # compare command
    compare_parser = subparsers.add_parser('compare', help='Compare two evaluations')
    compare_parser.add_argument('eval_id_a', help='Evaluation ID A')
    compare_parser.add_argument('eval_id_b', help='Evaluation ID B')
    
    # list command
    list_parser = subparsers.add_parser('list', help='List past evaluations')
    list_parser.add_argument('--limit', type=int, default=20, help='Maximum number of entries')
    
    # report command
    report_parser = subparsers.add_parser('report', help='Generate a report')
    report_parser.add_argument('eval_id', help='Evaluation ID')
    report_parser.add_argument('--format', choices=['html', 'json', 'markdown'], default='markdown')
    
    args = parser.parse_args()
    
    if args.command == 'run':
        asyncio.run(run_evaluation(args))
    elif args.command == 'compare':
        compare_evals(args)
    elif args.command == 'list':
        list_evals(args)
    elif args.command == 'report':
        generate_report(args)
    else:
        parser.print_help()

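async def run_evaluation(args):
    """Run an evaluation."""
    # Load the config; eval_config.yaml nests everything under "evaluation"
    with open(args.config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)['evaluation']
    
    # Initialize the components
    import os
    from src.core.runner import TestRunner, RunnerConfig
    from src.core.evaluator import EvaluatorEngine, SemanticSimilarityEvaluator
    from src.core.caller import OpenAICaller
    from src.core.result_store import ResultStore
    
    execution = config.get('execution', {})
    runner_config = RunnerConfig(
        batch_size=execution.get('batch_size', 10),
        max_concurrent=execution.get('max_concurrent', 5),
        timeout_seconds=execution.get('timeout_sec', 30),
        retry_count=execution.get('retry_count', 3),
    )
    
    model_caller = OpenAICaller(
        model=config['model']['name'],
        api_key=os.environ['OPENAI_API_KEY'],
    )
    # Only evaluators implemented in src/core/evaluator.py are instantiated
    # here; the engine skips configured names that have no plugin
    plugin_classes = {'semantic_similarity': SemanticSimilarityEvaluator}
    evaluator = EvaluatorEngine([
        plugin_classes[e['name']]()
        for e in config['evaluators'] if e['name'] in plugin_classes
    ])
    store = ResultStore(args.output)
    
    runner = TestRunner(runner_config, model_caller, evaluator, store)
    
    # The runner expects evaluator names plus a name-to-weight mapping
    eval_config = {
        'evaluators': [e['name'] for e in config['evaluators']],
        'weights': {e['name']: e['weight'] for e in config['evaluators']},
    }
    
    print(f"Starting evaluation: {args.dataset}")
    result = await runner.run(args.dataset, eval_config)
    
    print("\nEvaluation complete!")
    print(f"Evaluation ID: {result['eval_id']}")
    print(f"Pass rate: {result['summary']['pass_rate']:.2%}")
    print(f"Average score: {result['summary']['avg_score']:.3f}")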

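def compare_evals(args):
    """Print the comparison of two stored evaluations."""
    from src.core.result_store import ResultStore
    print(ResultStore().compare(args.eval_id_a, args.eval_id_b))

def list_evals(args):
    """Print recent evaluations, newest first."""
    from src.core.result_store import ResultStore
    for item in ResultStore().list_evals(args.limit):
        print(item)

def generate_report(args):
    """Print a stored evaluation; rendering per --format is not shown here."""
    from src.core.result_store import ResultStore
    print(ResultStore().load(args.eval_id))

if __name__ == '__main__':
    main()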

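Summary

Key points of harness architecture design:

| Design aspect | Key points |
| --- | --- |
| Principles | Adapted FIRST, extensibility, traceability |
| Layers | Interface, execution, analysis, and data layers |
| Components | Runner, Evaluator, Caller, Store |
| Pipeline | Configuration-driven, pipeline orchestration |
| Monitoring | Real-time metrics, tiered alerts |
| Hands-on | Complete code implementation, CLI tool |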

Architecture Checklist

✅ Does the design follow the adapted FIRST principles?
✅ Are evaluators pluggable?
✅ Do model backends share a unified interface?
✅ Are results fully traceable?
✅ Is parallel execution supported?
✅ Are monitoring and alerting integrated?
✅ Is there a CLI/API interface?

The next chapter dives into the concrete implementation of the core components.