跳到主要内容

阶段三: Agent 架构与可观测性 (Level 3)

周期: 第 6-8 周
核心目标: 掌握智能决策与路由,建立 Agent 行为的可观测性和审计能力

前置能力要求

  • ✅ 已掌握 RAG 应用开发 (Level 2)
  • ✅ 已实现 Java-Python 异构系统通信
  • ✅ 已建立评估体系

为什么需要这个阶段

RAG 解决了"知识获取"问题,但复杂应用需要 AI 能够自主决策: 何时检索、检索什么、如何组合多个数据源。Agent 架构让 AI 从"被动回答"升级为"主动规划"。

⭐ 核心能力 1: LangGraph 状态图编程

⭐ 核心概念

  • StateGraph: 定义 Agent 的状态和转换
  • Conditional Edge: 根据条件决定下一步
  • Checkpointer: 持久化 Agent 的记忆

⭐ 实战一: Router Agent (路由层)

目标: Agent 根据用户问题自动决定查询哪个数据源

from langgraph.graph import StateGraph, END

# 定义状态
class AgentState(TypedDict):
question: str
route: str
result: str

# 路由决策函数
def route_question(state: AgentState) -> str:
question = state["question"]
# 使用 LLM 判断问题类型
decision = llm.invoke(f"这个问题应该查询哪个数据源? 问题: {question}")

if "文档" in decision:
return "milvus"
elif "关系" in decision:
return "neo4j"
else:
return "google"

# 构建状态图
workflow = StateGraph(AgentState)
workflow.add_node("router", route_question)
workflow.add_node("milvus_search", lambda s: milvus_client.search(s["question"]))
workflow.add_node("neo4j_search", lambda s: neo4j_client.query(s["question"]))
workflow.add_node("google_search", lambda s: google_search(s["question"]))

# 添加条件边
workflow.add_conditional_edges(
"router",
lambda s: s["route"],
{
"milvus": "milvus_search",
"neo4j": "neo4j_search",
"google": "google_search"
}
)

workflow.set_entry_point("router")
app = workflow.compile()

实战二: 自主 Agent (研报生成器)

流程: 自动规划大纲 → 分段搜索素材 → 整合写作 → 自我反思修改

# 研报生成 Agent
class ReportState(TypedDict):
topic: str
outline: list[str]
sections: dict[str, str]
final_report: str

def plan_outline(state: ReportState) -> ReportState:
outline = llm.invoke(f"为主题 '{state['topic']}' 生成研报大纲")
state["outline"] = outline.split("\n")
return state

def research_section(state: ReportState, section: str) -> str:
materials = rag_search(section)
content = llm.invoke(f"基于以下材料写作: {materials}")
return content

def self_reflect(state: ReportState) -> ReportState:
critique = llm.invoke(f"评估这份研报的质量: {state['final_report']}")
if "需要改进" in critique:
# 重新生成
state = plan_outline(state)
return state

核心能力 2: 知识图谱 (GraphRAG)

Neo4j 实战: 将组织架构数据存入 Neo4j

// 创建节点和关系
CREATE (张三:Employee {name: "张三", title: "工程师"})
CREATE (李四:Employee {name: "李四", title: "经理"})
CREATE (王五:Employee {name: "王五", title: "总监"})

CREATE (张三)-[:REPORTS_TO]->(李四)
CREATE (李四)-[:REPORTS_TO]->(王五)

// 查询: 谁是张三的老板的老板?
MATCH (张三:Employee {name: "张三"})-[:REPORTS_TO*2]->(boss)
RETURN boss.name

GraphRAG: 解决"全局性问题"

  • 传统 RAG: 只能回答"局部"问题 (单个文档内)
  • ⭐ GraphRAG: 可以回答"全局"问题 (跨多个实体的关系)

⭐ 核心能力 3: LLM 可观测性

核心痛点

传统 APM (如 SkyWalking) 监控不了 AI 的回答质量

架构设计

集成 ⭐ LangSmithArize Phoenix

实战场景: 记录每次用户提问和 AI 回答

from langsmith import Client

langsmith_client = Client()

@langsmith_client.trace
def rag_query(question: str) -> str:
# 1. 检索
docs = milvus_client.search(question)

# 2. 生成回答
answer = llm.invoke(f"基于以下文档回答: {docs}\n\n问题: {question}")

# LangSmith 自动记录:
# - 延迟
# - Token 消耗
# - 输入输出
# - 中间步骤

return answer

监控指标:

  • 延迟 (P50, P95, P99)
  • ⭐ Token 消耗 (输入 tokens, 输出 tokens, 总成本)
  • 回答的准确性评分 (通过 Ragas 自动评估)

⭐ 核心能力 4: Agent 审计日志

核心原则

AI 是非确定性的,必须记录每个决策用于合规性审计

⭐ 日志记录标准

必须包含的字段:

  • 时间戳 (ISO 8601 格式)
  • ⭐ 用户 ID 和会话 ID
  • ⭐ Agent 决策类型 (路由选择、工具调用、最终回答)
  • 输入上下文 (用户问题、检索到的文档)
  • ⭐ 输出结果 (Agent 的决策和理由)
  • Token 消耗和延迟

存储方案:

  • 结构化日志: 使用 JSON 格式存储到 Elasticsearch
  • ⭐ 关系型存储: 关键决策存入 MySQL 的 agent_audit_log

代码示例:

import json
from datetime import datetime
import time

def log_agent_decision(
user_id: str,
session_id: str,
decision_type: str,
input_context: dict,
output: dict,
tokens: int,
latency_ms: int
):
"""
记录 Agent 决策审计日志

Args:
user_id: 用户 ID
session_id: 会话 ID (用于追溯完整对话)
decision_type: 决策类型 (route_selection/tool_call/final_answer)
input_context: 输入上下文 (包含问题、检索文档等)
output: 输出结果 (Agent 的决策和理由)
tokens: Token 消耗
latency_ms: 延迟 (毫秒)
"""
audit_log = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"session_id": session_id,
"decision_type": decision_type,
"input": input_context,
"output": output,
"tokens": tokens,
"latency_ms": latency_ms
}

# 1. 存储到 Elasticsearch (用于全文搜索和分析)
es_client.index(index="agent_audit_log", document=audit_log)

# 2. 关键决策存入 MySQL (用于合规审计)
if decision_type in ["route_selection", "final_answer"]:
db.execute(
"INSERT INTO agent_audit_log (timestamp, user_id, session_id, data) VALUES (?, ?, ?, ?)",
(audit_log["timestamp"], user_id, session_id, json.dumps(audit_log))
)

# 3. 如果延迟超过 2秒,记录告警
if latency_ms > 2000:
logger.warning(f"High latency detected: {latency_ms}ms for session {session_id}")

# 使用示例: 在 Router Agent 中集成审计日志
@agent_wrapper
def router_agent(question: str, user_id: str, session_id: str):
start_time = time.time()

# 执行路由决策
route = decide_route(question)

latency_ms = int((time.time() - start_time) * 1000)

# 记录审计日志
log_agent_decision(
user_id=user_id,
session_id=session_id,
decision_type="route_selection",
input_context={"question": question},
output={"route": route, "reason": "问题涉及文档检索"},
tokens=150,
latency_ms=latency_ms
)

return route

审计日志查询示例 (用于合规审计):

# 查询某个会话的完整决策链路
def get_session_audit_trail(session_id: str):
"""通过 session_id 追溯完整对话"""
results = db.execute(
"SELECT * FROM agent_audit_log WHERE session_id = ? ORDER BY timestamp",
(session_id,)
)
return [json.loads(row['data']) for row in results]

# 查询高延迟决策 (用于性能优化)
def get_high_latency_decisions(threshold_ms: int = 2000):
"""查找所有延迟超过阈值的决策"""
query = {
"query": {
"range": {
"latency_ms": {"gte": threshold_ms}
}
}
}
return es_client.search(index="agent_audit_log", body=query)

核心能力 5: FastAPI 服务封装

目标: 将 LangGraph Agent 封装为 RESTful API,供 Java 调用

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentRequest(BaseModel):
question: str
user_id: str
session_id: str

class AgentResponse(BaseModel):
answer: str
sources: list[str]
confidence: float
trace_id: str

@app.post("/agent/query", response_model=AgentResponse)
async def agent_query(request: AgentRequest):
# 运行 Agent
result = agent_app.invoke({
"question": request.question,
"user_id": request.user_id,
"session_id": request.session_id
})

return AgentResponse(
answer=result["answer"],
sources=result["sources"],
confidence=result["confidence"],
trace_id=result["trace_id"]
)

阶段产出标准

必须完成的交付物 (作为进入 Level 4 的前置条件):

Agent 应用层:

  • ⭐ 实现至少 1 个 Router Agent,能够在 3 个数据源中智能选择 (Milvus/Neo4j/Google),准确率 > 80%
  • 实现至少 1 个自主决策 Agent (如研报生成器或多步推理 Agent),能够完成完整任务流程

服务封装层:

  • 建立 FastAPI 服务封装 Agent,支持 RESTful API 调用
  • 实现 Java-Python 的标准化通信协议 (使用 OpenAPI 定义接口,Java 端使用 Feign Client)

知识图谱层:

  • 完成 Neo4j 知识图谱实战,至少包含 50 个节点和关系,能够回答多跳查询问题

可观测性层:

  • ⭐ 集成 LLM 可观测性工具 (LangSmith 或 Arize Phoenix),能够查看 Trace 链路和 Token 消耗
  • ⭐ 实现完整的 Agent 审计日志系统,包含所有必需字段,可通过 session_id 追溯完整对话

能力验证:

  • ⭐ 能够解释 LangGraph 的 StateGraph 、Conditional Edge 、Checkpointer 三个核心概念
  • 能够设计并实现一个完整的 Agent 审计日志系统,符合金融、医疗等强监管行业要求

时间检查点: 如果超过 3 周仍未完成,建议先实现简单的 Router Agent,再逐步添加复杂功能

路线图优化建议

实战项目: 实现 Audit Log for Agent actions,记录每个决策用于合规,确保在金融、医疗等强监管行业可用


上一阶段: Level 2 - RAG 应用开发与异构系统架构

下一阶段: Level 4 - 全栈落地与生产优化