Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第九章:监控与持续优化

评估不是终点,而是持续优化的起点。本章讲解如何建立监控闭环。

从评估到监控

为什么需要在线监控

graph TB
    A[离线评估] --> B[部署上线]
    B --> C[在线监控]
    
    C --> D1[质量监控]
    C --> D2[性能监控]
    C --> D3[成本监控]
    
    D1 --> E[发现问题]
    E --> F[反馈优化]
    F --> A
    
    style C fill:#9f9,stroke:#333

离线评估的局限:

局限说明在线监控补充
样本有限Golden Set数量有限真实用户无限多样性
环境差异测试环境≠生产环境实时捕获真实场景
静态基准无法反映最新表现持续追踪质量趋势
用户视角缺失评估≠用户体验用户行为真实反馈

监控指标体系

三层监控架构

graph TB
    A[业务指标层<br/>用户体验]
    B[技术指标层<br/>系统质量]
    C[基础指标层<br/>系统健康]
    
    A --> B --> C

基础指标层

指标说明告警阈值
响应时间 P50/P95/P99请求延迟分布P95 > 5s
错误率请求失败比例> 1%
Token消耗每请求Token数超预算20%
并发数同时处理请求数> 容量80%
队列积压待处理请求队列> 100

技术指标层

指标说明告警阈值
自动评估得分在线采样评估得分低于基线10%
用户满意度用户评分/反馈低于3.5/5
任务完成率Agent任务成功率低于80%
内容安全率安全检查通过率低于99%
引用准确性RAG引用正确率低于90%

业务指标层

指标说明告警阈值
用户留存率用户继续使用比例下降趋势
任务转化率用户完成任务比例低于目标
用户投诉率投诉/总请求> 0.1%
NPS用户净推荐值< 30

监控实现架构

数据采集层

# monitoring/collector.py
class MetricsCollector:
    """
    监控指标采集器
    """
    
    def __init__(self):
        self.metrics_buffer = []
        self.flush_interval = 60  # 60秒flush一次
    
    def collect_request(self, request_data: Dict):
        """
        采集单次请求指标
        """
        metrics = {
            "timestamp": datetime.now(),
            "request_id": request_data["request_id"],
            
            # 基础指标
            "response_time_ms": request_data["latency"],
            "tokens_used": request_data["tokens"],
            "model": request_data["model"],
            "temperature": request_data["temperature"],
            
            # 输出内容(用于后续评估)
            "input": request_data["input"],
            "output": request_data["output"],
            
            # 用户反馈(如有)
            "user_rating": request_data.get("user_rating"),
            "user_feedback": request_data.get("user_feedback"),
        }
        
        self.metrics_buffer.append(metrics)
        
        # 定期flush
        if len(self.metrics_buffer) >= 100:
            self._flush()
    
    def _flush(self):
        """批量写入存储"""
        # 写入时序数据库(如Prometheus、InfluxDB)
        # 写入日志存储(用于后续抽样评估)
        self.storage.write_batch(self.metrics_buffer)
        self.metrics_buffer = []

自动评估采样

# monitoring/auto_evaluator.py
class AutoEvaluator:
    """
    在线自动评估采样器
    """
    
    def __init__(self, config: Dict):
        self.sample_rate = config.get("sample_rate", 0.05)  # 5%采样
        self.evaluator = LightweightEvaluator()  # 快速评估器
    
    def should_sample(self) -> bool:
        """决定是否采样当前请求"""
        return random.random() < self.sample_rate
    
    async def evaluate_sample(self, metrics: Dict) -> Dict:
        """
        评估采样请求
        
        使用轻量级评估(快速、低成本)
        """
        # 使用语义相似度评估(比G-Eval更快)
        scores = await self.evaluator.quick_eval(
            metrics["input"],
            metrics["output"]
        )
        
        return {
            "request_id": metrics["request_id"],
            "eval_scores": scores,
            "evaluated_at": datetime.now()
        }

指标聚合与存储

# monitoring/aggregator.py
class MetricsAggregator:
    """
    指标聚合器
    计算实时统计指标
    """
    
    def aggregate_window(self, window_seconds: int = 60) -> Dict:
        """
        聚合时间窗口内指标
        """
        metrics = self.storage.query_recent(window_seconds)
        
        return {
            "window_seconds": window_seconds,
            "request_count": len(metrics),
            
            # 响应时间统计
            "response_time": {
                "p50": percentile(metrics, "response_time_ms", 50),
                "p95": percentile(metrics, "response_time_ms", 95),
                "p99": percentile(metrics, "response_time_ms", 99),
                "mean": mean(metrics, "response_time_ms"),
            },
            
            # Token统计
            "tokens": {
                "total": sum(m["tokens_used"] for m in metrics),
                "per_request_mean": mean(metrics, "tokens_used"),
            },
            
            # 评估得分统计(如有)
            "eval_scores": self._aggregate_eval_scores(metrics),
            
            # 用户反馈统计(如有)
            "user_satisfaction": self._aggregate_user_feedback(metrics),
        }

告警系统设计

告警分级

级别触发条件处理时效示例
P0系统不可用/安全严重问题立即错误率>10%, 安全违规>5%
P1质量严重下降1小时内评估得分<基线20%
P2性能/成本问题当日处理P95>10s, 成本超预算
P3轻微问题周内关注个别案例失败

告警实现

# monitoring/alert_manager.py
class AlertManager:
    """
    告警管理器
    """
    
    ALERT_RULES = [
        {
            "name": "error_rate_high",
            "level": "P0",
            "condition": lambda m: m["error_rate"] > 0.10,
            "message": "错误率超过10%,系统可能不可用",
            "channels": ["slack", "email", "phone"],
        },
        {
            "name": "quality_drop",
            "level": "P1",
            "condition": lambda m: m["eval_score"] < m["baseline"] * 0.8,
            "message": "评估得分低于基线20%",
            "channels": ["slack", "email"],
        },
        {
            "name": "latency_high",
            "level": "P2",
            "condition": lambda m: m["response_time_p95"] > 10000,
            "message": "P95响应时间超过10秒",
            "channels": ["slack"],
        },
    ]
    
    def check_alerts(self, metrics: Dict) -> List[Dict]:
        """
        检查是否触发告警
        """
        alerts = []
        for rule in self.ALERT_RULES:
            if rule["condition"](metrics):
                alert = {
                    "rule_name": rule["name"],
                    "level": rule["level"],
                    "message": rule["message"],
                    "metrics_snapshot": metrics,
                    "triggered_at": datetime.now(),
                }
                alerts.append(alert)
                self._send_alert(alert, rule["channels"])
        
        return alerts
    
    def _send_alert(self, alert: Dict, channels: List[str]):
        """发送告警通知"""
        for channel in channels:
            if channel == "slack":
                self.slack_client.send(alert["message"])
            elif channel == "email":
                self.email_client.send(
                    subject=f"[{alert['level']}] {alert['rule_name']}",
                    body=alert["message"]
                )

告警收敛

避免告警风暴:

class AlertSuppressor:
    """
    告警收敛器
    防止告警风暴
    """
    
    def __init__(self, cooldown_seconds: int = 300):
        self.cooldown = cooldown_seconds
        self.last_alert_time = {}  # rule_name -> last_trigger_time
    
    def should_send(self, rule_name: str) -> bool:
        """
        判断是否应该发送(是否在冷却期)
        """
        last_time = self.last_alert_time.get(rule_name)
        
        if last_time is None:
            self.last_alert_time[rule_name] = datetime.now()
            return True
        
        elapsed = (datetime.now() - last_time).total_seconds()
        if elapsed > self.cooldown:
            self.last_alert_time[rule_name] = datetime.now()
            return True
        
        return False  # 在冷却期内,不重复发送

持续优化闭环

反馈收集机制

graph LR
    A[用户请求] --> B[模型响应]
    B --> C[用户反馈]
    C --> D[反馈存储]
    
    D --> E1[显式反馈]
    D --> E2[隐式反馈]
    
    E1 --> F[评分/投诉]
    E2 --> G[行为分析]
    
    F --> H[优化决策]
    G --> H
    H --> I[Prompt/模型更新]

反馈类型

类型数据来源用途
显式评分用户打分(1-5星)质量量化指标
用户投诉投诉渠道问题发现来源
任务完成任务系统Agent效果评估
行为数据用户后续行为隐式满意度
专家评审定期抽检深度质量分析

优化决策框架

# optimization/decision_engine.py
class OptimizationDecisionEngine:
    """
    优化决策引擎
    分析监控数据,生成优化建议
    """
    
    def analyze_and_recommend(self, monitoring_data: Dict) -> List[Dict]:
        """
        分析监控数据,生成优化建议
        """
        recommendations = []
        
        # 1. 分析质量趋势
        quality_trend = self._analyze_quality_trend(monitoring_data)
        if quality_trend["direction"] == "declining":
            recommendations.append({
                "type": "prompt_update",
                "priority": "high",
                "reason": "质量持续下降",
                "suggestion": "检查最近Prompt变更,考虑回滚或优化"
            })
        
        # 2. 分析失败案例
        failed_cases = monitoring_data.get("failed_cases", [])
        if failed_cases:
            pattern = self._analyze_failure_pattern(failed_cases)
            recommendations.append({
                "type": "prompt_refine",
                "priority": "medium",
                "reason": f"失败模式: {pattern['type']}",
                "suggestion": f"针对{pattern['type']}场景优化Prompt"
            })
        
        # 3. 分析成本趋势
        cost_trend = self._analyze_cost_trend(monitoring_data)
        if cost_trend["increasing"]:
            recommendations.append({
                "type": "cost_optimize",
                "priority": "medium",
                "reason": "Token消耗持续增长",
                "suggestion": "考虑Prompt精简或模型切换"
            })
        
        return recommendations
    
    def _analyze_quality_trend(self, data: Dict) -> Dict:
        """分析质量趋势"""
        scores = data.get("daily_scores", [])
        if len(scores) < 7:
            return {"direction": "stable"}
        
        recent = scores[-3:]
        previous = scores[-7:-4]
        
        recent_mean = mean(recent)
        previous_mean = mean(previous)
        
        if recent_mean < previous_mean - 0.05:
            return {"direction": "declining", "delta": recent_mean - previous_mean}
        elif recent_mean > previous_mean + 0.05:
            return {"direction": "improving", "delta": recent_mean - previous_mean}
        else:
            return {"direction": "stable"}

A/B 测试集成

# optimization/ab_test.py
class ABTestFramework:
    """
    A/B测试框架
    用于验证优化效果
    """
    
    def setup_test(
        self,
        name: str,
        variant_a: Dict,  # 当前版本
        variant_b: Dict,  # 新版本
        traffic_split: float = 0.5
    ):
        """
        设置A/B测试
        
        Args:
            name: 测试名称
            variant_a: A版本配置(基线)
            variant_b: B版本配置(新版本)
            traffic_split: B版本流量比例
        """
        self.test = {
            "name": name,
            "variant_a": variant_a,
            "variant_b": variant_b,
            "split": traffic_split,
            "metrics_a": [],
            "metrics_b": [],
            "start_time": datetime.now(),
        }
    
    def route_request(self) -> str:
        """
        路由请求到不同版本
        """
        if random.random() < self.test["split"]:
            return "B"
        return "A"
    
    def record_result(self, variant: str, metrics: Dict):
        """记录测试结果"""
        if variant == "A":
            self.test["metrics_a"].append(metrics)
        else:
            self.test["metrics_b"].append(metrics)
    
    def analyze_results(self) -> Dict:
        """
        分析A/B测试结果
        """
        a_scores = [m["score"] for m in self.test["metrics_a"]]
        b_scores = [m["score"] for m in self.test["metrics_b"]]
        
        return {
            "variant_a_mean": mean(a_scores),
            "variant_b_mean": mean(b_scores),
            "delta": mean(b_scores) - mean(a_scores),
            "significant": self._check_significance(a_scores, b_scores),
            "recommendation": "adopt_b" if mean(b_scores) > mean(a_scores) + 0.05 else "keep_a"
        }

监控仪表板设计

核心视图

┌──────────────────────────────────────────────────┐
│          AI System Monitoring Dashboard          │
├──────────────────────────────────────────────────┤
│  [Overall Score]   [Response Time]   [Token Cost]│
│     0.82 ▲          P95: 2.3s         $120/day   │
├───────────────────┬───────────────────┬──────────┤
│  Quality Trend    │   Error Heatmap   │  Alerts  │
│  (30 days)        │   (by category)   │  [P0: 0] │
│  ▁▂▃▄▅▆▇█        │                   │  [P1: 2] │
│                   │                   │  [P2: 5] │
├───────────────────┴───────────────────┴──────────┤
│  Recent Failed Cases (Top 5)                     │
│  - req_12345: safety_violation                    │
│  - req_12346: timeout                             │
│  - req_12347: eval_score_below_threshold          │
└──────────────────────────────────────────────────┘

关键图表

图表类型用途
质量趋势时间序列折线图追踪质量变化
响应时间分布直方图性能分析
错误热图热力图问题定位
Token消耗趋势时间序列柱状图成本监控
用户满意度分布饼图用户体验概览

实战:Prometheus + Grafana 监控系统

Prometheus 配置

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - 'alert_rules.yml'

scrape_configs:
  # AI服务监控
  - job_name: 'ai_service'
    static_configs:
      - targets: ['ai-service:8080']
    metrics_path: '/metrics'
    
  # 评估系统监控
  - job_name: 'evaluator'
    static_configs:
      - targets: ['evaluator:9090']
    
  # Redis队列监控
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

Alert Rules 配置

# alert_rules.yml
groups:
  - name: ai_quality_alerts
    rules:
      # P0 级别告警
      - alert: AIServiceDown
        expr: up{job="ai_service"} == 0
        for: 1m
        labels:
          severity: critical
          level: P0
        annotations:
          summary: "AI 服务不可用"
          description: "AI 服务已宕机超过1分钟"
      
      - alert: ErrorRateHigh
        expr: rate(ai_requests_errors_total[5m]) / rate(ai_requests_total[5m]) > 0.10
        for: 2m
        labels:
          severity: critical
          level: P0
        annotations:
          summary: "错误率超过10%"
          description: "最近5分钟错误率: {{ $value | humanizePercentage }}"
      
      - alert: SafetyViolationHigh
        expr: rate(ai_safety_violations_total[5m]) > 0.05
        for: 1m
        labels:
          severity: critical
          level: P0
        annotations:
          summary: "安全违规频发"
          description: "检测到安全违规频率异常"
      
      # P1 级别告警
      - alert: QualityScoreDrop
        expr: ai_eval_score_avg < ai_eval_score_baseline * 0.8
        for: 5m
        labels:
          severity: warning
          level: P1
        annotations:
          summary: "评估得分低于基线20%"
          description: "当前得分: {{ $value }},基线: {{ $labels.baseline }}"
      
      - alert: LatencyHighP95
        expr: histogram_quantile(0.95, rate(ai_request_latency_bucket[5m])) > 10000
        for: 5m
        labels:
          severity: warning
          level: P1
        annotations:
          summary: "P95延迟超过10秒"
          description: "P95延迟: {{ $value | humanizeDuration }}"
      
      # P2 级别告警
      - alert: TokenCostHigh
        expr: rate(ai_tokens_total[1h]) * 3600 * 0.0001 > 100
        for: 30m
        labels:
          severity: info
          level: P2
        annotations:
          summary: "Token成本过高"
          description: "预计每小时成本超过$100"
      
      - alert: UserSatisfactionLow
        expr: ai_user_satisfaction_avg < 3.5
        for: 1h
        labels:
          severity: info
          level: P2
        annotations:
          summary: "用户满意度偏低"
          description: "平均评分低于3.5/5"

Metrics 导出实现

# monitoring/metrics_exporter.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

class AIMetricsExporter:
    """
    AI 服务 Prometheus 指标导出器
    """
    
    def __init__(self, port: int = 9090):
        # 初始化指标
        self._init_metrics()
        # 启动 HTTP server
        start_http_server(port)
    
    def _init_metrics(self):
        """初始化 Prometheus 指标"""
        
        # 计数器
        self.requests_total = Counter(
            'ai_requests_total',
            'AI请求总数',
            ['model', 'endpoint', 'status']
        )
        
        self.requests_errors = Counter(
            'ai_requests_errors_total',
            'AI请求错误数',
            ['model', 'error_type']
        )
        
        self.tokens_total = Counter(
            'ai_tokens_total',
            'Token消耗总数',
            ['model', 'type']  # type: input/output
        )
        
        self.safety_violations = Counter(
            'ai_safety_violations_total',
            '安全违规计数',
            ['violation_type']
        )
        
        # 直方图(延迟)
        self.request_latency = Histogram(
            'ai_request_latency_seconds',
            '请求延迟分布',
            ['model', 'endpoint'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
        )
        
        # Gauge(实时值)
        self.eval_score_avg = Gauge(
            'ai_eval_score_avg',
            '平均评估得分',
            ['evaluator']
        )
        
        self.eval_score_baseline = Gauge(
            'ai_eval_score_baseline',
            '评估得分基线',
            ['evaluator']
        )
        
        self.user_satisfaction_avg = Gauge(
            'ai_user_satisfaction_avg',
            '平均用户满意度'
        )
        
        self.queue_size = Gauge(
            'ai_queue_size',
            '请求队列大小'
        )
    
    def record_request(self, model: str, endpoint: str, latency: float, 
                       tokens_input: int, tokens_output: int, status: str):
        """记录请求指标"""
        self.requests_total.labels(model=model, endpoint=endpoint, status=status).inc()
        
        if status == 'success':
            self.tokens_total.labels(model=model, type='input').inc(tokens_input)
            self.tokens_total.labels(model=model, type='output').inc(tokens_output)
        
        self.request_latency.labels(model=model, endpoint=endpoint).observe(latency)
    
    def record_error(self, model: str, error_type: str):
        """记录错误"""
        self.requests_errors.labels(model=model, error_type=error_type).inc()
    
    def record_safety_violation(self, violation_type: str):
        """记录安全违规"""
        self.safety_violations.labels(violation_type=violation_type).inc()
    
    def update_eval_score(self, evaluator: str, score: float):
        """更新评估得分"""
        self.eval_score_avg.labels(evaluator=evaluator).set(score)
    
    def update_user_satisfaction(self, score: float):
        """更新用户满意度"""
        self.user_satisfaction_avg.set(score)
    
    def update_queue_size(self, size: int):
        """更新队列大小"""
        self.queue_size.set(size)

Grafana Dashboard 配置

{
  "dashboard": {
    "title": "AI System Monitoring",
    "uid": "ai-monitoring",
    "panels": [
      {
        "title": "Overall Quality Score",
        "type": "gauge",
        "gridPos": {"x": 0, "y": 0, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "ai_eval_score_avg",
            "legendFormat": "Current"
          },
          {
            "expr": "ai_eval_score_baseline",
            "legendFormat": "Baseline"
          }
        ],
        "options": {
          "thresholds": [
            {"value": 0.6, "color": "red"},
            {"value": 0.75, "color": "yellow"},
            {"value": 0.85, "color": "green"}
          ]
        }
      },
      {
        "title": "Request Rate",
        "type": "graph",
        "gridPos": {"x": 4, "y": 0, "w": 8, "h": 4},
        "targets": [
          {
            "expr": "rate(ai_requests_total[5m])",
            "legendFormat": "{{model}} - {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "gridPos": {"x": 12, "y": 0, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "rate(ai_requests_errors_total[5m]) / rate(ai_requests_total[5m])",
            "legendFormat": "Error Rate"
          }
        ],
        "alert": {
          "conditions": [
            {
              "evaluator": {"params": [0.05], "type": "gt"},
              "operator": {"type": "and"},
              "query": {"params": ["A", "5m", "now"]}
            }
          ]
        }
      },
      {
        "title": "Latency Distribution",
        "type": "heatmap",
        "gridPos": {"x": 0, "y": 4, "w": 8, "h": 4},
        "targets": [
          {
            "expr": "sum(increase(ai_request_latency_seconds_bucket[5m])) by (le)",
            "format": "heatmap",
            "legendFormat": "{{le}}"
          }
        ],
        "dataFormat": "tsbuckets"
      },
      {
        "title": "Token Consumption",
        "type": "graph",
        "gridPos": {"x": 8, "y": 4, "w": 8, "h": 4},
        "targets": [
          {
            "expr": "rate(ai_tokens_total{type=\"input\"}[1h])",
            "legendFormat": "Input Tokens"
          },
          {
            "expr": "rate(ai_tokens_total{type=\"output\"}[1h])",
            "legendFormat": "Output Tokens"
          }
        ]
      },
      {
        "title": "User Satisfaction",
        "type": "stat",
        "gridPos": {"x": 0, "y": 8, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "ai_user_satisfaction_avg",
            "legendFormat": "Avg Rating"
          }
        ],
        "options": {
          "graphMode": "area",
          "colorMode": "value"
        }
      },
      {
        "title": "Safety Violations",
        "type": "graph",
        "gridPos": {"x": 4, "y": 8, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "rate(ai_safety_violations_total[1h])",
            "legendFormat": "{{violation_type}}"
          }
        ]
      },
      {
        "title": "Queue Status",
        "type": "stat",
        "gridPos": {"x": 8, "y": 8, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "ai_queue_size",
            "legendFormat": "Queue Size"
          }
        ]
      },
      {
        "title": "Alert Status",
        "type": "alertlist",
        "gridPos": {"x": 12, "y": 8, "w": 4, "h": 4},
        "options": {
          "show": "current"
        }
      }
    ]
  }
}

Grafana Dashboard 可视化

┌──────────────────────────────────────────────────────────────────────────┐
│                    AI System Monitoring Dashboard                          │
├───────────────────┬──────────────────────────┬───────────────┬───────────┤
│ Overall Score     │     Request Rate         │   Error Rate  │  Alerts   │
│   ┌─────────┐     │  ┌────────────────────┐  │  ┌─────────┐  │  P0: 0    │
│   │  0.82   │     │  │████████████████████│  │  │  2.3%   │  │  P1: 2   │
│   │  ▲ +3%  │     │  │██████████████████  │  │  │ ▼ OK    │  │  P2: 5   │
│   └─────────┘     │  │██████████████████  │  │  └─────────┘  │           │
│   Baseline: 0.80  │  └────────────────────┘  │               │           │
├───────────────────┼──────────────────────────┼───────────────┴───────────┤
│ Latency P95       │     Token Consumption    │   Queue Status            │
│  ┌─────────────┐  │  ┌────────────────────┐  │  ┌─────────────────────┐  │
│  │    2.3s     │  │  │ Input: 50K/hr     │  │  │   Queue Size: 12    │  │
│  │   ▼ OK      │  │  │ Output: 80K/hr    │  │  │   Capacity: 100     │  │
│  └─────────────┘  │  │ Cost Est: $130/day │  │  │   Usage: 12%        │  │
├───────────────────┼──────────────────────────┼───────────────────────────┤
│ User Satisfaction │    Safety Violations     │   Recent Events           │
│   ⭐⭐⭐⭐ 4.2     │  ┌────────────────────┐  │  - eval_run completed    │
│   ▲ +0.2         │  │ PII: 0.02/hr       │  │  - model_switch: gpt-4   │
│                  │  │ Harmful: 0.01/hr   │  │  - threshold_adjusted    │
│                  │  │ Bias: 0.03/hr      │  │                          │
└───────────────────┴──────────────────────────┴───────────────────────────┘

告警通知配置

# alertmanager.yml
global:
  resolve_timeout: 5m
  slack_api_url: 'https://hooks.slack.com/services/xxx'

route:
  group_by: ['severity', 'level']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'default'
  routes:
    - match:
        level: P0
      receiver: 'critical'
    - match:
        level: P1
      receiver: 'warning'
    - match:
        level: P2
      receiver: 'info'

receivers:
  - name: 'default'
    slack_configs:
      - channel: '#ai-alerts'
        send_resolved: true
  
  - name: 'critical'
    slack_configs:
      - channel: '#ai-critical'
        send_resolved: true
    email_configs:
      - to: 'oncall@company.com'
        send_resolved: true
    pagerduty_configs:
      - service_key: 'xxx'
        severity: critical
  
  - name: 'warning'
    slack_configs:
      - channel: '#ai-warnings'
        send_resolved: true
    email_configs:
      - to: 'team@company.com'
  
  - name: 'info'
    slack_configs:
      - channel: '#ai-info'

数据采集集成

# monitoring/integration.py
import asyncio
from datetime import datetime

class MonitoringIntegration:
    """
    监控系统集成层
    连接评估系统与 Prometheus/Grafana
    """
    
    def __init__(self, metrics_exporter, result_store):
        self.exporter = metrics_exporter
        self.store = result_store
        self.baseline_scores = {}
    
    async def sync_from_evaluation(self, eval_result: Dict):
        """
        从评估结果同步指标
        """
        # 更新评估得分
        for evaluator, score in eval_result.get('scores', {}).items():
            self.exporter.update_eval_score(evaluator, score)
        
        # 更新总体得分
        overall = eval_result.get('overall_score', 0)
        self.exporter.update_eval_score('overall', overall)
        
        # 更新基线(首次评估)
        if 'baseline' not in self.baseline_scores:
            self.baseline_scores['baseline'] = overall
            self.exporter.eval_score_baseline.labels(evaluator='overall').set(overall)
    
    async def sync_from_user_feedback(self, feedback: Dict):
        """
        从用户反馈同步指标
        """
        rating = feedback.get('rating')
        if rating:
            # 更新用户满意度
            self.exporter.update_user_satisfaction(rating)
        
        # 检查安全违规
        if feedback.get('safety_issue'):
            self.exporter.record_safety_violation(feedback['safety_issue'])
    
    async def start_realtime_sync(self, interval_seconds: int = 60):
        """
        启动实时同步
        """
        while True:
            try:
                # 获取最新评估结果
                latest_evals = self.store.list_evals(limit=1)
                if latest_evals:
                    eval_data = self.store.load(latest_evals[0]['eval_id'])
                    await self.sync_from_evaluation(eval_data['summary'])
                
                await asyncio.sleep(interval_seconds)
            except Exception as e:
                print(f"Sync error: {e}")
                await asyncio.sleep(interval_seconds)

小结

监控与持续优化的核心要点:

环节关键要点
数据采集全量记录,定期采样评估
指标体系三层架构:基础→技术→业务
告警系统分级、收敛、上下文完整
优化决策趋势分析、模式识别、A/B验证
仪表板一页概览、问题可钻取
实战落地Prometheus指标、Grafana配置、告警集成

闭环设计原则

  1. 监控→发现问题→优化→验证→监控
  2. 离线评估是基线,在线监控是追踪
  3. 告警要及时,但要避免风暴
  4. A/B测试是验证优化的标准方法
  5. 用户反馈是最真实的质量信号
  6. Prometheus + Grafana 是主流监控方案

下一部分,我们将提供附录参考材料。