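Chapter 5: Harness Architecture Design
This chapter turns the methodology into a runnable system architecture.
Architecture Design Principles
FIRST Principles (Adapted)
The FIRST principles of traditional testing need to be adapted to the AI context:
| Principle | Traditional meaning | Meaning adapted for AI |
|---|---|---|
| Fast | Tests execute quickly | Optimize evaluation efficiency (batching, caching) |
| Independent | Tests are independent | Evaluation cases are independent, with no dependencies on one another |
| Repeatable | Results are reproducible | Control temperature; record the full context |
| Self-validating | Pass/fail is determined automatically | Automatic scoring with threshold-based verdicts |
| Timely | Tests are written promptly | Tests are updated in step with prompt iterations |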
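To make the Repeatable row concrete, one option is to store a fingerprint of everything that shapes a model call next to each result. The sketch below is illustrative only: `run_fingerprint` and its fields are hypothetical names, and it assumes the relevant context is the prompt, the model name, the temperature, and any extra sampling parameters.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def run_fingerprint(
    prompt: str,
    model: str,
    temperature: float,
    extra: Optional[Dict[str, Any]] = None,
) -> str:
    """Return a stable SHA-256 digest of the context behind one model call."""
    context = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "extra": extra or {},
    }
    # sort_keys gives a canonical form, so key order never changes the digest
    canonical = json.dumps(context, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


fp = run_fingerprint("Summarize the ticket.", "gpt-4-turbo", 0.0, {"max_tokens": 500})
print(fp[:12])  # a short prefix is usually enough for logs
```

Two runs with the same fingerprint were given the same inputs, so a score difference between them points at model nondeterminism rather than at a changed prompt or setting.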
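Extensibility Design
graph TB
A[Extensibility requirements] --> B[Pluggable evaluators]
A --> C[Diverse data sources]
A --> D[Multiple model backends]
A --> E[Customizable reports]
B --> B1[Plugin architecture]
C --> C1[Adapter pattern]
D --> D1[Unified interface]
E --> E1[Templated reports]
Plugin design example: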
# Evaluator plugin interface
from abc import ABC, abstractmethod
from typing import Optional


class EvaluatorPlugin(ABC):
    """Base class for evaluator plugins."""

    @abstractmethod
    def name(self) -> str:
        """Plugin name."""

    @abstractmethod
    def evaluate(self, output: str, reference: Optional[str] = None) -> float:
        """Evaluate the output and return a score between 0 and 1."""

    @abstractmethod
    def config_schema(self) -> dict:
        """Configuration schema."""


# A concrete plugin
class SemanticSimilarityEvaluator(EvaluatorPlugin):
    def name(self) -> str:
        return "semantic_similarity"

    def evaluate(self, output: str, reference: Optional[str] = None) -> float:
        if not reference:
            return 0.0
        # Compute similarity from embeddings. get_embedding and
        # cosine_similarity are helpers assumed to be defined elsewhere.
        return cosine_similarity(
            get_embedding(output),
            get_embedding(reference),
        )

    def config_schema(self) -> dict:
        return {
            "model": {"type": "string", "default": "text-embedding-3-small"},
            "threshold": {"type": "float", "default": 0.8},
        }
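A plugin architecture also needs some way to look evaluators up by name, for example when a config file lists them as strings. The following is a minimal sketch of one possible registry; `register`, `create`, and the `ExactMatch` class are hypothetical names introduced only for illustration.

```python
from typing import Callable, Dict

# Hypothetical registry: maps a plugin name to the class that builds it
_REGISTRY: Dict[str, Callable[..., object]] = {}


def register(name: str):
    """Class decorator that records an evaluator under the given name."""
    def decorator(cls):
        if name in _REGISTRY:
            raise ValueError(f"evaluator {name!r} is already registered")
        _REGISTRY[name] = cls
        return cls
    return decorator


def create(name: str, **config):
    """Instantiate a registered evaluator from keyword configuration."""
    if name not in _REGISTRY:
        raise KeyError(f"unknown evaluator: {name!r}")
    return _REGISTRY[name](**config)


@register("exact_match")
class ExactMatch:
    def __init__(self, case_sensitive: bool = True):
        self.case_sensitive = case_sensitive

    def evaluate(self, output: str, reference: str) -> float:
        if not self.case_sensitive:
            output, reference = output.lower(), reference.lower()
        return 1.0 if output.strip() == reference.strip() else 0.0


evaluator = create("exact_match", case_sensitive=False)
print(evaluator.evaluate("Yes ", "yes"))
```

With this shape, adding an evaluator means adding one decorated class; the code that reads the configuration does not change.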
Overall Architecture Blueprint
Core Architecture Diagram
graph TB
subgraph "Data Layer"
A1[Golden Set Store]
A2[Result Store]
A3[Version Registry]
end
subgraph "Execution Layer"
B1[Test Runner]
B2[Model Caller]
B3[Evaluator Engine]
end
subgraph "Analysis Layer"
C1[Score Aggregator]
C2[Comparator]
C3[Reporter]
end
subgraph "Interface Layer"
D1[CLI]
D2[API]
D3[Web UI]
end
D1 --> B1
D2 --> B1
D3 --> C3
A1 --> B1
B1 --> B2 --> B3 --> C1
C1 --> C2 --> C3
C1 --> A2
A3 --> A1
A3 --> A2
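Layer Responsibilities
| Layer | Responsibility | Key components |
|---|---|---|
| Interface layer | User interaction | CLI, API, Web UI |
| Execution layer | Running evaluations | Runner, Caller, Evaluator |
| Analysis layer | Processing results | Aggregator, Comparator, Reporter |
| Data layer | Storing data | Golden Set, Result Store, Version |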
Core Component Design
Test Runner (Execution Engine)
graph TB
subgraph TestRunner["Test Runner"]
A[load_cases]
B[execute]
C[parallel]
D[retry]
end
subgraph ExecutionPipeline["Execution Pipeline"]
E[Case -> Model]
F[-> Evaluator]
G[-> Result]
end
A --> E
B --> E
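Core features:
| Feature | Description | Design notes |
|---|---|---|
| Data loading | Loads test cases | Supports multiple formats (JSON/YAML/CSV) |
| Parallel execution | Processes cases in parallel batches | Limit concurrency to avoid rate limiting |
| Retry on failure | Retries after network failures | Record the retry count |
| Progress tracking | Reports execution progress | Real-time status updates |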
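# Test Runner core logic
# RunnerConfig, TestCase, Result, ProgressTracker, and chunk are assumed
# to be defined elsewhere in the harness.
import asyncio
from typing import List


class TestRunner:
    def __init__(self, config: RunnerConfig, model_caller: ModelCaller):
        self.config = config
        # ModelCaller is an abstract interface, so a concrete caller is
        # injected here instead of being constructed directly.
        self.model_caller = model_caller
        self.evaluator = EvaluatorEngine(config.evaluators)

    async def execute(self, test_cases: List[TestCase]) -> List[Result]:
        results = []
        # Run in parallel, batch by batch, with progress tracking
        with ProgressTracker(len(test_cases)) as progress:
            for batch in chunk(test_cases, self.config.batch_size):
                batch_results = await asyncio.gather(
                    *[self._execute_single(case) for case in batch]
                )
                results.extend(batch_results)
                progress.update(len(batch))
        return results

    async def _execute_single(self, case: TestCase) -> Result:
        # Flow for a single case: call the model, then score the response
        try:
            response = await self.model_caller.call(case.input)
            scores = await self.evaluator.evaluate(response, case.reference)
            return Result(case_id=case.id, response=response, scores=scores)
        except Exception as e:
            # A failed case is recorded as a result, so one error cannot abort the batch
            return Result(case_id=case.id, error=str(e))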
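The table above asks for bounded concurrency and retries on network failure. One way to combine the two, sketched here under stated assumptions, is an `asyncio.Semaphore` around each call plus exponential backoff with jitter. The names `call_with_retry` and `run_limited` are hypothetical, as are the default delay values.

```python
import asyncio
import random
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def call_with_retry(
    fn: Callable[[], Awaitable[T]], retries: int = 3, base_delay: float = 0.5
) -> T:
    """Await fn(), retrying on any exception with exponential backoff plus jitter."""
    for attempt in range(retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")


async def run_limited(
    tasks: List[Callable[[], Awaitable[T]]],
    max_concurrent: int = 5,
    retries: int = 3,
    base_delay: float = 0.5,
) -> List[T]:
    """Run all tasks with at most max_concurrent in flight; results keep input order."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(fn: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await call_with_retry(fn, retries, base_delay)

    return await asyncio.gather(*(guarded(fn) for fn in tasks))
```

Each task is passed as a zero-argument callable rather than a coroutine object, because a coroutine can only be awaited once and a retry needs a fresh one.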
Evaluator Engine (Evaluation Engine)
graph TB
A[Evaluation request] --> B[Evaluator Engine]
B --> C1[Semantic Evaluator]
B --> C2[Exact Match Evaluator]
B --> C3[G-Eval Evaluator]
B --> C4[Rule Evaluator]
C1 --> D[Score Aggregator]
C2 --> D
C3 --> D
C4 --> D
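D --> E[Overall score]
Evaluator types:
| Type | How it works | Best suited for | Strengths | Weaknesses |
|---|---|---|---|---|
| Exact Match | Exact string matching | Classification, extraction | Simple and reliable | Unsuitable for generative tasks |
| Semantic Sim | Embedding similarity | Generative tasks | Captures meaning | Requires an embedding model |
| G-Eval | Scoring by GPT-4 | Open-ended generation | Flexible and comprehensive | High cost |
| Rule-based | Rule checks | Safety, format | Deterministic and reliable | Rules must be defined in advance |
| Task-based | Verifies task completion | Agents | End-to-end verification | Complex to execute |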
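# G-Eval implementation example
# (name() and config_schema() are omitted here for brevity.)
class GEvalEvaluator(EvaluatorPlugin):
    """
    Evaluate with GPT-4.
    Reference: https://arxiv.org/abs/2303.16634
    """

    # The literal braces of the JSON example are doubled so that
    # str.format() does not mistake them for placeholders.
    PROMPT_TEMPLATE = """
    Please evaluate the quality of the following AI output.
    Input: {input}
    Output: {output}
    Evaluation dimensions:
    1. Relevance: does it answer the question? (1-10)
    2. Accuracy: is the content correct? (1-10)
    3. Coherence: is the logic sound? (1-10)
    4. Fluency: is the language natural? (1-10)
    Return the scores as JSON:
    {{"relevance": X, "accuracy": X, "coherence": X, "fluency": X}}
    """

    def evaluate(self, output: str, input: str = None, reference: str = None) -> dict:
        prompt = self.PROMPT_TEMPLATE.format(input=input, output=output)
        # call_gpt4 and parse_json are helpers assumed to be defined elsewhere
        response = call_gpt4(prompt)
        scores = parse_json(response)
        # Weighted average, rescaled from the 1-10 scale to 0.1-1.0.
        # Iterating over the weights ignores any extra keys the judge returns.
        weights = {"relevance": 0.3, "accuracy": 0.3, "coherence": 0.2, "fluency": 0.2}
        overall = sum(scores[k] * w for k, w in weights.items()) / 10
        return {"overall": overall, "details": scores}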
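Judge models often wrap the requested JSON in extra prose, so the parsing step benefits from being tolerant. The sketch below shows one possible way to extract and validate the four scores; `parse_judge_scores` is a hypothetical helper, and it assumes the reply contains a single flat JSON object with no nested braces.

```python
import json
import re
from typing import Dict

DIMENSIONS = ("relevance", "accuracy", "coherence", "fluency")


def parse_judge_scores(response: str) -> Dict[str, float]:
    """Extract the first flat JSON object from a judge reply and scale 1-10 scores by 1/10."""
    # Non-greedy match: stops at the first closing brace (assumes no nesting)
    match = re.search(r"\{.*?\}", response, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in judge response")
    raw = json.loads(match.group(0))
    scores = {}
    for dim in DIMENSIONS:
        value = float(raw[dim])  # KeyError if a dimension is missing
        if not 1 <= value <= 10:
            raise ValueError(f"{dim} score {value} is outside the 1-10 range")
        scores[dim] = value / 10
    return scores


reply = 'Here are my scores: {"relevance": 8, "accuracy": 9, "coherence": 7, "fluency": 10} Thanks.'
print(parse_judge_scores(reply))
```

Failing loudly on a missing or out-of-range score keeps a malformed judge reply from silently turning into a plausible-looking number.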
Model Caller
A unified model-calling interface that supports multiple backends:
graph TB
A[Model Caller] --> B[OpenAI API]
A --> C[Local Model]
A --> D[call]
A --> E[batch_call]
A --> F[cache]
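# Unified model-calling interface
import asyncio
import hashlib
import json
from abc import ABC, abstractmethod
from typing import List

from openai import AsyncOpenAI


class ModelCaller(ABC):
    @abstractmethod
    async def call(self, prompt: str, **kwargs) -> str:
        ...

    @abstractmethod
    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]:
        ...


# OpenAI implementation
class OpenAICaller(ModelCaller):
    def __init__(self, model: str, api_key: str):
        # call() awaits the request, so the async client is required
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        # Caches identical requests; ResponseCache is assumed to be defined elsewhere
        self.cache = ResponseCache()

    async def call(self, prompt: str, **kwargs) -> str:
        # Check the cache. A content digest is stable across processes,
        # unlike the built-in hash() of a string.
        payload = json.dumps(
            {"model": self.model, "prompt": prompt, "kwargs": kwargs},
            sort_keys=True, default=str,
        )
        cache_key = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        if (cached := self.cache.get(cache_key)) is not None:
            return cached
        # Call the API
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        result = response.choices[0].message.content
        self.cache.set(cache_key, result)
        return result

    async def batch_call(self, prompts: List[str], **kwargs) -> List[str]:
        # Implemented so the class satisfies the abstract interface
        return list(await asyncio.gather(*(self.call(p, **kwargs) for p in prompts)))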
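The caller above relies on a `ResponseCache` with `get` and `set` methods that the chapter does not define. As one possible shape, and purely as an assumption about what such a class might look like, here is a small in-memory cache with least-recently-used eviction; the `max_entries` parameter is hypothetical.

```python
from collections import OrderedDict
from typing import Optional


class ResponseCache:
    """In-memory response cache that evicts the least recently used entry when full."""

    def __init__(self, max_entries: int = 1024):
        self._data: "OrderedDict[str, str]" = OrderedDict()
        self._max_entries = max_entries

    def get(self, key: str) -> Optional[str]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def set(self, key: str, value: str) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # drop the oldest entry


cache = ResponseCache(max_entries=2)
cache.set("k1", "first answer")
print(cache.get("k1"))
```

Caching only makes evaluation repeatable when the cache key covers every input that affects the response, including the model name and sampling parameters.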
Result Store
graph TB
A[Evaluation results] --> B[Result Store]
B --> C1[JSON files]
B --> C2[Database]
B --> C3[Time-series store]
C1 --> D1[Simple queries]
C2 --> D2[Complex analysis]
C3 --> D3[Trend analysis]
Storage schema:
{
"eval_id": "eval_2024_01_15_001",
"timestamp": "2024-01-15T10:30:00Z",
"config": {
"dataset": "golden_set_v1.2",
"model": "gpt-4-turbo",
"evaluators": ["semantic_sim", "g_eval"]
},
"results": [
{
"case_id": "case_001",
"input": "User question...",
"output": "Model output...",
"scores": {
"semantic_sim": 0.85,
"g_eval": {"overall": 0.82, "details": {...}}
},
"latency_ms": 1200
}
],
"summary": {
"total": 100,
"pass": 85,
"fail": 15,
"avg_score": 0.83,
"std_score": 0.12
}
}
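The `summary` block can be derived from the per-case scores. A minimal sketch, assuming each case has a single overall score and a shared pass threshold, and assuming `std_score` means the population standard deviation (the schema itself does not say which variant is intended):

```python
import statistics
from typing import Dict, List


def summarize(overall_scores: List[float], threshold: float) -> Dict[str, float]:
    """Build the summary fields of the storage schema from per-case overall scores."""
    total = len(overall_scores)
    passed = sum(1 for s in overall_scores if s >= threshold)
    return {
        "total": total,
        "pass": passed,
        "fail": total - passed,
        "avg_score": statistics.fmean(overall_scores) if total else 0.0,
        # Population standard deviation; swap in statistics.stdev for the sample version
        "std_score": statistics.pstdev(overall_scores) if total else 0.0,
    }


print(summarize([0.9, 0.8, 0.5, 0.6], threshold=0.75))
```

Storing the summary alongside the raw results keeps listing and comparison cheap, while the per-case records remain available for recomputing it.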
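Workflow Orchestration Design
Evaluation Pipeline
graph LR
A[Load config] --> B[Load data]
B --> C[Run evaluation]
C --> D[Store results]
D --> E[Analysis report]
E --> F[Visualization]
subgraph "Configuration stage"
A --> A1[Evaluator config]
A --> A2[Model config]
A --> A3[Dataset version]
end
subgraph "Execution stage"
C --> C1[Parallel calls]
C --> C2[Score computation]
C --> C3[Failure handling]
end
Configuration-Driven Design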
# eval_config.yaml
evaluation:
  name: "customer_service_eval"
  version: "1.0"
dataset:
  name: "golden_set_v1.2"
  path: "datasets/golden_set_v1.2.json"
  checksum: "abc123..."
model:
  backend: "openai"
  name: "gpt-4-turbo"
  temperature: 0.3
  max_tokens: 500
evaluators:
  - name: "semantic_similarity"
    weight: 0.4
    threshold: 0.8
    config:
      embedding_model: "text-embedding-3-small"
  - name: "g_eval"
    weight: 0.4
    threshold: 0.7
    config:
      criteria: ["relevance", "accuracy", "coherence"]
  - name: "safety_check"
    weight: 0.2
    threshold: 1.0
    config:
      rules: ["no_harmful", "no_pii"]
execution:
  parallel: true
  batch_size: 10
  retry_count: 3
  timeout_sec: 30
output:
  path: "results/"
  format: "json"
  include_raw: true
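A configuration-driven harness is easier to trust when the config is checked before any model call is made. The sketch below validates the `evaluators` section after it has been parsed into a list of dictionaries. `validate_evaluators` is a hypothetical helper, and the rule that weights sum to 1.0 is an assumption suggested by the example values (0.4 + 0.4 + 0.2), not something the config format requires.

```python
from typing import Dict, List


def validate_evaluators(evaluators: List[Dict]) -> List[str]:
    """Return human-readable problems; an empty list means the section looks valid."""
    problems = []
    names = [e.get("name") for e in evaluators]
    if len(set(names)) != len(names):
        problems.append("evaluator names must be unique")
    for e in evaluators:
        threshold = e.get("threshold", 0.0)
        if not 0.0 <= threshold <= 1.0:
            problems.append(f"{e.get('name')}: threshold {threshold} is outside [0, 1]")
        if e.get("weight", 0.0) < 0:
            problems.append(f"{e.get('name')}: weight must not be negative")
    total = sum(e.get("weight", 0.0) for e in evaluators)
    if abs(total - 1.0) > 1e-9:  # assumed convention: weights sum to 1
        problems.append(f"weights sum to {total:.2f}, expected 1.00")
    return problems


section = [
    {"name": "semantic_similarity", "weight": 0.4, "threshold": 0.8},
    {"name": "g_eval", "weight": 0.4, "threshold": 0.7},
    {"name": "safety_check", "weight": 0.2, "threshold": 1.0},
]
print(validate_evaluators(section))
```

Collecting every problem instead of raising on the first one lets a user fix the whole file in a single pass.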
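Monitoring and Alerting Integration
Real-Time Monitoring Architecture
graph LR
A[Evaluation run] --> B[Metrics collection]
B --> C[Monitoring platform]
A --> A1[Score recording]
B --> B1[Statistics computation]
C --> C1[Alert triggering]
Monitoring metrics:
| Metric category | Metrics | Alert threshold |
|---|---|---|
| Quality | Average score, pass rate | 10% below baseline |
| Stability | Score variance, failure rate | Variance > 0.15 |
| Performance | Response time, token consumption | Timeout rate > 5% |
| Cost | API call spend | Over budget |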
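The thresholds in the table translate directly into a rule check. The sketch below is one possible reading, with assumptions called out: "10% below baseline" is taken as a relative drop, cost is tracked in US dollars, and `RunMetrics` and `check_alerts` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RunMetrics:
    avg_score: float
    score_variance: float
    timeout_rate: float  # fraction of calls that timed out, 0-1
    cost_usd: float


def check_alerts(m: RunMetrics, baseline_avg: float, budget_usd: float) -> List[str]:
    """Return the metric categories whose alert threshold is breached."""
    alerts = []
    if m.avg_score < baseline_avg * 0.90:  # assumed: relative 10% drop
        alerts.append("quality")
    if m.score_variance > 0.15:
        alerts.append("stability")
    if m.timeout_rate > 0.05:
        alerts.append("performance")
    if m.cost_usd > budget_usd:
        alerts.append("cost")
    return alerts


metrics = RunMetrics(avg_score=0.70, score_variance=0.02, timeout_rate=0.01, cost_usd=5.0)
print(check_alerts(metrics, baseline_avg=0.83, budget_usd=10.0))
```

Returning category names rather than sending notifications keeps the rule logic testable; routing alerts to a monitoring platform can be layered on top.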
Hands-On: A Complete Harness System Implementation
Project Structure
ai_harness/
├── config/
│   ├── eval_config.yaml            # Evaluation config
│   ├── model_config.yaml           # Model config
│   └── monitoring_config.yaml      # Monitoring config
├── datasets/
│   ├── golden_set_v1.0.yaml        # Golden Set
│   ├── boundary_set_v1.0.yaml      # Boundary Set
│   └── adversarial_set_v1.0.yaml   # Adversarial Set
├── src/
│   ├── core/
│   │   ├── runner.py               # Test Runner
│   │   ├── evaluator.py            # Evaluator Engine
│   │   ├── caller.py               # Model Caller
│   │   └── result_store.py         # Result Store
│   ├── evaluators/
│   │   ├── semantic_sim.py         # Semantic similarity
│   │   ├── g_eval.py               # G-Eval
│   │   ├── rule_checker.py         # Rule checks
│   │   └── task_checker.py         # Task completion
│   ├── monitoring/
│   │   ├── collector.py            # Metrics collection
│   │   ├── aggregator.py           # Metrics aggregation
│   │   ├── alert_manager.py        # Alert management
│   │   └── dashboard.py            # Dashboard
│   └── utils/
│       ├── cache.py                # Cache
│       ├── logger.py               # Logging
│       └── config_loader.py        # Config loading
├── tests/
│   ├── test_runner.py
│   ├── test_evaluators.py
│   └── test_integration.py
├── reports/
│   └── templates/
│       ├── summary.html
│       └── detail.html
├── cli.py                          # Command-line entry point
└── api.py                          # REST API entry point
Complete Test Runner Implementation
# src/core/runner.py
import asyncio
import math
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

import yaml


@dataclass
class RunnerConfig:
    """Runner configuration."""
    batch_size: int = 10
    max_concurrent: int = 5
    timeout_seconds: int = 30
    retry_count: int = 3
    cache_enabled: bool = True
    save_intermediate: bool = True


@dataclass
class TestCase:
    """A single test case."""
    id: str
    input: str
    reference: Optional[str] = None
    category: Optional[str] = None
    metadata: Dict = field(default_factory=dict)


@dataclass
class ExecutionResult:
    """Result of executing one test case."""
    case_id: str
    output: str
    scores: Dict[str, float]
    overall_score: float
    passed: bool
    latency_ms: float
    retry_count: int = 0
    error: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)


class TestRunner:
    """
    Test execution engine.

    Core responsibilities:
    1. Load test cases
    2. Run model calls in parallel
    3. Score outputs with the evaluators
    4. Aggregate and store results
    """

    def __init__(
        self,
        config: RunnerConfig,
        model_caller,
        evaluator_engine,
        result_store
    ):
        self.config = config
        self.model_caller = model_caller
        self.evaluator = evaluator_engine
        self.store = result_store
        self.cache = {} if config.cache_enabled else None

    async def run(
        self,
        dataset_path: str,
        eval_config: Dict
    ) -> Dict:
        """Run the full evaluation flow."""
        # 1. Load the test data
        test_cases = self._load_dataset(dataset_path)
        # 2. Run the evaluation
        results = await self._execute_batch(test_cases, eval_config)
        # 3. Aggregate the results
        summary = self._aggregate_results(results)
        # 4. Store the results
        eval_id = self._generate_eval_id()
        self.store.save(eval_id, results, summary)
        # 5. Generate the report
        report = self._generate_report(eval_id, results, summary)
        return {
            "eval_id": eval_id,
            "results": results,
            "summary": summary,
            "report": report,
        }

    def _load_dataset(self, path: str) -> List[TestCase]:
        """Load the test dataset."""
        with open(path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        cases = []
        for case_data in data.get('cases', []):
            cases.append(TestCase(
                id=case_data['id'],
                input=case_data['input'],
                reference=case_data.get('reference'),
                category=case_data.get('category'),
                metadata=case_data.get('metadata', {}),
            ))
        return cases

    async def _execute_batch(
        self,
        cases: List[TestCase],
        eval_config: Dict
    ) -> List[ExecutionResult]:
        """Evaluate the cases batch by batch."""
        results = []
        # Run each batch in parallel
        for batch in self._chunk(cases, self.config.batch_size):
            batch_results = await asyncio.gather(
                *[self._execute_single(case, eval_config) for case in batch],
                return_exceptions=True
            )
            # gather() preserves order, so each outcome pairs with its case
            # and a failure keeps its real case ID.
            for case, result in zip(batch, batch_results):
                if isinstance(result, BaseException):
                    # Unexpected exception: record it as a failed case
                    results.append(ExecutionResult(
                        case_id=case.id,
                        output="",
                        scores={},
                        overall_score=0.0,
                        passed=False,
                        latency_ms=0,
                        error=str(result),
                    ))
                else:
                    results.append(result)
        return results

    async def _execute_single(
        self,
        case: TestCase,
        eval_config: Dict
    ) -> ExecutionResult:
        """Execute a single case."""
        start = time.perf_counter()  # monotonic clock for latency
        retry_count = 0
        error = None

        # Check the cache. Compare against None: an empty dict is falsy,
        # so a truthiness test would never let the cache fill.
        cache_key = f"{case.id}:{case.input}"
        if self.cache is not None and cache_key in self.cache:
            cached = self.cache[cache_key]
            return ExecutionResult(
                case_id=case.id,
                output=cached['output'],
                scores=cached['scores'],
                overall_score=cached['overall_score'],
                passed=cached['passed'],
                latency_ms=0,
                retry_count=0,
            )

        # Call the model; retry_count counts the failed attempts
        output = None
        for _ in range(self.config.retry_count):
            try:
                output = await asyncio.wait_for(
                    self.model_caller.call(case.input),
                    timeout=self.config.timeout_seconds
                )
                break
            except asyncio.TimeoutError:
                retry_count += 1
                error = "timeout"
            except Exception as e:
                retry_count += 1
                error = str(e)

        if output is None:
            return ExecutionResult(
                case_id=case.id,
                output="",
                scores={},
                overall_score=0.0,
                passed=False,
                latency_ms=(time.perf_counter() - start) * 1000,
                retry_count=retry_count,
                error=error,
            )

        # Score the output
        scores = await self.evaluator.evaluate(
            output=output,
            reference=case.reference,
            input=case.input,
            config=eval_config,
        )

        # Compute the overall score
        overall = self._compute_overall(scores, eval_config.get('weights', {}))
        threshold = eval_config.get('threshold', 0.75)
        passed = overall >= threshold

        # Cache the result
        if self.cache is not None:
            self.cache[cache_key] = {
                'output': output,
                'scores': scores,
                'overall_score': overall,
                'passed': passed,
            }

        return ExecutionResult(
            case_id=case.id,
            output=output,
            scores=scores,
            overall_score=overall,
            passed=passed,
            latency_ms=(time.perf_counter() - start) * 1000,
            retry_count=retry_count,
        )

    def _chunk(self, items: List, size: int) -> List[List]:
        """Split items into chunks of at most `size`."""
        return [items[i:i+size] for i in range(0, len(items), size)]

    def _compute_overall(self, scores: Dict, weights: Dict) -> float:
        """Compute the overall score."""
        # Keep numeric scores only: the engine may add "<name>_error" strings
        numeric = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
        if not weights:
            return sum(numeric.values()) / len(numeric) if numeric else 0.0
        total = 0.0
        total_weight = 0.0
        for key, weight in weights.items():
            if key in numeric:
                total += numeric[key] * weight
                total_weight += weight
        return total / total_weight if total_weight > 0 else 0.0

    def _aggregate_results(self, results: List[ExecutionResult]) -> Dict:
        """Aggregate summary statistics."""
        total = len(results)
        passed = sum(1 for r in results if r.passed)
        # Average over cases that were actually scored. Legitimate zero scores
        # count; errored cases are reported through error_count instead.
        scores = [r.overall_score for r in results if r.error is None]
        latencies = [r.latency_ms for r in results]
        return {
            "total": total,
            "passed": passed,
            "pass_rate": passed / total if total > 0 else 0,
            "avg_score": sum(scores) / len(scores) if scores else 0,
            "min_score": min(scores) if scores else 0,
            "max_score": max(scores) if scores else 0,
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency_ms": self._percentile(latencies, 95),
            "error_count": sum(1 for r in results if r.error),
        }

    def _percentile(self, values: List, p: int) -> float:
        """Compute a percentile with the nearest-rank method."""
        if not values:
            return 0
        sorted_vals = sorted(values)
        # Smallest value with at least p% of the data at or below it
        rank = math.ceil(len(sorted_vals) * p / 100)
        return sorted_vals[max(rank, 1) - 1]

    def _generate_eval_id(self) -> str:
        """Generate an evaluation ID."""
        return f"eval_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def _generate_report(self, eval_id: str, results: List, summary: Dict) -> Dict:
        """Generate the report."""
        return {
            "eval_id": eval_id,
            "summary": summary,
            "failed_cases": [
                {"case_id": r.case_id, "score": r.overall_score, "error": r.error}
                for r in results if not r.passed
            ],
            "top_cases": sorted(
                [{"case_id": r.case_id, "score": r.overall_score} for r in results],
                key=lambda x: x["score"],
                reverse=True
            )[:5],
            # ISO string rather than a datetime, so the report is JSON-serializable
            "timestamp": datetime.now().isoformat(),
        }
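The weighting step in `_compute_overall` is easier to follow with numbers. The standalone sketch below reproduces the same idea, a weighted mean renormalized over the evaluators that actually returned a score. The function name `weighted_overall` and the sample scores are made up for illustration; the weights are the ones from the example configuration.

```python
from typing import Dict


def weighted_overall(scores: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted mean over the evaluators that actually produced a score."""
    used = {k: w for k, w in weights.items() if k in scores}
    total_weight = sum(used.values())
    if total_weight == 0:
        return 0.0
    return sum(scores[k] * w for k, w in used.items()) / total_weight


weights = {"semantic_similarity": 0.4, "g_eval": 0.4, "safety_check": 0.2}
full = weighted_overall(
    {"semantic_similarity": 0.85, "g_eval": 0.82, "safety_check": 1.0}, weights
)
partial = weighted_overall({"semantic_similarity": 0.85, "safety_check": 1.0}, weights)
print(round(full, 3), round(partial, 3))
```

With all three scores the result is 0.4 × 0.85 + 0.4 × 0.82 + 0.2 × 1.0 = 0.868. When `g_eval` is missing, the remaining weights (0.6 in total) are renormalized, giving 0.54 / 0.6 = 0.9. Renormalizing keeps a missing evaluator from dragging the score toward zero, but it also means a failed evaluator can raise the overall score, so such cases are worth flagging separately.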
Evaluator Engine Implementation
# src/core/evaluator.py
import json
import math
import re
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union

from openai import AsyncOpenAI


class EvaluatorPlugin(ABC):
    """Base class for evaluator plugins."""

    @abstractmethod
    def name(self) -> str:
        """Plugin name."""

    @abstractmethod
    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        input: Optional[str] = None,
        **kwargs
    ) -> float:
        """Evaluate the output and return a score between 0 and 1."""

    @classmethod
    def from_config(cls, config: Dict) -> 'EvaluatorPlugin':
        """Create an instance from configuration."""
        return cls(**config)


class EvaluatorEngine:
    """
    Evaluation engine.
    Supports combining multiple evaluators.
    """

    def __init__(self, evaluators: List[EvaluatorPlugin]):
        self.evaluators = {e.name(): e for e in evaluators}

    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        input: Optional[str] = None,
        config: Optional[Dict] = None
    ) -> Dict[str, Union[float, str]]:
        """Run the selected evaluators and collect their scores."""
        config = config or {}
        requested = config.get('evaluators', list(self.evaluators.keys()))
        # Entries may be plain names or mappings with a "name" key,
        # as in eval_config.yaml
        evaluator_names = [e['name'] if isinstance(e, dict) else e for e in requested]
        results = {}
        for name in evaluator_names:
            if name not in self.evaluators:
                continue
            evaluator = self.evaluators[name]
            evaluator_config = config.get('evaluator_config', {}).get(name, {})
            try:
                score = await evaluator.evaluate(
                    output=output,
                    reference=reference,
                    input=input,
                    **evaluator_config
                )
                results[name] = score
            except Exception as e:
                # A failing evaluator scores 0 and records its error message
                results[name] = 0.0
                results[f"{name}_error"] = str(e)
        return results


# Concrete evaluators
class SemanticSimilarityEvaluator(EvaluatorPlugin):
    """Semantic similarity evaluator."""

    def __init__(self, model: str = "text-embedding-3-small", threshold: float = 0.8):
        self.model = model
        self.threshold = threshold
        # Async embedding client, so evaluate() does not block the event loop
        self.client = AsyncOpenAI()

    def name(self) -> str:
        return "semantic_similarity"

    async def evaluate(
        self,
        output: str,
        reference: Optional[str] = None,
        **kwargs
    ) -> float:
        if not reference:
            return 0.0
        # Fetch both embeddings in a single request
        response = await self.client.embeddings.create(
            input=[output, reference], model=self.model
        )
        output_emb = response.data[0].embedding
        ref_emb = response.data[1].embedding
        # Cosine similarity, clamped to the 0-1 range the interface promises
        similarity = self._cosine_similarity(output_emb, ref_emb)
        return max(0.0, min(1.0, similarity))

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Compute cosine similarity."""
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x*x for x in a))
        norm_b = math.sqrt(sum(y*y for y in b))
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class RuleCheckerEvaluator(EvaluatorPlugin):
    """Rule-checking evaluator."""

    def __init__(self, rules: List[Dict]):
        self.rules = rules

    def name(self) -> str:
        return "rule_checker"

    async def evaluate(self, output: str, **kwargs) -> float:
        """
        Check the output against the rules.
        Returns 1.0 if every rule passes and 0.0 if any rule is violated.
        """
        violations = []
        for rule in self.rules:
            rule_type = rule.get('type')
            if rule_type == 'contains':
                # Forbidden substrings (case-insensitive)
                forbidden = rule.get('forbidden', [])
                for item in forbidden:
                    if item.lower() in output.lower():
                        violations.append(f"contains: {item}")
            elif rule_type == 'regex':
                # should_match says whether a match is required or forbidden
                pattern = rule.get('pattern')
                matched = re.search(pattern, output) is not None
                should_match = rule.get('should_match', True)
                if matched and not should_match:
                    violations.append(f"regex_match: {pattern}")
                elif not matched and should_match:
                    violations.append(f"regex_no_match: {pattern}")
            elif rule_type == 'length':
                # Length check
                min_len = rule.get('min', 0)
                max_len = rule.get('max', float('inf'))
                if len(output) < min_len or len(output) > max_len:
                    violations.append(f"length: {len(output)}")
            elif rule_type == 'format':
                # Format check
                expected_format = rule.get('format')
                if expected_format == 'json':
                    try:
                        json.loads(output)
                    except ValueError:
                        violations.append("invalid_json")
        return 0.0 if violations else 1.0
Result Store Implementation
# src/core/result_store.py
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional


class ResultStore:
    """
    Result storage.
    Supports JSON files and, optionally, a database.
    """

    def __init__(self, storage_path: str = "results/", use_db: bool = False):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.use_db = use_db
        if use_db:
            # Initialize the database connection
            self._init_db()

    def _init_db(self):
        # The database backend is not shown in this chapter
        raise NotImplementedError("database backend is not implemented")

    def _save_to_db(self, eval_id: str, data: Dict):
        raise NotImplementedError("database backend is not implemented")

    def save(self, eval_id: str, results: List, summary: Dict):
        """Save evaluation results."""
        data = {
            "eval_id": eval_id,
            "timestamp": datetime.now().isoformat(),
            "summary": summary,
            "results": [
                {
                    "case_id": r.case_id,
                    "output": r.output,
                    "scores": r.scores,
                    "overall_score": r.overall_score,
                    "passed": r.passed,
                    "latency_ms": r.latency_ms,
                    "retry_count": r.retry_count,
                    "error": r.error,
                }
                for r in results
            ],
        }
        # Save to a file
        file_path = self.storage_path / f"{eval_id}.json"
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        if self.use_db:
            self._save_to_db(eval_id, data)

    def load(self, eval_id: str) -> Optional[Dict]:
        """Load evaluation results; returns None if the ID is unknown."""
        file_path = self.storage_path / f"{eval_id}.json"
        if file_path.exists():
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None

    def list_evals(self, limit: int = 20) -> List[Dict]:
        """List past evaluations, newest first."""
        files = sorted(
            self.storage_path.glob("eval_*.json"),
            key=lambda f: f.stat().st_mtime,
            reverse=True
        )[:limit]
        evals = []
        for f in files:
            with open(f, 'r', encoding='utf-8') as fp:
                data = json.load(fp)
            evals.append({
                "eval_id": data["eval_id"],
                "timestamp": data["timestamp"],
                "pass_rate": data["summary"]["pass_rate"],
                "avg_score": data["summary"]["avg_score"],
            })
        return evals

    def compare(self, eval_id_a: str, eval_id_b: str) -> Dict:
        """Compare the summaries of two evaluations."""
        data_a = self.load(eval_id_a)
        data_b = self.load(eval_id_b)
        if not data_a or not data_b:
            return {"error": "evaluation result not found"}
        return {
            "eval_a": {
                "id": eval_id_a,
                "pass_rate": data_a["summary"]["pass_rate"],
                "avg_score": data_a["summary"]["avg_score"],
            },
            "eval_b": {
                "id": eval_id_b,
                "pass_rate": data_b["summary"]["pass_rate"],
                "avg_score": data_b["summary"]["avg_score"],
            },
            "delta": {
                "pass_rate": data_b["summary"]["pass_rate"] - data_a["summary"]["pass_rate"],
                "avg_score": data_b["summary"]["avg_score"] - data_a["summary"]["avg_score"],
            },
        }
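A summary-level comparison can hide individual regressions: the average may hold steady while specific cases get worse. As a complement, the sketch below diffs two stored runs case by case. It assumes the `results`, `case_id`, and `overall_score` fields written by `save`; `find_regressions` and its `min_drop` parameter are hypothetical.

```python
from typing import Dict, List


def find_regressions(run_a: Dict, run_b: Dict, min_drop: float = 0.05) -> List[Dict]:
    """List cases whose overall score fell by at least min_drop from run A to run B."""
    scores_a = {r["case_id"]: r["overall_score"] for r in run_a["results"]}
    regressions = []
    for r in run_b["results"]:
        before = scores_a.get(r["case_id"])
        if before is None:
            continue  # case exists only in run B, nothing to compare
        drop = before - r["overall_score"]
        if drop >= min_drop:
            regressions.append({
                "case_id": r["case_id"],
                "before": before,
                "after": r["overall_score"],
                "drop": round(drop, 4),
            })
    # Largest drops first
    return sorted(regressions, key=lambda x: x["drop"], reverse=True)


run_a = {"results": [{"case_id": "c1", "overall_score": 0.9},
                     {"case_id": "c2", "overall_score": 0.8}]}
run_b = {"results": [{"case_id": "c1", "overall_score": 0.6},
                     {"case_id": "c2", "overall_score": 0.79}]}
print(find_regressions(run_a, run_b))
```

The `min_drop` margin filters out small fluctuations, which matters when the model is sampled at a nonzero temperature.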
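CLI Command-Line Tool
# cli.py
import argparse
import asyncio
import json
import os

import yaml


def main():
    parser = argparse.ArgumentParser(description='AI Harness Evaluation Tool')
    subparsers = parser.add_subparsers(dest='command')

    # "run" command
    run_parser = subparsers.add_parser('run', help='Run an evaluation')
    run_parser.add_argument('--dataset', required=True, help='Path to the dataset')
    run_parser.add_argument('--config', default='config/eval_config.yaml', help='Evaluation config')
    run_parser.add_argument('--output', default='results/', help='Output path for results')

    # "compare" command
    compare_parser = subparsers.add_parser('compare', help='Compare evaluation results')
    compare_parser.add_argument('eval_id_a', help='Evaluation ID A')
    compare_parser.add_argument('eval_id_b', help='Evaluation ID B')

    # "list" command
    list_parser = subparsers.add_parser('list', help='List past evaluations')
    list_parser.add_argument('--limit', type=int, default=20, help='Maximum number to show')

    # "report" command
    report_parser = subparsers.add_parser('report', help='Generate a report')
    report_parser.add_argument('eval_id', help='Evaluation ID')
    report_parser.add_argument('--format', choices=['html', 'json', 'markdown'], default='markdown')

    args = parser.parse_args()
    if args.command == 'run':
        asyncio.run(run_evaluation(args))
    elif args.command == 'compare':
        compare_evals(args)
    elif args.command == 'list':
        list_evals(args)
    elif args.command == 'report':
        generate_report(args)
    else:
        parser.print_help()


async def run_evaluation(args):
    """Run an evaluation."""
    # Load the config
    with open(args.config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Initialize the components
    from src.core.runner import TestRunner, RunnerConfig
    from src.core.evaluator import EvaluatorEngine
    from src.core.caller import OpenAICaller
    from src.core.result_store import ResultStore

    execution = config.get('execution', {})
    runner_config = RunnerConfig(
        batch_size=execution.get('batch_size', 10),
        max_concurrent=execution.get('max_concurrent', 5),
        timeout_seconds=execution.get('timeout_sec', 30),
        retry_count=execution.get('retry_count', 3),  # key name as in eval_config.yaml
    )
    # OpenAICaller takes a model name and an API key; the key is read from
    # the OPENAI_API_KEY environment variable.
    model_caller = OpenAICaller(
        model=config['model']['name'],
        api_key=os.environ['OPENAI_API_KEY'],
    )
    # Assumes EvaluatorEngine provides a from_config() factory that builds the
    # evaluator list from the config; the engine code shown earlier does not define one.
    evaluator = EvaluatorEngine.from_config(config['evaluators'])
    store = ResultStore(args.output)
    runner = TestRunner(runner_config, model_caller, evaluator, store)

    print(f"Starting evaluation: {args.dataset}")
    result = await runner.run(args.dataset, config)
    print("\nEvaluation complete!")
    print(f"Evaluation ID: {result['eval_id']}")
    print(f"Pass rate: {result['summary']['pass_rate']:.2%}")
    print(f"Average score: {result['summary']['avg_score']:.3f}")


def compare_evals(args):
    """Print the summary comparison of two stored evaluations."""
    from src.core.result_store import ResultStore
    comparison = ResultStore().compare(args.eval_id_a, args.eval_id_b)
    print(json.dumps(comparison, ensure_ascii=False, indent=2))


def list_evals(args):
    """Print past evaluations, newest first."""
    from src.core.result_store import ResultStore
    for item in ResultStore().list_evals(args.limit):
        print(f"{item['eval_id']}  pass_rate={item['pass_rate']:.2%}  avg_score={item['avg_score']:.3f}")


def generate_report(args):
    """Render a stored evaluation as a report."""
    # Report rendering (reports/templates) is not shown in this chapter
    raise NotImplementedError("report generation is not implemented")


if __name__ == '__main__':
    main()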
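Summary
Key points of Harness architecture design:
| Design aspect | Key points |
|---|---|
| Principles | Adapted FIRST, extensible, traceable |
| Layers | Interface, execution, analysis, and data layers |
| Components | Runner, Evaluator, Caller, Store |
| Workflow | Configuration-driven, pipeline orchestration |
| Monitoring | Real-time metrics, tiered alerts |
| Hands-on | Complete code implementation, CLI tool |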
```admonish tip title="Architecture Checklist"
✅ Does it follow the adapted FIRST principles?
✅ Are the evaluators pluggable?
✅ Do the model backends share a unified interface?
✅ Are results complete and traceable?
✅ Is parallel execution supported?
✅ Are monitoring and alerting integrated?
✅ Is there a CLI/API interface?
```
In the next chapter, we take a closer look at how the core components are implemented.