State Machines and Agents: Building Reliable Workflows with LangGraph


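Most agent tutorials show a simple loop: ask Claude a question, parse the response, call a tool, repeat. That works for a demo, but in production you need determinism, error recovery, human approval gates, and auditability.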

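LangGraph brings state machines to agent workflows. Instead of an ad-hoc loop stitched together with if statements, you get an explicit graph with three parts:

  • named nodes, which are units of logic
  • edges, which are transitions and can be conditional
  • a shared state schema that carries data through the entire execution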

Why Use State Machines for Agents?

The Problem with Ad-Hoc Loops

A typical agent loop looks like this:

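# Placeholder helpers: get_tool_calls, execute_tool, and tool_result stand in for
# your own dispatch code; `task` and `tools` are assumed to be defined elsewhere.
messages = [{"role": "user", "content": task}]
while True:
    response = claude.messages.create(
        model=..., max_tokens=1024, tools=tools, messages=messages
    )
    # The assistant turn, including its tool_use blocks, must go back into history
    messages.append({"role": "assistant", "content": response.content})
    if response.stop_reason != "tool_use":
        break
    # All tool results for this turn go back in a single user message
    results = [tool_result(call.id, execute_tool(call)) for call in get_tool_calls(response)]
    messages.append({"role": "user", "content": results})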

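With two or three tools, this stays readable. Add five tools, conditional paths, a human approval step, and retry logic, and you end up with hundreds of lines of tangled control flow.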

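The deeper problem is implicit state. Which phase is the agent in? What data has it collected so far? All of this lives in `messages`, an untyped blob that every step reads from and appends to, with no enforced schema.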

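State Machines as the Solution

A state machine makes the implicit explicit. You define:

  • Nodes: discrete units of logic. Each node receives the current state, does one thing, and returns a state update.
  • Edges: transitions between nodes. An edge is either unconditional (A → B, always) or conditional (if issues are found, go to review; otherwise, go to summarize).
  • State: a typed dictionary that flows through the entire graph. Every node reads from and writes to the same declared schema. A TypedDict schema is a contract for type checkers, not a runtime validator; LangGraph can also use a Pydantic model as the state schema if you need runtime validation.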
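The three ingredients fit together in a few lines of plain Python. The sketch below is neither LangGraph nor a model of how LangGraph works internally. `run_state_machine`, the `nodes` and `edges` tables, and the toy "ID:" check are all hypothetical. They exist only to show three things:

  • nodes returning partial updates
  • edges (one conditional, two unconditional) choosing the next node
  • one state dictionary carried through the whole run

```python
from typing import Callable, Optional

State = dict
Node = Callable[[State], dict]           # takes the state, returns a partial update
Edge = Callable[[State], Optional[str]]  # picks the next node name, or None to stop


def run_state_machine(nodes: dict[str, Node], edges: dict[str, Edge],
                      entry: str, state: State) -> tuple[State, list[str]]:
    """Run nodes until an edge returns None; return the final state and visit order."""
    trace: list[str] = []
    current: Optional[str] = entry
    while current is not None:
        trace.append(current)
        update = nodes[current](state)   # the node does one thing
        state = {**state, **update}      # merge its partial update into the state
        current = edges[current](state)  # the edge chooses where to go next
    return state, trace


nodes: dict[str, Node] = {
    "check": lambda s: {"issues": ["PII"] if "ID:" in s["doc"] else []},
    "review": lambda s: {"approved": True},
    "summarize": lambda s: {"summary": s["doc"][:20]},
}
edges: dict[str, Edge] = {
    "check": lambda s: "review" if s["issues"] else "summarize",  # conditional edge
    "review": lambda s: "summarize",                               # unconditional edge
    "summarize": lambda s: None,                                   # end of the graph
}

final, trace = run_state_machine(nodes, edges, "check", {"doc": "Patient X (ID: 123)"})
print(trace)  # ['check', 'review', 'summarize']
```

Every transition is an entry in `edges`, so the visit order in `trace` also serves as a simple audit log.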

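When Not to Use LangGraph

For simple one-shot tasks such as text classification or field extraction, a direct API call is faster and easier to follow. Reach for LangGraph when the workflow has:

  • Multiple distinct phases that must run in order
  • Conditional branching based on intermediate results
  • Human-in-the-loop steps
  • Error recovery or retry logic
  • Auditability requirements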

LangGraph Basics

Defining State

from typing import TypedDict

class ResearchState(TypedDict):
    query: str
    research_notes: str
    draft: str
    review_feedback: str
    is_approved: bool

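Nodes return partial dictionaries containing only the keys they change. After each node runs, LangGraph merges that update into the running state. By default, a returned key overwrites the previous value unless the key declares a reducer.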

Nodes

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

def research_node(state: ResearchState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Research this topic in depth: {state['query']}"
        }]
    )
    # Return only the key this node changes
    return {"research_notes": response.content[0].text}

def draft_node(state: ResearchState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Write a draft based on these notes:\n{state['research_notes']}"
        }]
    )
    return {"draft": response.content[0].text}

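Edges and Conditional Routing

from typing import Literal

def route_after_check(state: ResearchState) -> Literal["human_review", "draft"]:
    # Send the run to a human if research left review feedback
    if state.get("review_feedback"):
        return "human_review"
    return "draft"

# `workflow` is a StateGraph(ResearchState) with "review" and "write_draft" nodes
# already registered; this replaces a plain research -> write_draft edge.
# The path map translates the router's return values into node names.
workflow.add_conditional_edges(
    "research",
    route_after_check,
    {"human_review": "review", "draft": "write_draft"}
)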

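Building and Running the Graph

from langgraph.graph import StateGraph, END

workflow = StateGraph(ResearchState)
workflow.add_node("research", research_node)
# A node may not share a name with a state key, and "draft" is already a key
workflow.add_node("write_draft", draft_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "write_draft")
workflow.add_edge("write_draft", END)
graph = workflow.compile()

# Keys not supplied in the input are simply absent from the initial state
result = graph.invoke({"query": "What is the MCP protocol?"})
print(result["draft"])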

Building a Document Review Workflow

Designing the State

class DocumentState(TypedDict):
    document: str
    key_terms: list[str]
    compliance_issues: list[str]
    summary: str
    human_feedback: str
    is_approved: bool
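Later examples pass the same six-field dictionary to `invoke` each time. A small factory keeps those initial states consistent. `make_initial_state` is a hypothetical helper, not part of LangGraph.

```python
from typing import TypedDict


class DocumentState(TypedDict):
    document: str
    key_terms: list[str]
    compliance_issues: list[str]
    summary: str
    human_feedback: str
    is_approved: bool


def make_initial_state(document: str) -> DocumentState:
    """Hypothetical helper: the document plus neutral defaults for every other field."""
    # New list objects on every call, so separate runs never share mutable state
    return DocumentState(
        document=document,
        key_terms=[],
        compliance_issues=[],
        summary="",
        human_feedback="",
        is_approved=False,
    )


state = make_initial_state("Patient Zhang San has hypertension.")
print(sorted(state))
# ['compliance_issues', 'document', 'human_feedback', 'is_approved', 'key_terms', 'summary']
```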

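Implementing the Nodes

import json

def extract_terms_node(state: DocumentState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": (
                "Extract exactly 5 key terms from this document. "
                "Return them as a JSON array of strings.\n\n"
                f"Document: {state['document']}"
            )
        }]
    )
    text = response.content[0].text
    try:
        raw = text.strip()
        # Strip a ```json ... ``` fence if the model added one
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
        terms = json.loads(raw.strip())
    except (json.JSONDecodeError, IndexError):
        # Fallback: treat each non-empty line as a term
        terms = [t.strip() for t in text.split("\n") if t.strip()][:5]
    return {"key_terms": terms}

def check_compliance_node(state: DocumentState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                "Check this document for compliance issues. "
                "Look for: PII (names, national ID numbers, emails, phone numbers), "
                "confidentiality markings, and unverified claims.\n\n"
                "Return a JSON array of issue descriptions. "
                "If there are no issues, return an empty array [].\n\n"
                f"Document: {state['document']}"
            )
        }]
    )
    try:
        raw = response.content[0].text.strip()
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
        issues = json.loads(raw.strip())
    except (json.JSONDecodeError, IndexError):
        # Fail closed: an unreadable check result must not skip human review
        issues = ["Compliance check output could not be parsed; manual review required"]
    return {"compliance_issues": issues}

def human_review_node(state: DocumentState) -> dict:
    print("\n=== Human review required ===")
    for issue in state["compliance_issues"]:
        print(f" - {issue}")
    if state.get("human_feedback"):
        # A reviewer already recorded a decision (see the interrupt section); keep it
        return {"human_feedback": state["human_feedback"], "is_approved": state["is_approved"]}
    # Placeholder: simulate an approval so the demo runs end to end
    return {
        "human_feedback": "Reviewed and approved; PII redacted",
        "is_approved": True
    }

def summarize_node(state: DocumentState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"Summarize this document in exactly two sentences:\n\n{state['document']}"
        }]
    )
    # Clean documents are approved automatically; flagged ones keep the reviewer's decision
    approved = state.get("is_approved", False) or not state.get("compliance_issues")
    return {"summary": response.content[0].text, "is_approved": approved}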
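Both nodes above repeat the same fence-stripping logic, and that logic only handles replies that begin with a fence. Below is a sketch of a shared, somewhat more tolerant parser. `parse_json_array` is a hypothetical helper. Looking for the outermost `[...]` span is a heuristic, not a guarantee.

```python
import json
import re
from typing import Optional

# Matches the body of a ``` or ```json fenced block, if the reply contains one
_FENCE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL)


def parse_json_array(text: str) -> Optional[list]:
    """Return the JSON array in a model reply, or None if none can be parsed."""
    match = _FENCE.search(text)
    candidate = match.group(1) if match else text
    # Tolerate prose around the array by taking the outermost [...] span
    start, end = candidate.find("["), candidate.rfind("]")
    if start == -1 or end < start:
        return None
    try:
        value = json.loads(candidate[start:end + 1])
    except json.JSONDecodeError:
        return None
    return value if isinstance(value, list) else None


print(parse_json_array('```json\n["PII: ID number"]\n```'))  # ['PII: ID number']
print(parse_json_array("Here you go: [] (no issues)"))        # []
print(parse_json_array("I could not find any issues."))       # None
```

Returning `None` instead of guessing lets each caller choose its own fallback:

  • `extract_terms_node` could split the reply into lines.
  • `check_compliance_node` should treat `None` as a reason to route to human review.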

Assembling the Graph

def route_after_compliance(state: DocumentState) -> Literal["human_review", "summarize"]:
    # Any flagged issue sends the document to a human before summarizing
    if state.get("compliance_issues"):
        return "human_review"
    return "summarize"

workflow = StateGraph(DocumentState)
workflow.add_node("extract", extract_terms_node)
workflow.add_node("check", check_compliance_node)
workflow.add_node("review", human_review_node)
workflow.add_node("summarize", summarize_node)
workflow.set_entry_point("extract")
workflow.add_edge("extract", "check")
workflow.add_conditional_edges(
    "check", route_after_compliance,
    {"human_review": "review", "summarize": "summarize"}
)
workflow.add_edge("review", "summarize")
workflow.add_edge("summarize", END)
graph = workflow.compile()

result = graph.invoke({
    "document": "Patient Zhang San (ID: 110101199001011234) has hypertension.",
    "key_terms": [], "compliance_issues": [],
    "summary": "", "human_feedback": "", "is_approved": False,
})
print(result["summary"])

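Human Checkpoints and Interrupts

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
# Pause every run just before the "review" node executes
graph = workflow.compile(
    checkpointer=checkpointer,
    interrupt_before=["review"]
)

# Each thread_id identifies one resumable run
thread_config = {"configurable": {"thread_id": "doc-review-001"}}
initial_state = {
    "document": "Patient Zhang San (ID: 110101199001011234) has hypertension.",
    "key_terms": [], "compliance_issues": [],
    "summary": "", "human_feedback": "", "is_approved": False,
}
result = graph.invoke(initial_state, config=thread_config)

current_state = graph.get_state(thread_config)
if current_state.next:  # ("review",) while paused; empty if the run already finished
    print("Issues found:", current_state.values["compliance_issues"])
    # Record the human decision in the checkpointed state
    graph.update_state(
        thread_config,
        {"human_feedback": "Approved after manual redaction", "is_approved": True}
    )
    # Passing None resumes from the checkpoint instead of starting a new run
    final_result = graph.invoke(None, config=thread_config)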

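Persistence and Production Deployment

# Requires the langgraph-checkpoint-postgres package
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # creates the checkpoint tables; run it on first use
    graph = workflow.compile(checkpointer=checkpointer)
    # Invoke here, with a thread_id config, while the connection is open

import asyncio

async def batch_process(documents: list[str]) -> list[dict]:
    # Compiled without a checkpointer, so no thread_id is needed per call.
    # A checkpointed graph would need a unique thread_id per document and an
    # async saver (e.g. AsyncPostgresSaver) to be used with ainvoke.
    batch_graph = workflow.compile()
    return await asyncio.gather(*[
        batch_graph.ainvoke({"document": doc, "key_terms": [], "compliance_issues": [],
                             "summary": "", "human_feedback": "", "is_approved": False})
        for doc in documents
    ])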
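`asyncio.gather` starts every document at once, which can run into API rate limits on large batches. One common remedy is a semaphore that caps the number of concurrent calls. The sketch below is generic:

  • `gather_bounded` is a hypothetical helper.
  • The limit of 3 is arbitrary.
  • `fake_worker` stands in for a call such as `graph.ainvoke(...)`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(items: list[T], worker: Callable[[T], Awaitable[R]],
                         limit: int = 5) -> list[R]:
    """Run worker(item) for every item, at most `limit` at a time, keeping input order."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with semaphore:  # waits here while `limit` calls are already in flight
            return await worker(item)

    return list(await asyncio.gather(*(run_one(item) for item in items)))


async def demo() -> tuple[list[int], int]:
    in_flight = peak = 0

    async def fake_worker(x: int) -> int:
        # Stand-in for graph.ainvoke(...); records how many calls overlap
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return x * 2

    results = await gather_bounded(list(range(10)), fake_worker, limit=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 3
```

`asyncio.gather` returns results in input order regardless of completion order, so each result still lines up with its document.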

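Common Patterns and Pitfalls

  • Fan-out/fan-in: use the Send API to run several instances of a node in parallel, then merge their outputs through a reducer on the target state key.
  • Overly complex conditional routing: if a routing function has too many branches, break the workflow into subgraphs.
  • State bloat: store references (database IDs, S3 keys) rather than large payloads.
  • Testing nodes in isolation: nodes are plain functions, so you can unit test them directly. Stub the Claude client to keep tests fast and deterministic.

from unittest.mock import MagicMock, patch

def test_extract_terms_node():
    state = {
        "document": "Artificial intelligence is rapidly changing every aspect of modern society.",
        "key_terms": [], "compliance_issues": [],
        "summary": "", "human_feedback": "", "is_approved": False,
    }
    # Stub the API response so the test needs no network access
    fake_response = MagicMock()
    fake_response.content = [MagicMock(text='["AI", "society", "change", "technology", "speed"]')]
    with patch.object(client.messages, "create", return_value=fake_response):
        result = extract_terms_node(state)
    assert "key_terms" in result
    assert isinstance(result["key_terms"], list)
    assert len(result["key_terms"]) == 5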

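LangGraph replaces ad-hoc loops with explicit workflows that you can debug and resume. Start with a linear graph (A → B → C → END). Add a conditional edge when you need branching. Add a human checkpoint when you need approval. Most production agent workflows need only these three patterns.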

