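State Machines and Agents: Building Reliable Workflows with LangGraph
Most agent tutorials show a simple loop: ask Claude a question, parse the response, call a tool, repeat. That works for a demo, but in production you need determinism, error recovery, human approval gates, and auditability.
LangGraph brings state machines to agent workflows. Instead of an ad hoc loop stitched together from if statements, you get an explicit graph: named nodes (units of logic), typed edges (transitions), and a shared state schema that flows through the entire execution.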
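Why use a state machine for agents?
The problem with ad hoc loops
A typical agent loop looks like this:

    messages = []  # in practice, seeded with the user's request
    while True:
        response = claude.messages.create(model=..., messages=messages)
        # Record the assistant turn so tool results can be matched to their calls.
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "end_turn":
            break
        # get_tool_calls, execute_tool, and tool_result are placeholder helpers.
        for tool_call in get_tool_calls(response):
            result = execute_tool(tool_call)
            messages.append(tool_result(tool_call.id, result))

With two or three tools this stays readable. Add five tools, conditional paths, a human approval step, and retry logic, and you end up with hundreds of lines of tangled control flow.
The deeper problem is implicit state. Which stage is the agent in? What data has it collected? All of that lives in messages: an untyped blob that every step reads from and appends to, with no enforced schema.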
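State machines as the solution
A state machine makes the implicit explicit. You define:
- Nodes: discrete units of logic. Each node receives the current state, does one thing, and returns a state update.
- Edges: transitions between nodes, either unconditional (A → B, always) or conditional (if issues were found, go to review; otherwise go to summarize).
- State: a typed dictionary that flows through the entire graph. Every node reads from and writes to the same declared schema.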
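To make the three pieces concrete before introducing the library, here is a minimal hand-rolled sketch in plain Python. It is not how LangGraph is implemented; the names `NODES`, `EDGES`, `run`, and the `END` sentinel are assumptions made up for this illustration.

```python
from typing import Callable

END = "__end__"  # hypothetical sentinel for the terminal state


# Nodes: each takes the current state and returns a partial update.
def check(state: dict) -> dict:
    issues = ["contains digits"] if any(c.isdigit() for c in state["text"]) else []
    return {"issues": issues}


def review(state: dict) -> dict:
    return {"approved": True}


def summarize(state: dict) -> dict:
    return {"summary": state["text"][:20]}


NODES: dict[str, Callable[[dict], dict]] = {
    "check": check,
    "review": review,
    "summarize": summarize,
}

# Edges: each maps the state after a node to the name of the next node.
EDGES: dict[str, Callable[[dict], str]] = {
    "check": lambda s: "review" if s["issues"] else "summarize",  # conditional
    "review": lambda s: "summarize",                              # unconditional
    "summarize": lambda s: END,
}


def run(state: dict, entry: str = "check") -> tuple[dict, list[str]]:
    """Walk the graph from `entry`, merging each node's update into the state."""
    trace = []
    current = entry
    while current != END:
        state = {**state, **NODES[current](state)}
        trace.append(current)
        current = EDGES[current](state)
    return state, trace
```

The returned trace is the point of the exercise: at any moment you can say which node ran and what the state looked like, which is exactly what the ad hoc loop hides.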
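When not to use LangGraph
For simple one-shot tasks (text classification, field extraction), a direct API call is faster and clearer. Reach for LangGraph when the workflow has:
- Several distinct stages that must run in order
- Conditional branching based on intermediate results
- Human-in-the-loop steps
- Error recovery or retry logic
- Auditability requirements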
LangGraph basics
Defining the state

    from typing import TypedDict

    class ResearchState(TypedDict):
        query: str
        research_notes: str
        draft: str
        review_feedback: str
        is_approved: bool

Nodes return partial dictionaries. LangGraph merges these updates into the running state after each node executes.
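The merge step is easier to reason about once you see it spelled out. The sketch below imitates it in plain Python: a key in the update overwrites the old value by default, and a field annotated with a reducer (LangGraph supports declaring one through `Annotated`, for example `operator.add` on a list) combines old and new instead. The `merge` function and `NotesState` are hypothetical names for this illustration, not LangGraph internals.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class NotesState(TypedDict):
    query: str                                 # no reducer: overwritten on update
    notes: Annotated[list[str], operator.add]  # reducer: old + new


def merge(state_type: type, state: dict, update: dict) -> dict:
    """Return a new state with `update` applied, honoring per-field reducers."""
    hints = get_type_hints(state_type, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] exposes its extra arguments as __metadata__.
        reducers = getattr(hints.get(key), "__metadata__", ())
        if reducers and key in merged:
            merged[key] = reducers[0](merged[key], value)
        else:
            merged[key] = value
    return merged
```

A node that returns `{"notes": ["b"]}` therefore appends to the existing notes, while a node that returns `{"query": "q2"}` replaces the query and leaves every other field alone.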
Nodes

    import anthropic

    client = anthropic.Anthropic()

    def research_node(state: ResearchState) -> dict:
        # Ask the model to research the query and store its notes.
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"Research this topic in depth: {state['query']}"
            }]
        )
        return {"research_notes": response.content[0].text}

    def draft_node(state: ResearchState) -> dict:
        # Turn the research notes into a draft.
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"Write a draft based on these notes:\n{state['research_notes']}"
            }]
        )
        return {"draft": response.content[0].text}

Edges and conditional routing
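
    from typing import Literal

    def route_after_check(state: ResearchState) -> Literal["human_review", "draft"]:
        # Route to human review when feedback is present; otherwise go to drafting.
        if state.get("review_feedback"):
            return "human_review"
        return "draft"

    # `workflow` is the StateGraph built in the next section. The mapping values
    # must match names passed to add_node; a "human_review" node is assumed here.
    workflow.add_conditional_edges(
        "research",
        route_after_check,
        {"human_review": "human_review", "draft": "draft"},
    )

Building and running the graph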

    from langgraph.graph import StateGraph, END

    workflow = StateGraph(ResearchState)
    workflow.add_node("research", research_node)
    workflow.add_node("draft", draft_node)
    workflow.set_entry_point("research")
    workflow.add_edge("research", "draft")
    workflow.add_edge("draft", END)

    graph = workflow.compile()
    result = graph.invoke({"query": "What is the MCP protocol?"})
    print(result["draft"])

Building a document review workflow
Designing the state

    class DocumentState(TypedDict):
        document: str
        key_terms: list[str]
        compliance_issues: list[str]
        summary: str
        human_feedback: str
        is_approved: bool

Implementing the nodes
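Two of the nodes in this section ask the model for a JSON array and then have to cope with replies wrapped in a Markdown code fence or not valid JSON at all. That parsing step can be pulled out and exercised on its own. The helper below is a sketch: `parse_json_array` and its `fallback_limit` parameter are hypothetical names, and the line-splitting fallback mirrors the term-extraction node rather than the compliance node, which falls back to an empty list.

```python
import json


def parse_json_array(raw: str, fallback_limit: int = 5) -> list[str]:
    """Parse a model reply that should contain a JSON array of strings."""
    text = raw.strip()
    if text.startswith("```"):
        # Keep only what sits between the first pair of fences.
        text = text.split("```")[1]
        if text.startswith("json"):
            text = text[4:]
    try:
        parsed = json.loads(text.strip())
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, list):
        return [str(item) for item in parsed]
    # Fallback: treat each non-empty line of the raw reply as one item.
    return [line.strip() for line in raw.splitlines() if line.strip()][:fallback_limit]
```

Keeping the parsing in one function means a change to the tolerated reply formats happens in one place, and the function can be tested without any API call.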

    import json

    def extract_terms_node(state: DocumentState) -> dict:
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": (
                    "Extract exactly 5 key terms from this document. "
                    "Return them as a JSON array of strings.\n\n"
                    f"Document: {state['document']}"
                )
            }]
        )
        try:
            raw = response.content[0].text.strip()
            # Strip a Markdown code fence if the model wrapped its answer in one.
            if raw.startswith("```"):
                raw = raw.split("```")[1]
                if raw.startswith("json"):
                    raw = raw[4:]
            terms = json.loads(raw.strip())
        except (json.JSONDecodeError, IndexError):
            # Fall back to treating each non-empty line as a term.
            terms = [t.strip() for t in response.content[0].text.split("\n") if t.strip()][:5]
        return {"key_terms": terms}

    def check_compliance_node(state: DocumentState) -> dict:
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": (
                    "Check this document for compliance issues. "
                    "Look for: PII (names, ID numbers, email addresses, phone numbers), "
                    "confidentiality markings, and unverified claims.\n\n"
                    "Return a JSON array of issue descriptions. "
                    "If there are no issues, return an empty array [].\n\n"
                    f"Document: {state['document']}"
                )
            }]
        )
        try:
            raw = response.content[0].text.strip()
            if raw.startswith("```"):
                raw = raw.split("```")[1]
                if raw.startswith("json"):
                    raw = raw[4:]
            issues = json.loads(raw.strip())
        except (json.JSONDecodeError, IndexError):
            # An unparseable reply is treated as "no issues found".
            issues = []
        return {"compliance_issues": issues}

    def human_review_node(state: DocumentState) -> dict:
        # Placeholder: prints the issues and auto-approves. A real reviewer
        # would supply the feedback and the approval decision.
        print("\n=== Human review required ===")
        for issue in state["compliance_issues"]:
            print(f"  - {issue}")
        return {
            "human_feedback": "Reviewed and approved; PII redacted",
            "is_approved": True
        }

    def summarize_node(state: DocumentState) -> dict:
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=256,
            messages=[{
                "role": "user",
                "content": f"Summarize this document in exactly two sentences:\n\n{state['document']}"
            }]
        )
        return {"summary": response.content[0].text, "is_approved": True}

Assembling the graph

    def route_after_compliance(state: DocumentState) -> Literal["human_review", "summarize"]:
        # Any compliance issue sends the document to a human before summarizing.
        if state.get("compliance_issues"):
            return "human_review"
        return "summarize"

    workflow = StateGraph(DocumentState)
    workflow.add_node("extract", extract_terms_node)
    workflow.add_node("check", check_compliance_node)
    workflow.add_node("review", human_review_node)
    workflow.add_node("summarize", summarize_node)

    workflow.set_entry_point("extract")
    workflow.add_edge("extract", "check")
    workflow.add_conditional_edges(
        "check",
        route_after_compliance,
        {"human_review": "review", "summarize": "summarize"},
    )
    workflow.add_edge("review", "summarize")
    workflow.add_edge("summarize", END)

    graph = workflow.compile()

    result = graph.invoke({
        "document": "Patient Zhang San (ID: 110101199001011234) has hypertension.",
        "key_terms": [],
        "compliance_issues": [],
        "summary": "",
        "human_feedback": "",
        "is_approved": False,
    })
    print(result["summary"])

Human checkpoints and interrupts
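Conceptually, an interrupt is a saved position plus a saved state, filed under a thread ID, so that a later call can pick up where the earlier one stopped. The toy runner below sketches that idea for a linear list of steps in plain Python. `TinyRunner` and its methods are invented for this illustration and only loosely echo the shape of the LangGraph calls; they are not the library's implementation.

```python
from typing import Callable, Optional

Node = Callable[[dict], dict]


class TinyRunner:
    """Runs named steps in order, pausing before selected ones."""

    def __init__(self, steps: list[tuple[str, Node]], interrupt_before: set[str]):
        self.steps = steps
        self.interrupt_before = interrupt_before
        # thread_id -> (index of the next step to run, state so far)
        self.checkpoints: dict[str, tuple[int, dict]] = {}

    def invoke(self, state: Optional[dict], thread_id: str) -> dict:
        if state is None:
            # Resume: reload the saved position and state, and do not
            # pause again on the step we stopped in front of.
            index, state = self.checkpoints[thread_id]
            resuming = True
        else:
            index, resuming = 0, False
        while index < len(self.steps):
            name, node = self.steps[index]
            if name in self.interrupt_before and not resuming:
                self.checkpoints[thread_id] = (index, dict(state))
                return state  # paused; caller may inspect or edit the state
            resuming = False
            state = {**state, **node(state)}
            index += 1
        self.checkpoints[thread_id] = (index, dict(state))
        return state

    def update_state(self, thread_id: str, update: dict) -> None:
        """Merge a human's edits into the saved state for this thread."""
        index, state = self.checkpoints[thread_id]
        self.checkpoints[thread_id] = (index, {**state, **update})
```

The sequence to look for in the real code is the same three moves: run until the pause, edit the saved state, then invoke again with no new input.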
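
    from langgraph.checkpoint.memory import MemorySaver

    checkpointer = MemorySaver()
    graph = workflow.compile(
        checkpointer=checkpointer,
        interrupt_before=["review"],  # pause before the review node runs
    )

    # The thread ID identifies this run so it can be resumed later.
    thread_config = {"configurable": {"thread_id": "doc-review-001"}}

    # Same input as the previous example.
    initial_state = {
        "document": "Patient Zhang San (ID: 110101199001011234) has hypertension.",
        "key_terms": [],
        "compliance_issues": [],
        "summary": "",
        "human_feedback": "",
        "is_approved": False,
    }
    result = graph.invoke(initial_state, config=thread_config)

    # Inspect the saved state while the graph is paused.
    current_state = graph.get_state(thread_config)
    print("Issues found:", current_state.values["compliance_issues"])

    # Record the reviewer's decision in the saved state.
    graph.update_state(
        thread_config,
        {"human_feedback": "Approved after manual redaction", "is_approved": True},
    )

    # Passing None resumes from the checkpoint instead of starting over.
    final_result = graph.invoke(None, config=thread_config)

Persistence and production deployment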

    from langgraph.checkpoint.postgres import PostgresSaver

    with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
        checkpointer.setup()  # creates the checkpoint tables; needed on first use
        graph = workflow.compile(checkpointer=checkpointer)
        # Use the graph inside this block, while the connection is open.

    import asyncio

    async def batch_process(documents: list[str]) -> list[dict]:
        # Run one graph invocation per document, all concurrently.
        return await asyncio.gather(*[
            graph.ainvoke({
                "document": doc,
                "key_terms": [],
                "compliance_issues": [],
                "summary": "",
                "human_feedback": "",
                "is_approved": False,
            })
            for doc in documents
        ])

Common patterns and pitfalls
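One pitfall that applies to the batch helper above: asyncio.gather starts every coroutine at once, so a large batch means that many concurrent model calls. A common way to cap this is a semaphore. The sketch below is generic and does not depend on LangGraph; `gather_bounded` and its `limit` parameter are hypothetical names, and the right limit depends on your own rate limits.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = 5,
) -> list[R]:
    """Run worker(item) for every item, with at most `limit` running at once.

    Results come back in the same order as `items`.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        # Each task waits here until one of the `limit` slots is free.
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(run_one(item) for item in items))
```

In the batch case, the worker would be a small coroutine that builds the initial state for one document and awaits graph.ainvoke on it.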
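- Fan-out/fan-in: use Send to run multiple nodes in parallel and merge the results.
- Overly complex conditional routing: if a routing function has too many branches, split it into subgraphs.
- State bloat: store references (database IDs, S3 keys) rather than large payloads.
- Testing nodes in isolation: because nodes are plain functions, they can be unit tested directly.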
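A direct test of a node that calls the model still hits the live API, which makes it slow and non-deterministic. One option is to hand the node its client instead of reading a module-level one, then pass a stub in tests. The sketch below assumes that refactor: `make_extract_terms_node`, `FakeClient`, and `FakeMessages` are hypothetical names, and the stub only imitates the one attribute path the node reads (`response.content[0].text`).

```python
import json
from types import SimpleNamespace


class FakeMessages:
    """Stands in for client.messages: returns a canned reply, records calls."""

    def __init__(self, reply_text: str):
        self.reply_text = reply_text
        self.calls = []

    def create(self, **kwargs):
        self.calls.append(kwargs)
        return SimpleNamespace(content=[SimpleNamespace(text=self.reply_text)])


class FakeClient:
    def __init__(self, reply_text: str):
        self.messages = FakeMessages(reply_text)


def make_extract_terms_node(client):
    """Hypothetical factory: builds a node bound to the given client."""

    def node(state: dict) -> dict:
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"Extract key terms as a JSON array.\n\n{state['document']}",
            }],
        )
        return {"key_terms": json.loads(response.content[0].text)}

    return node


def test_extract_terms_node_with_stub():
    fake = FakeClient('["AI", "society"]')
    node = make_extract_terms_node(fake)
    result = node({"document": "AI is changing society."})
    assert result == {"key_terms": ["AI", "society"]}
    # Exactly one model call, and the document made it into the prompt.
    assert len(fake.messages.calls) == 1
    assert "AI is changing society." in fake.messages.calls[0]["messages"][0]["content"]
```

The test that follows takes the simpler route of calling the node as written, which is fine for a smoke test when API access is available.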
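
    def test_extract_terms_node():
        # Calls the node directly with a hand-built state (this hits the live API).
        state = {
            "document": "Artificial intelligence is rapidly changing every aspect of modern society.",
            "key_terms": [],
            "compliance_issues": [],
            "summary": "",
            "human_feedback": "",
            "is_approved": False,
        }
        result = extract_terms_node(state)
        assert "key_terms" in result
        assert isinstance(result["key_terms"], list)

LangGraph replaces ad hoc loops with explicit, debuggable, resumable workflows. Start with a linear graph (A → B → C → END). Add a conditional edge when you need branching. Add a human checkpoint when you need approval. Most production agent workflows need only these three patterns.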