post cover

技术热点落地:构建生产级 LLM Agent 监控系统(2026-05-22)


适用场景与目标

谁需要这套方案:

  • 已经在生产环境跑 LLM Agent 的团队(客服机器人、代码生成、数据分析 Agent 等)
  • 使用多个 LLM Provider(OpenAI、Anthropic、Google、DeepSeek 等)且模型调用成本难以控制
  • Agent 行为不可观测,出现幻觉、循环调用、无限重试时无法追溯
  • 需要构建合规审计日志的场景(金融、医疗、法律)

核心目标:

  1. 可观测:每个 Agent 的思考链、工具调用、Token 消耗、延迟全程可追溯
  2. 可路由:根据任务类型、负载、成本自动切换最优模型
  3. 可治理:异常行为(循环超过 N 次、超时、Token 爆炸)自动熔断
  4. 可审计:所有模型调用写入不可篡改的日志,供风控和合规使用

最小可行方案(MVP)步骤

环境准备

# Python 3.11+,推荐 venv 隔离
python3 -m venv agent-monitor && source agent-monitor/bin/activate

# 核心依赖
pip install \
  langchain langgraph \
  openinference-instrumentation \
  opentelemetry-api \
  opentelemetry-sdk \
  opentelemetry-exporter-otlp \
  ddtrace \
  structlog \
  prometheus-client \
  fastapi \
  uvicorn \
  redis \
  psycopg2-binary

架构概览(最小闭环)

┌─────────────────────────────────────────────────────────┐
│                     Agent Runtime                        │
│  ┌─────────┐    ┌──────────┐    ┌──────────────────┐   │
│  │ LangGraph│───▶│ Router   │───▶│ Model A/B/C...   │   │
│  │ /Agent  │    │(规则+LLM)│    └──────────────────┘   │
│  └────┬────┘    └────┬─────┘         │                  │
│       │              │               ▼                   │
│       ▼              ▼      ┌──────────────────┐       │
│  ┌────────────┐  ┌───────┐  │  OpenTelemetry    │       │
│  │ Trace ID   │  │Token  │  │  Span Collection  │       │
│  │ 传播       │  │计数   │  └────────┬─────────┘       │
│  └────────────┘  └───────┘           │                  │
└───────────────────────────────────────┼──────────────────┘

                        ┌────────────────────────────┐
                        │   OTEL Collector → Redis    │
                        │   → Prometheus → Grafana   │
                        └────────────────────────────┘

Step 1:引入统一 Trace Context

在 Agent 启动时注入 trace,将 LangGraph 的每次节点执行映射为 OpenTelemetry Span:

# monitor/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

def setup_tracing(service_name: str, otlp_endpoint: str = "http://localhost:4317"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    
    return trace.get_tracer(__name__)

# 每个 Agent 调用时统一注入
tracer = setup_tracing("llm-agent-router")

Step 2:构建多模型路由器(规则 + LLM 判断混合)

# router/model_router.py
from enum import Enum
from dataclasses import dataclass
from typing import Callable
import structlog

log = structlog.get_logger()

class ModelTier(Enum):
    FAST  = "fast"    # GPT-4o-mini / Claude-3.5-Haiku
    BALANCED = "balanced"  # GPT-4o / Claude-3.7-Sonnet
    POWER = "power"   # GPT-5 / Claude-4-Sonnet / DeepSeek-R1

@dataclass
class ModelConfig:
    provider: str  # openai / anthropic / deepseek
    model: str
    max_tokens: int
    cost_per_1k_input: float
    cost_per_1k_output: float

MODEL_REGISTRY = {
    ModelTier.FAST: ModelConfig("openai", "gpt-4o-mini", 128000, 0.15, 0.60),
    ModelTier.BALANCED: ModelConfig("anthropic", "claude-3-7-sonnet-20250220", 200000, 3.0, 15.0),
    ModelTier.POWER: ModelConfig("deepseek", "deepseek-chat", 64000, 0.27, 1.1),
}

@dataclass
class RoutingContext:
    task_type: str        # classification / generation / reasoning / extraction
    estimated_input_tokens: int
    user_tier: str        # free / pro / enterprise
    latency_sla_ms: int
    cost_budget_usd: float

def route_model(ctx: RoutingContext) -> ModelConfig:
    # 规则层:先卡住硬约束
    if ctx.cost_budget_usd < 0.01:
        return MODEL_REGISTRY[ModelTier.FAST]
    
    if ctx.latency_sla_ms < 2000 and ctx.task_type == "classification":
        return MODEL_REGISTRY[ModelTier.FAST]
    
    # LLM 判断层:用轻量模型决定用哪个重型模型(Meta路由)
    if ctx.task_type == "reasoning" or ctx.estimated_input_tokens > 30000:
        return MODEL_REGISTRY[ModelTier.POWER]
    
    return MODEL_REGISTRY[ModelTier.BALANCED]

Step 3:Instrument LangChain / LangGraph,自动上报 Telemetry

# monitor/instrument.py
from langchain_core.globals import set_debug
from opentelemetry.instrumentation.langchain import LangchainInstrumentor

def init_instrumentation():
    # LangChain 自动埋点(_span for every LLM call)
    LangchainInstrumentor().instrument()
    
    # 额外手动埋点:记录 Agent 决策节点
    from opentelemetry import trace
    tracer = trace.get_tracer(__name__)
    
    return tracer

# 在 Agent 执行循环中埋入关键节点
with tracer.start_as_current_span("agent_decision") as span:
    span.set_attribute("agent.loop_count", loop_count)
    span.set_attribute("agent.total_tokens", total_tokens)
    span.set_attribute("agent.last_tool", last_tool_name)

Step 4:Token 消费追踪与成本告警

# monitor/cost_tracker.py
from dataclasses import dataclass, field
from typing import Dict
import structlog
from datetime import datetime, timedelta

log = structlog.get_logger()

@dataclass
class CostBudget:
    daily_limit_usd: float
    monthly_limit_usd: float
    alert_threshold: float = 0.8

class TokenCounter:
    def __init__(self, budget: CostBudget):
        self.budget = budget
        self.daily_spend: Dict[str, float] = {}
        self.monthly_spend: Dict[str, float] = {}
    
    def record(self, provider: str, model: str, input_tokens: int, output_tokens: int, cost_usd: float):
        key = f"{provider}:{model}"
        self.daily_spend[key] = self.daily_spend.get(key, 0.0) + cost_usd
        self.monthly_spend[key] = self.monthly_spend.get(key, 0.0) + cost_usd
        
        # 超预算熔断
        total_daily = sum(self.daily_spend.values())
        if total_daily >= self.budget.daily_limit_usd:
            log.warning("daily_budget_exceeded", 
                       spend=total_daily, 
                       limit=self.budget.daily_limit_usd,
                       action="circuit_break")
            raise BudgetExceededError(f"Daily budget {self.budget.daily_limit_usd} exceeded")
        
        # 告警
        if total_daily >= self.budget.daily_limit_usd * self.budget.alert_threshold:
            log.warning("budget_alert", 
                       spend=total_daily, 
                       threshold=self.budget.alert_threshold)
            # 触发 PagerDuty / Slack 通知

Step 5:Agent 循环检测与熔断

# monitor/circuit_breaker.py
import time
from typing import Optional

class AgentCircuitBreaker:
    def __init__(
        self,
        max_loops: int = 10,
        max_time_seconds: float = 120.0,
        tool_call_limit: int = 50,
    ):
        self.max_loops = max_loops
        self.max_time_seconds = max_time_seconds
        self.tool_call_limit = tool_call_limit
        
        self.loop_count = 0
        self.tool_calls = 0
        self.start_time: Optional[float] = None
    
    def tick(self, tool_name: Optional[str] = None):
        if self.start_time is None:
            self.start_time = time.time()
        
        self.loop_count += 1
        if tool_name:
            self.tool_calls += 1
        
        elapsed = time.time() - self.start_time
        
        reasons = []
        if self.loop_count > self.max_loops:
            reasons.append(f"loop_exceeded({self.loop_count}/{self.max_loops})")
        if elapsed > self.max_time_seconds:
            reasons.append(f"time_exceeded({elapsed:.1f}s/{self.max_time_seconds}s)")
        if self.tool_calls > self.tool_call_limit:
            reasons.append(f"tool_calls_exceeded({self.tool_calls}/{self.tool_call_limit})")
        
        if reasons:
            raise AgentCircuitBroken(f"Agent halted: {', '.join(reasons)}")
    
    def reset(self):
        self.loop_count = 0
        self.tool_calls = 0
        self.start_time = None

class AgentCircuitBroken(Exception):
    pass

Step 6:Grafana Dashboard 快速部署

# docker-compose.yml(最小可运行)
version: '3.8'
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.112.0
    ports:
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
    volumes:
      - ./otel-collector.yaml:/etc/otelcol-contrib/config.yaml:ro

  prometheus:
    image: prom/prometheus:v2.54.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro

  grafana:
    image: grafana/grafana:11.3.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:ro

关键实现细节

Span 属性规范(保证查询效率)

# 每个 span 必须包含这些属性,便于 Prometheus / Grafana 筛选
span.set_attribute("provider", "openai")           #  Provider 分类
span.set_attribute("model", "gpt-4o-mini")        # 模型名
span.set_attribute("task_type", "extraction")     # 任务类型
span.set_attribute("user_tier", "pro")            # 用户等级(决定路由策略)
span.set_attribute("token.total", input_tokens + output_tokens)
span.set_attribute("cost.usd", cost_usd)           # 本次调用成本
span.set_attribute("latency.ms", elapsed_ms)       # 延迟
span.set_attribute("agent.loop", loop_count)       # Agent 循环次数
span.set_attribute("agent.tool", last_tool)        # 最后一次工具名

审计日志不可篡改方案

# monitor/audit_logger.py
import hashlib
import json
from datetime import datetime

class ImmutableAuditLog:
    """
    每个日志条目带上前一条的 hash,形成链式结构。
    任何人篡改历史记录都会导致 hash 链断裂,可检测。
    """
    def __init__(self, db_connection):
        self.conn = db_connection
    
    def append(self, event_type: str, payload: dict, prev_hash: str):
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "payload": payload,
            "prev_hash": prev_hash,
        }
        entry["hash"] = self._compute_hash(entry)
        
        # 写入 PostgreSQL(时间戳+索引,便于查询)
        self.conn.execute("""
            INSERT INTO audit_log (timestamp, event_type, payload, prev_hash, hash)
            VALUES (%s, %s, %s, %s, %s)
        """, (entry["timestamp"], entry["event_type"], 
              json.dumps(entry["payload"]), entry["prev_hash"], entry["hash"]))
        
        return entry["hash"]
    
    def _compute_hash(self, entry: dict) -> str:
        content = json.dumps(entry, sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()
    
    def verify_chain(self) -> bool:
        """定期验证 hash 链完整性"""
        rows = self.conn.fetch_all("SELECT * FROM audit_log ORDER BY timestamp")
        prev_hash = "genesis"
        for row in rows:
            computed = self._compute_hash(json.loads(row["payload"]))
            if row["hash"] != computed or row["prev_hash"] != prev_hash:
                return False
            prev_hash = row["hash"]
        return True

常见坑与规避清单

坑 1:Token 统计误差导致成本失控

问题:Provider 返回的 usage 数字与实际计费存在延迟/重试导致的重复计费。

规避

  • 在调用层做去重:相同 request_id 的调用不重复计费
  • 使用 Provider 的 Webhook 或回调获取准确计费,不要依赖本地统计
# 幂等调用包装
from cachetools import TTLCache

idempotency_cache = TTLCache(maxsize=10000, ttl=300)

def call_with_idempotency(request_id: str, fn: Callable):
    if request_id in idempotency_cache:
        return idempotency_cache[request_id]
    result = fn()
    idempotency_cache[request_id] = result
    return result

坑 2:LangChainInstrumentor 重复初始化导致 Span 丢失

问题LangchainInstrumentor().instrument() 多次调用会产生多个 processor,每个 event 被发送多次,量级翻倍。

规避:在应用入口做单次初始化,用 flag 保证幂等。

_instrumented = False
def ensure_instrumented():
    global _instrumented
    if not _instrumented:
        LangchainInstrumentor().instrument()
        _instrumented = True

坑 3:Agent 循环超时未被正确捕获

问题:LangGraph 的 while loop 内抛出的异常,在 stream 模式下可能无法传播到外层,导致 Agent 静默卡死。

规避:在每个 node 入口埋入 timeout check:

def monitored_node(state: AgentState, config: dict):
    deadline = state.get("_deadline", time.time() + 60)
    if time.time() > deadline:
        raise AgentTimeout(f"Node exceeded deadline")
    # 正常执行业务逻辑
    return result

坑 4:OTEL Collector 单点故障导致 Agent 阻塞

问题:OTLP exporter 默认是同步的,Collector 不可用时会 block Agent 执行。

规避:使用 BatchSpanProcessor + 超时配置,且 span 发送失败不能影响业务。

exporter = OTLPSpanExporter(
    endpoint=os.getenv("OTEL_ENDPOINT", "http://localhost:4317"),
    timeout_seconds=2,  # 超时快速失败,不 block
)

坑 5:多模型切换后 Context 窗口混乱

问题:从 Claude 切换到 GPT-4o 时,上下文格式不兼容(Anthropic 用 system messages 方式不同)。

规避:在 Router 层统一做 prompt 格式转换,不要让业务层感知 Provider 差异。

def normalize_prompt_for_model(prompt: str, target_provider: str) -> str:
    # 移除对特定 Provider 的引用,统一为通用格式
    return prompt.replace("<examples>", "## Examples\n")

成本/性能/维护权衡

存储成本估算

数据类型单条大小每日量级(1万次调用/天)月存储(30天)
Span 元数据~500B5MB150MB
Audit Log~2KB20MB600MB
Prometheus TSDB压缩后 ~100B1MB30MB
总计~26MB/天~780MB/月

结论:最小化部署用 PostgreSQL + Redis 足够。量级超过 100 万次/天建议上 ClickHouse 或 TimescaleDB。

延迟开销

  • Telemetry 引入的额外延迟:< 5ms(异步 span 发送,不阻塞主流程)
  • Router LLM 判断:约 50-150ms(可异步预判,不阻塞调用)
  • 审计日志写入:同步写约 2-5ms,建议异步落盘

维护成本

组件维护难度建议
OTEL Collector用官方 Helm Chart,托管 K8s
Prometheus/GrafanaGedu 即可,官方模板
Audit Log 链每月一次 verify_chain() 巡检
模型路由规则随业务迭代,每季度 review 一次路由策略

一周内可执行行动清单

Day 1-2:快速止血(30分钟上手)

  • 在现有 LangChain Agent 中接入 LangchainInstrumentor(5行代码)
  • 部署 OTEL Collector(docker-compose 1文件)
  • 验证第一个 Span 在 Grafana 中可见

Day 3-4:成本可见化

  • 在每次 LLM 调用后记录 usage + cost_usd 到 Redis
  • 写一个 Prometheus exporter 暴露 llm_cost_daily_total 指标
  • 配置 Grafana 面板:日/周/月 Token 消耗曲线
  • 设置超过 $50/天时 Slack 告警

Day 5-6:Agent 稳定性

  • 接入 AgentCircuitBreaker(循环上限 + 超时熔断)
  • 验证循环超过 10 次时 Agent 被正确中断
  • 审计日志写入 PostgreSQL,链完整性验证通过

Day 7:路由与切换

  • 部署 ModelRouter,先用规则层(不引入 LLM 判断)
  • 对比同任务下 Fast/Balanced/Power 模型的 QPS 与成本
  • 将路由决策日志也写入审计表,用于后续优化

参考资源