Phase 3: Agent Architecture & Observability (Level 3)
Cycle: Weeks 6-8
Core Goal: Master intelligent decision-making and routing; establish Agent behavior observability and auditing capability
Prerequisite Capabilities
- Mastered RAG application development (Level 2)
- Implemented Java-Python heterogeneous system communication
- Established an evaluation system
Why this phase is needed
RAG solved the "knowledge acquisition" problem, but complex applications need AI that can decide autonomously: when to retrieve, what to retrieve, and how to combine multiple data sources. Agent architecture upgrades AI from "passive answering" to "active planning".
Core Capability 1: LangGraph State Graph Programming
Core Concepts
- StateGraph: defines the Agent's state and transitions
- Conditional Edge: decides the next step based on a condition
- Checkpointer: persists the Agent's memory
Practice 1: Router Agent (Routing Layer)
Goal: the Agent automatically decides which data source to query based on the user's question
from typing import TypedDict

from langgraph.graph import StateGraph, END

# llm, milvus_client, neo4j_client and google_search are assumed to be
# initialized elsewhere

# Define State
class AgentState(TypedDict):
    question: str
    route: str
    result: str

# Router node: ask the LLM which data source fits the question, and
# store the decision in state so the conditional edge can read it
def route_question(state: AgentState) -> dict:
    question = state["question"]
    decision = llm.invoke(f"Which data source should be queried for this question? Question: {question}")
    if "document" in decision:
        route = "milvus"
    elif "relation" in decision:
        route = "neo4j"
    else:
        route = "google"
    return {"route": route}

# Build State Graph
workflow = StateGraph(AgentState)
workflow.add_node("router", route_question)
workflow.add_node("milvus_search", lambda s: {"result": milvus_client.search(s["question"])})
workflow.add_node("neo4j_search", lambda s: {"result": neo4j_client.query(s["question"])})
workflow.add_node("google_search", lambda s: {"result": google_search(s["question"])})

# Add Conditional Edges: the route stored by the router picks the next node
workflow.add_conditional_edges(
    "router",
    lambda s: s["route"],
    {
        "milvus": "milvus_search",
        "neo4j": "neo4j_search",
        "google": "google_search"
    }
)
workflow.add_edge("milvus_search", END)
workflow.add_edge("neo4j_search", END)
workflow.add_edge("google_search", END)

workflow.set_entry_point("router")
app = workflow.compile()
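Because the LLM call inside route_question is the only non-deterministic part, the branching itself can be factored into a pure function and unit-tested offline. This is a hypothetical refactor for testability, not part of the graph above:

```python
def pick_route(decision: str) -> str:
    # Same branching as route_question, with the LLM call factored out
    if "document" in decision:
        return "milvus"
    elif "relation" in decision:
        return "neo4j"
    return "google"

print(pick_route("This looks like a document lookup"))  # -> milvus
```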
Practice 2: Autonomous Agent (Report Generator)
Process: plan outline → search materials per section → integrate and write → self-reflect and revise
# Report Generation Agent
class ReportState(TypedDict):
    topic: str
    outline: list[str]
    sections: dict[str, str]
    final_report: str

def plan_outline(state: ReportState) -> ReportState:
    outline = llm.invoke(f"Generate report outline for topic '{state['topic']}'")
    state["outline"] = outline.split("\n")
    return state

def research_section(state: ReportState, section: str) -> str:
    # Retrieve supporting materials for one outline section, then draft it
    materials = rag_search(section)
    content = llm.invoke(f"Write based on following materials: {materials}")
    return content

def self_reflect(state: ReportState) -> ReportState:
    critique = llm.invoke(f"Assess quality of this report: {state['final_report']}")
    if "needs improvement" in critique:
        # Regenerate from the outline stage
        state = plan_outline(state)
    return state
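The self-reflection step implies a loop: critique the draft, revise if the critique fails, and stop when it passes or after a bounded number of rounds (the bound prevents an endless critique-revise cycle). A minimal sketch with the LLM stubbed out so only the control flow remains; critique_fn and revise_fn are hypothetical stand-ins for the LLM calls:

```python
def reflect_loop(draft: str, critique_fn, revise_fn, max_rounds: int = 3) -> str:
    # Critique the draft; revise and retry until it passes or rounds run out
    for _ in range(max_rounds):
        critique = critique_fn(draft)
        if "needs improvement" not in critique:
            break
        draft = revise_fn(draft, critique)
    return draft

# Stubbed LLM: flags the first draft, accepts the revision
calls = []
def critique_fn(draft):
    calls.append(draft)
    return "needs improvement" if len(calls) < 2 else "looks good"

def revise_fn(draft, critique):
    return draft + " (revised)"

final = reflect_loop("first draft", critique_fn, revise_fn)
print(final)  # -> first draft (revised)
```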
Core Capability 2: Knowledge Graph (GraphRAG)
Neo4j Practice: store organization-structure data in Neo4j
// Create Nodes and Relationships
CREATE (ZhangSan:Employee {name: "Zhang San", title: "Engineer"})
CREATE (LiSi:Employee {name: "Li Si", title: "Manager"})
CREATE (WangWu:Employee {name: "Wang Wu", title: "Director"})
CREATE (ZhangSan)-[:REPORTS_TO]->(LiSi)
CREATE (LiSi)-[:REPORTS_TO]->(WangWu)
// Query: Who is Zhang San's boss's boss?
MATCH (ZhangSan:Employee {name: "Zhang San"})-[:REPORTS_TO*2]->(boss)
RETURN boss.name
GraphRAG: solving "global" questions
- Traditional RAG: can only answer "local" questions (contained within a single document)
- GraphRAG: can also answer "global" questions (spanning relationships across multiple entities)
Core Capability 3: LLM Observability
Core Pain Point
Traditional APM tools (such as SkyWalking) cannot monitor AI answer quality
Architecture Design
Integrate LangSmith or Arize Phoenix
Practical Scenario: record every user question and AI answer
from langsmith import traceable

# milvus_client and llm are assumed to be initialized elsewhere

@traceable
def rag_query(question: str) -> str:
    # 1. Retrieve
    docs = milvus_client.search(question)
    # 2. Generate Answer
    answer = llm.invoke(f"Answer based on following documents: {docs}\n\nQuestion: {question}")
    # LangSmith automatically records:
    # - Latency
    # - Token consumption
    # - Inputs and outputs
    # - Intermediate steps
    return answer
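For intuition about what the tracer captures, here is a hand-rolled approximation of a tracing decorator that records the call name, inputs, output, and latency; LangSmith does this (plus token accounting and nested intermediate steps) automatically, so this sketch is illustrative only:

```python
import functools
import time

TRACES: list[dict] = []  # stand-in for the tracing backend

def trace(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        # Record what an observability tool captures per call
        TRACES.append({
            "name": fn.__name__,
            "inputs": {"args": args, "kwargs": kwargs},
            "output": result,
            "latency_ms": int((time.perf_counter() - start) * 1000),
        })
        return result
    return wrapper

@trace
def answer(question: str) -> str:
    return f"stub answer to: {question}"

answer("What is GraphRAG?")
```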
Monitoring Metrics:
- Latency (P50, P95, P99)
- Token consumption (input tokens, output tokens, total cost)
- Answer accuracy score (auto-evaluated via Ragas)
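The latency percentiles above can be computed from raw samples with the nearest-rank method; a dependency-free sketch (sample values are made up for illustration):

```python
def percentile(samples: list[int], p: float) -> int:
    # Nearest-rank percentile: index into the sorted samples
    ordered = sorted(samples)
    rank = max(0, min(len(ordered) - 1, round(p / 100 * len(ordered)) - 1))
    return ordered[rank]

latencies_ms = [120, 340, 95, 2100, 180, 260, 150, 480, 130, 3200]
for p in (50, 95, 99):
    print(f"P{p}: {percentile(latencies_ms, p)}ms")
```

Note how a few slow outliers dominate P95/P99 while barely moving P50, which is why all three are tracked.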
Core Capability 4: Agent Audit Log
Core Principle
AI is non-deterministic, so every decision must be recorded for compliance auditing
Log Recording Standard
Fields must include:
- Timestamp (ISO 8601 format)
- User ID and Session ID
- Agent decision type (route selection, tool call, final answer)
- Input context (user question, retrieved documents)
- Output result (the Agent's decision and its reason)
- Token consumption and latency
Storage scheme:
- Structured log: store in Elasticsearch in JSON format
- Relational storage: store key decisions in a MySQL agent_audit_log table
Code Example:
import json
import time
from datetime import datetime

def log_agent_decision(
    user_id: str,
    session_id: str,
    decision_type: str,
    input_context: dict,
    output: dict,
    tokens: int,
    latency_ms: int
):
    """
    Record Agent decision audit log

    Args:
        user_id: User ID
        session_id: Session ID (for tracing the complete conversation)
        decision_type: Decision type (route_selection/tool_call/final_answer)
        input_context: Input context (question, retrieved docs, etc.)
        output: Output result (the Agent's decision and its reason)
        tokens: Token consumption
        latency_ms: Latency (ms)
    """
    audit_log = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "session_id": session_id,
        "decision_type": decision_type,
        "input": input_context,
        "output": output,
        "tokens": tokens,
        "latency_ms": latency_ms
    }
    # 1. Store to Elasticsearch (for full-text search and analysis)
    es_client.index(index="agent_audit_log", document=audit_log)
    # 2. Store key decisions to MySQL (for compliance audit)
    if decision_type in ["route_selection", "final_answer"]:
        db.execute(
            "INSERT INTO agent_audit_log (timestamp, user_id, session_id, data) VALUES (?, ?, ?, ?)",
            (audit_log["timestamp"], user_id, session_id, json.dumps(audit_log))
        )
    # 3. If latency > 2s, record a warning
    if latency_ms > 2000:
        logger.warning(f"High latency detected: {latency_ms}ms for session {session_id}")
# Usage Example: integrate the audit log in the Router Agent
@agent_wrapper
def router_agent(question: str, user_id: str, session_id: str):
    start_time = time.time()
    # Execute routing decision
    route = decide_route(question)
    latency_ms = int((time.time() - start_time) * 1000)
    # Record audit log (token count hard-coded here for illustration)
    log_agent_decision(
        user_id=user_id,
        session_id=session_id,
        decision_type="route_selection",
        input_context={"question": question},
        output={"route": route, "reason": "Question involves document retrieval"},
        tokens=150,
        latency_ms=latency_ms
    )
    return route
Audit log query examples (for compliance audit):
# Query the complete decision chain of a session
def get_session_audit_trail(session_id: str):
    """Trace the complete conversation via session_id"""
    results = db.execute(
        "SELECT * FROM agent_audit_log WHERE session_id = ? ORDER BY timestamp",
        (session_id,)
    )
    return [json.loads(row['data']) for row in results]

# Query high-latency decisions (for performance optimization)
def get_high_latency_decisions(threshold_ms: int = 2000):
    """Find all decisions with latency exceeding the threshold"""
    query = {
        "query": {
            "range": {
                "latency_ms": {"gte": threshold_ms}
            }
        }
    }
    return es_client.search(index="agent_audit_log", body=query)
Core Capability 5: FastAPI Service Encapsulation
Goal: encapsulate the LangGraph Agent as a RESTful API for Java to call
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentRequest(BaseModel):
    question: str
    user_id: str
    session_id: str

class AgentResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: float
    trace_id: str

@app.post("/agent/query", response_model=AgentResponse)
async def agent_query(request: AgentRequest):
    # Run Agent
    result = agent_app.invoke({
        "question": request.question,
        "user_id": request.user_id,
        "session_id": request.session_id
    })
    return AgentResponse(
        answer=result["answer"],
        sources=result["sources"],
        confidence=result["confidence"],
        trace_id=result["trace_id"]
    )
Phase Output Standards
Deliverables that must be completed (as prerequisites for entering Level 4):
Agent Application Layer:
- Implement at least 1 Router Agent able to intelligently select among 3 data sources (Milvus/Neo4j/Google), with routing accuracy > 80%
- Implement at least 1 autonomous decision Agent (e.g. a report generator or multi-step reasoning Agent) able to complete a full task flow
Service Encapsulation Layer:
- Build a FastAPI service encapsulating the Agent, supporting RESTful API calls
- Implement a standardized Java-Python communication protocol (define the interface with OpenAPI; the Java side uses a Feign Client)
Knowledge Graph Layer:
- Complete the Neo4j knowledge graph practice, containing at least 50 nodes and relationships and able to answer multi-hop queries
Observability Layer:
- Integrate an LLM observability tool (LangSmith or Arize Phoenix) and be able to view trace chains and token consumption
- Implement a complete Agent audit log system containing all required fields, able to trace a complete conversation via session_id
Capability Verification:
- Able to explain the three core concepts of LangGraph: StateGraph, Conditional Edge, Checkpointer
- Able to design and implement a complete Agent audit log system that meets the requirements of heavily regulated industries such as finance and healthcare
Time Checkpoint: if not completed after 3 weeks, implement a simple Router Agent first, then gradually add complex features
Roadmap Optimization Suggestions
Practical Project: implement an audit log for Agent actions, recording every decision for compliance and ensuring usability in heavily regulated industries such as finance and healthcare
Previous Phase: Level 2 - RAG App Development & Heterogeneous System Architecture
Next Phase: Level 4 - Full Stack Landing & Production Optimization