
Phase 3: Agent Architecture & Observability (Level 3)

Timeline: Weeks 6-8. Core Goal: Master intelligent decision-making and routing; establish agent behavior observability and auditing capability

Prerequisite Capabilities​

  • ✅ Mastered RAG app development (Level 2)
  • ✅ Implemented Java-Python heterogeneous system communication
  • ✅ Established an evaluation system

Why this phase is needed​

RAG solved the "knowledge acquisition" problem, but complex applications need AI that can decide autonomously: when to retrieve, what to retrieve, and how to combine multiple data sources. The Agent architecture upgrades AI from "passive answering" to "active planning".

⭐ Core Capability 1: LangGraph State Graph Programming​

⭐ Core Concepts​

  • ⭐ StateGraph: Defines the Agent's state schema and node transitions
  • ⭐ Conditional Edge: Decides the next step based on a condition
  • Checkpointer: Persists the Agent's memory across turns

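The Checkpointer idea can be illustrated with a minimal toy sketch: state snapshots are persisted per session so an Agent can resume where it left off. Note this is not LangGraph's actual API (a real setup would pass a built-in checkpointer such as a memory or database saver to `workflow.compile()`); the class and names below are hypothetical.

```python
# Toy illustration of the Checkpointer idea: persist per-session state
# so an Agent can resume a conversation. Hypothetical names, not
# LangGraph's real API.
class InMemoryCheckpointer:
    def __init__(self):
        self._store: dict[str, dict] = {}  # thread_id -> latest state snapshot

    def save(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = dict(state)  # copy to freeze the snapshot

    def load(self, thread_id: str) -> dict:
        return dict(self._store.get(thread_id, {}))

checkpointer = InMemoryCheckpointer()
checkpointer.save("session-1", {"question": "Who is my manager?", "route": "neo4j"})

restored = checkpointer.load("session-1")
print(restored["route"])  # neo4j
```

In LangGraph the same role is played by a checkpointer object keyed by a thread ID, which is what makes multi-turn Agent memory survive between invocations.
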
⭐ Practice 1: Router Agent (Routing Layer)​

Goal: The Agent automatically decides which data source to query based on the user's question

from typing import TypedDict
from langgraph.graph import StateGraph, END

# llm, milvus_client, neo4j_client and google_search are assumed to be
# initialized in earlier phases

# Define the state shared by all nodes
class AgentState(TypedDict):
    question: str
    route: str
    result: str

# Router node: decide which data source to use and record it in state
def route_question(state: AgentState) -> AgentState:
    question = state["question"]
    # Use the LLM to judge the question type
    decision = llm.invoke(f"Which data source should be queried for this question? Question: {question}")
    if "document" in decision:
        state["route"] = "milvus"
    elif "relation" in decision:
        state["route"] = "neo4j"
    else:
        state["route"] = "google"
    return state

# Build the state graph
workflow = StateGraph(AgentState)
workflow.add_node("router", route_question)
workflow.add_node("milvus_search", lambda s: {"result": milvus_client.search(s["question"])})
workflow.add_node("neo4j_search", lambda s: {"result": neo4j_client.query(s["question"])})
workflow.add_node("google_search", lambda s: {"result": google_search(s["question"])})

# Conditional edges: the route recorded by the router picks the next node
workflow.add_conditional_edges(
    "router",
    lambda s: s["route"],
    {
        "milvus": "milvus_search",
        "neo4j": "neo4j_search",
        "google": "google_search"
    }
)

# Each search node terminates the graph
for node in ("milvus_search", "neo4j_search", "google_search"):
    workflow.add_edge(node, END)

workflow.set_entry_point("router")
app = workflow.compile()

Practice 2: Autonomous Agent (Report Generator)​

Process: Plan the outline → research materials per section → integrate and write → self-reflect and revise

from typing import TypedDict

# llm and rag_search are assumed to be initialized in earlier phases

# Report Generation Agent state
class ReportState(TypedDict):
    topic: str
    outline: list[str]
    sections: dict[str, str]
    final_report: str

def plan_outline(state: ReportState) -> ReportState:
    outline = llm.invoke(f"Generate a report outline for topic '{state['topic']}'")
    state["outline"] = outline.split("\n")
    return state

def research_section(section: str) -> str:
    # Retrieve supporting materials, then draft the section
    materials = rag_search(section)
    content = llm.invoke(f"Write based on the following materials: {materials}")
    return content

def self_reflect(state: ReportState) -> ReportState:
    critique = llm.invoke(f"Assess the quality of this report: {state['final_report']}")
    if "needs improvement" in critique:
        # Re-plan and regenerate the report
        state = plan_outline(state)
    return state

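Stubbing out the LLM and retrieval calls (the stub_* helpers below are assumptions for illustration, not part of the roadmap's stack), the plan → research → write flow above can be driven end to end like this:

```python
# Minimal driver for the plan -> research -> write -> reflect flow,
# with stubbed LLM/RAG calls so the control flow is visible.
def stub_plan(topic: str) -> list[str]:
    return [f"{topic}: Introduction", f"{topic}: Analysis", f"{topic}: Conclusion"]

def stub_research(section: str) -> str:
    return f"Draft content for '{section}'"

def generate_report(topic: str) -> dict:
    state = {"topic": topic, "outline": [], "sections": {}, "final_report": ""}
    state["outline"] = stub_plan(topic)                      # 1. plan the outline
    for section in state["outline"]:                         # 2. research each section
        state["sections"][section] = stub_research(section)
    state["final_report"] = "\n\n".join(                     # 3. integrate and write
        state["sections"][s] for s in state["outline"]
    )
    # 4. self-reflection would go here (re-plan if critique says "needs improvement")
    return state

report = generate_report("GraphRAG")
print(len(report["outline"]))  # 3
```

In the LangGraph version this loop becomes nodes and a conditional edge from the reflection node back to the planning node.
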
Core Capability 2: Knowledge Graph (GraphRAG)​

Neo4j Practice: Store organizational structure data in Neo4j

// Create Nodes and Relationships
CREATE (ZhangSan:Employee {name: "Zhang San", title: "Engineer"})
CREATE (LiSi:Employee {name: "Li Si", title: "Manager"})
CREATE (WangWu:Employee {name: "Wang Wu", title: "Director"})

CREATE (ZhangSan)-[:REPORTS_TO]->(LiSi)
CREATE (LiSi)-[:REPORTS_TO]->(WangWu)

// Query: Who is Zhang San's boss's boss?
MATCH (ZhangSan:Employee {name: "Zhang San"})-[:REPORTS_TO*2]->(boss)
RETURN boss.name

GraphRAG: Solving "global" questions

  • Traditional RAG: Can only answer "local" questions (confined to a single document)
  • ⭐ GraphRAG: Can answer "global" questions (spanning relationships across multiple entities)

⭐ Core Capability 3: LLM Observability​

Core Pain Point​

Traditional APM tools (such as SkyWalking) cannot monitor the quality of AI answers

Architecture Design​

Integrate ⭐ LangSmith or Arize Phoenix

Practical Scenario: Record every user question and AI answer

from langsmith import traceable

# llm and milvus_client are assumed to be initialized in earlier phases

@traceable  # each call is recorded as a trace in LangSmith
def rag_query(question: str) -> str:
    # 1. Retrieve
    docs = milvus_client.search(question)

    # 2. Generate the answer
    answer = llm.invoke(f"Answer based on the following documents: {docs}\n\nQuestion: {question}")

    # LangSmith automatically records:
    # - Latency
    # - Token consumption
    # - Inputs and outputs
    # - Intermediate steps
    return answer

Monitoring Metrics:

  • Latency (P50, P95, P99)
  • ⭐ Token Consumption (Input tokens, Output tokens, Total cost)
  • Answer Accuracy Score (Auto evaluated via Ragas)

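The latency percentiles listed above can be computed directly from recorded trace latencies with the standard library; the sample latencies below are made up for illustration:

```python
import statistics

# Latencies in ms collected from traces (illustrative sample data)
latencies = [120, 150, 180, 200, 250, 300, 450, 800, 1200, 2500]

# quantiles(n=100) returns the 99 cut points P1..P99
cuts = statistics.quantiles(latencies, n=100)
p50, p95, p99 = cuts[49], cuts[94], cuts[98]
print(p50)  # 275.0
```

In practice LangSmith or Phoenix aggregates these for you, but knowing how the percentiles fall out of the raw trace data helps when setting latency alerts.
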
⭐ Core Capability 4: Agent Audit Log​

Core Principle​

AI is non-deterministic; every decision must be recorded for compliance auditing

⭐ Log Recording Standard​

Required fields:

  • Timestamp (ISO 8601 Format)
  • ⭐ User ID and Session ID
  • ⭐ Agent Decision Type (Route Selection, Tool Call, Final Answer)
  • Input Context (User question, retrieved documents)
  • ⭐ Output Result (Agent's decision and reason)
  • Token Consumption and Latency

Storage Scheme:

  • Structured Log: Store in Elasticsearch using JSON format
  • ⭐ Relational Storage: Store key decisions in MySQL agent_audit_log table

Code Example:

import json
import logging
import time
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# es_client, db, agent_wrapper and decide_route are assumed to be
# initialized elsewhere

def log_agent_decision(
    user_id: str,
    session_id: str,
    decision_type: str,
    input_context: dict,
    output: dict,
    tokens: int,
    latency_ms: int
):
    """
    Record an Agent decision audit log.

    Args:
        user_id: User ID
        session_id: Session ID (for tracing the complete conversation)
        decision_type: Decision type (route_selection / tool_call / final_answer)
        input_context: Input context (question, retrieved docs, etc.)
        output: Output result (the Agent's decision and reason)
        tokens: Token consumption
        latency_ms: Latency (ms)
    """
    audit_log = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # ISO 8601 with timezone
        "user_id": user_id,
        "session_id": session_id,
        "decision_type": decision_type,
        "input": input_context,
        "output": output,
        "tokens": tokens,
        "latency_ms": latency_ms
    }

    # 1. Store in Elasticsearch (for full-text search and analysis)
    es_client.index(index="agent_audit_log", document=audit_log)

    # 2. Store key decisions in MySQL (for compliance audits)
    if decision_type in ["route_selection", "final_answer"]:
        db.execute(
            "INSERT INTO agent_audit_log (timestamp, user_id, session_id, data) VALUES (?, ?, ?, ?)",
            (audit_log["timestamp"], user_id, session_id, json.dumps(audit_log))
        )

    # 3. Warn on high latency (> 2 s)
    if latency_ms > 2000:
        logger.warning(f"High latency detected: {latency_ms}ms for session {session_id}")

# Usage example: integrate the audit log in the Router Agent
@agent_wrapper
def router_agent(question: str, user_id: str, session_id: str):
    start_time = time.time()

    # Execute the routing decision
    route = decide_route(question)

    latency_ms = int((time.time() - start_time) * 1000)

    # Record the audit log
    log_agent_decision(
        user_id=user_id,
        session_id=session_id,
        decision_type="route_selection",
        input_context={"question": question},
        output={"route": route, "reason": "Question involves document retrieval"},
        tokens=150,
        latency_ms=latency_ms
    )

    return route

Audit Log Query Example (For compliance audit):

# Query the complete decision chain of a session
def get_session_audit_trail(session_id: str):
    """Trace the complete conversation via session_id"""
    results = db.execute(
        "SELECT * FROM agent_audit_log WHERE session_id = ? ORDER BY timestamp",
        (session_id,)
    )
    return [json.loads(row["data"]) for row in results]

# Query high-latency decisions (for performance optimization)
def get_high_latency_decisions(threshold_ms: int = 2000):
    """Find all decisions with latency exceeding the threshold"""
    query = {
        "query": {
            "range": {
                "latency_ms": {"gte": threshold_ms}
            }
        }
    }
    return es_client.search(index="agent_audit_log", body=query)

Core Capability 5: FastAPI Service Encapsulation​

Goal: Encapsulate the LangGraph Agent as a RESTful API for Java services to call

from fastapi import FastAPI
from pydantic import BaseModel

# agent_app is the compiled LangGraph application from earlier

app = FastAPI()

class AgentRequest(BaseModel):
    question: str
    user_id: str
    session_id: str

class AgentResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: float
    trace_id: str

@app.post("/agent/query", response_model=AgentResponse)
async def agent_query(request: AgentRequest):
    # Run the Agent
    result = agent_app.invoke({
        "question": request.question,
        "user_id": request.user_id,
        "session_id": request.session_id
    })

    return AgentResponse(
        answer=result["answer"],
        sources=result["sources"],
        confidence=result["confidence"],
        trace_id=result["trace_id"]
    )

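On the caller side (the Java Feign client mentioned in the deliverables, or any HTTP client), the request and response bodies are plain JSON matching the Pydantic models above. A stdlib sketch of the payloads (all field values are made up for illustration):

```python
import json

# Request body a client would POST to /agent/query (illustrative values)
request_body = {
    "question": "Who is Zhang San's boss's boss?",
    "user_id": "u-001",
    "session_id": "s-001",
}
payload = json.dumps(request_body)

# Response body the service returns, matching AgentResponse
response_json = '{"answer": "Wang Wu", "sources": ["neo4j"], "confidence": 0.92, "trace_id": "t-123"}'
response = json.loads(response_json)
print(response["answer"])  # Wang Wu
```

Generating the Java client from the service's OpenAPI schema (FastAPI serves it at /openapi.json) keeps both sides of this contract in sync.
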
Phase Output Standards​

Deliverables (prerequisites for entering Level 4):

Agent Application Layer:

  • ⭐ Implement at least 1 Router Agent that correctly selects among 3 data sources (Milvus/Neo4j/Google) with accuracy > 80%
  • Implement at least 1 autonomous decision Agent (e.g. a report generator or multi-step reasoning Agent) that completes the full task flow

Service Encapsulation Layer:

  • Build a FastAPI service that encapsulates the Agent and exposes it via RESTful API calls
  • Implement a standardized Java-Python communication protocol (define the interface with OpenAPI; the Java side uses a Feign client)

Knowledge Graph Layer:

  • Complete the Neo4j knowledge graph practice with at least 50 nodes and relationships, able to answer multi-hop queries

Observability Layer:

  • ⭐ Integrate an LLM observability tool (LangSmith or Arize Phoenix) and verify you can inspect trace chains and token consumption
  • ⭐ Implement a complete Agent audit log system containing all required fields, able to trace a complete conversation via session_id

Capability Verification:

  • ⭐ Able to explain the three core LangGraph concepts: StateGraph, Conditional Edge, Checkpointer
  • Able to design and implement a complete Agent audit log system that meets the requirements of heavily regulated industries such as finance and healthcare

Time Checkpoint: If not completed after 3 weeks, implement a simple Router Agent first, then gradually add the more complex features

Roadmap Optimization Suggestions​

Practical Project: Implement an audit log for Agent actions, recording every decision for compliance and ensuring the system is usable in heavily regulated industries such as finance and healthcare


Previous Phase: Level 2 - RAG App Development & Heterogeneous System Architecture

Next Phase: Level 4 - Full Stack Landing & Production Optimization