
State Machines and Agents: Building Reliable Workflows with LangGraph


Most agent tutorials show a simple loop: ask Claude, parse the response, call a tool, repeat. This works for demos. In production, however, you need determinism, error recovery, human approval gates, and auditability. When an agent silently loops on a broken tool call at 2 AM, you need to know exactly what state it was in and why — not dig through unstructured logs hoping to reconstruct the sequence.

LangGraph brings state machines to agent workflows. Instead of an ad-hoc loop held together by if statements and prayer, you get an explicit graph: named nodes (logic units), edges (transitions), and a shared state schema that flows through the entire execution. Every run can be traced and, with checkpointing, replayed from a saved state. Every decision point is visible. Human checkpoints are first-class citizens.

By the end of this article, you’ll have a multi-step document review agent that routes conditionally based on compliance findings, pauses for human approval, recovers from node failures, and logs each step for auditing.

Why State Machines for Agents?

The Ad-Hoc Loop Problem

A typical agent loop looks like this:

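messages = []
while True:
    response = claude.messages.create(model=..., messages=messages)
    # Keep the assistant turn in the history so tool results have context
    messages.append({"role": "assistant", "content": response.content})
    if response.stop_reason == "end_turn":
        break
    for tool_call in get_tool_calls(response):
        result = execute_tool(tool_call)
        messages.append(tool_result(tool_call.id, result))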

This is readable for two or three tools. Add five tools, conditional paths, a human approval step, retry logic for flaky external APIs, and audit logging — and you have hundreds of lines of tangled control flow. Testing it means running the full loop. Debugging means adding print statements and hoping you can reproduce the failure.

The deeper problem is implicit state. What stage is the agent in? What data has it gathered? What decisions has it made? All of this lives in messages — an untyped blob that every node reads and appends to, with no enforced schema.

State Machines as a Solution

A state machine makes the implicit explicit. You define:

  • Nodes: discrete logic units. Each node receives the current state, does one thing (call Claude, execute a tool, validate data), and returns state updates.
  • Edges: transitions between nodes, either unconditional (A → B always) or conditional (if issues_found: go to review, else: go to summarize).
  • State: a typed dictionary that flows through the entire graph. Every node reads from it and writes to it. With a TypedDict, the schema is checked by static type checkers rather than enforced at runtime, so treat it as a contract between nodes.

With this structure, you can test each node in isolation, inspect the state at any point, add or remove nodes without touching unrelated code, and replay any execution from a saved state snapshot.
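To make the three pieces concrete, here is a minimal plain-Python sketch of the idea, independent of LangGraph. The names `NODES`, `ROUTES`, and `run_graph` are illustrative assumptions, not LangGraph APIs, and the library's real execution model is richer than this loop.

```python
from typing import Callable, Optional, TypedDict


class ReviewState(TypedDict, total=False):
    text: str
    issues: list[str]
    summary: str


def check(state: ReviewState) -> dict:
    # A node does one thing and returns only the fields it changes.
    issues = ["contains an SSN"] if "SSN" in state["text"] else []
    return {"issues": issues}


def review(state: ReviewState) -> dict:
    return {"summary": f"Needs review: {len(state['issues'])} issue(s)"}


def summarize(state: ReviewState) -> dict:
    return {"summary": state["text"][:40]}


NODES: dict[str, Callable[[ReviewState], dict]] = {
    "check": check,
    "review": review,
    "summarize": summarize,
}

# Each route reads the state and names the next node (None means stop).
ROUTES: dict[str, Callable[[ReviewState], Optional[str]]] = {
    "check": lambda s: "review" if s["issues"] else "summarize",
    "review": lambda s: None,
    "summarize": lambda s: None,
}


def run_graph(entry: str, state: ReviewState) -> tuple[ReviewState, list[str]]:
    """Run nodes until a route returns None; also return the visited path."""
    current: Optional[str] = entry
    path: list[str] = []
    while current is not None:
        path.append(current)
        state = {**state, **NODES[current](state)}  # merge the partial update
        current = ROUTES[current](state)
    return state, path


final_state, visited = run_graph("check", {"text": "Patient SSN: 000-00-0000"})
print(visited)
print(final_state["summary"])
```

Because the path and the state are ordinary values, each node and each route can be exercised on its own with a hand-built dictionary.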

When Not to Use LangGraph

LangGraph adds overhead — graph compilation, state serialization, node registration. For simple, one-shot tasks (classify this text, extract fields from this JSON), a direct API call is faster and clearer. Use LangGraph when your workflow has:

  • Multiple distinct stages that need to run in sequence
  • Conditional branching based on intermediate results
  • Human-in-the-loop steps
  • Error recovery or retry logic
  • Auditability requirements

Real-World Use Cases

  • Document review pipelines — extract, check compliance, get human approval, publish
  • Multi-department approval workflows — route based on spend amount or risk level
  • Research-then-synthesize agents — gather sources, evaluate quality, synthesize, then review
  • Customer support escalation — classify intent, try automated resolution, escalate if confidence is low

LangGraph Fundamentals

Defining State

State is the central data structure in LangGraph — a typed dictionary that all nodes read from and write to. Define it with TypedDict:

from typing import TypedDict

class ResearchState(TypedDict):
    query: str
    research_notes: str
    draft: str
    review_feedback: str
    is_approved: bool

Nodes return partial dictionaries. LangGraph merges these updates into the running state after each node executes. If a node returns {"draft": "..."}, only draft is updated; the rest of the state is unchanged.
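The default merge can be pictured with plain dictionaries. This only illustrates the overwrite-per-key behaviour described above; `apply_update` is a hypothetical helper, not part of LangGraph.

```python
def apply_update(state: dict, update: dict) -> dict:
    """Return a new state in which keys from `update` replace the old values."""
    return {**state, **update}


state = {"query": "What is MCP?", "research_notes": "notes...", "draft": ""}
new_state = apply_update(state, {"draft": "First draft"})

print(new_state["draft"])           # the key the node returned
print(new_state["research_notes"])  # keys the node did not return carry over
```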

Nodes

A node is a plain Python function. It receives the current state and returns a dictionary of updates:

import anthropic

client = anthropic.Anthropic()

def research_node(state: ResearchState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Research this topic thoroughly: {state['query']}"
        }]
    )
    return {"research_notes": response.content[0].text}

def draft_node(state: ResearchState) -> dict:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Write a draft based on these notes:\n{state['research_notes']}"
        }]
    )
    return {"draft": response.content[0].text}

Nodes don’t know about each other. They read from state, do work, return updates. This isolation makes them independently testable.

Edges and Conditional Routing

Unconditional edges are fixed transitions: after research, always go to draft.

workflow.add_edge("research", "draft")

Conditional edges use a routing function — a plain Python function that reads state and returns the name of the next node:

from typing import Literal

def route_after_check(state: ResearchState) -> Literal["human_review", "draft"]:
    if state.get("review_feedback"):
        return "human_review"
    return "draft"

# Keys are the router's return values; values must be registered node names
# (a "review" node is assumed to have been added with add_node)
workflow.add_conditional_edges(
    "research",
    route_after_check,
    {"human_review": "review", "draft": "draft"}
)

The routing function contains your business logic. Keeping it separate from node logic keeps both readable.
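One practical benefit of the separation is that a routing function can be tested with nothing but dictionaries. The sketch below repeats the router with a trimmed-down state type so that it runs on its own; the sample values are made up for illustration.

```python
from typing import Literal, TypedDict


class ResearchState(TypedDict, total=False):
    query: str
    review_feedback: str


def route_after_check(state: ResearchState) -> Literal["human_review", "draft"]:
    # Any non-empty feedback sends the run to a human reviewer.
    if state.get("review_feedback"):
        return "human_review"
    return "draft"


# Plain dictionaries are enough to exercise both branches.
with_feedback = route_after_check({"query": "q", "review_feedback": "Cite sources"})
without_feedback = route_after_check({"query": "q"})
print(with_feedback, without_feedback)
```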

Building and Running a Graph

from langgraph.graph import StateGraph, END
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_node)
workflow.add_node("draft", draft_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "draft")
workflow.add_edge("draft", END)
graph = workflow.compile()
result = graph.invoke({"query": "What is the MCP protocol?"})
print(result["draft"])

compile() performs basic structural checks on the graph (for example, an edge that points to a node that was never registered) and returns an executable object. invoke() runs it synchronously and returns the final state.

Building a Document Review Workflow

Now let’s implement a full, production-grade example: a document review agent that extracts key terms, checks for compliance issues, routes to human review if problems are found, and generates a final summary.

Designing the State

from typing import TypedDict

class DocumentState(TypedDict):
    document: str                 # Input document text
    key_terms: list[str]          # Extracted terms
    compliance_issues: list[str]  # Found issues (empty = clean)
    summary: str                  # Final output
    human_feedback: str           # Reviewer notes (if routed to review)
    is_approved: bool             # Final approval flag

Every field has a clear owner — which node writes it and which nodes read it. This makes the data flow visible at a glance.

Implementing the Nodes

import anthropic
import json

client = anthropic.Anthropic()

def extract_terms_node(state: DocumentState) -> dict:
    """Extract key terms from the document."""
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": (
                "Extract exactly 5 key terms from this document. "
                "Return them as a JSON array of strings.\n\n"
                f"Document: {state['document']}"
            )
        }]
    )
    try:
        raw = response.content[0].text.strip()
        # Handle markdown code blocks if present
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
        terms = json.loads(raw.strip())
    except (json.JSONDecodeError, IndexError):
        # Fall back to treating each non-empty line as a term
        terms = [t.strip() for t in response.content[0].text.split("\n") if t.strip()][:5]
    return {"key_terms": terms}

def check_compliance_node(state: DocumentState) -> dict:
    """Check for PII, confidential data, and unverified claims."""
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                "Check this document for compliance issues. "
                "Look for: PII (names, SSNs, emails, phone numbers), "
                "confidential markings, unverified factual claims.\n\n"
                "Return a JSON array of issue descriptions. "
                "Return an empty array [] if no issues found.\n\n"
                f"Document: {state['document']}"
            )
        }]
    )
    try:
        raw = response.content[0].text.strip()
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
        issues = json.loads(raw.strip())
    except (json.JSONDecodeError, IndexError):
        issues = []
    return {"compliance_issues": issues}

def human_review_node(state: DocumentState) -> dict:
    """
    In production: pause execution and wait for operator input.
    Here we simulate approval after logging the issues.
    """
    print("\n=== HUMAN REVIEW REQUIRED ===")
    print("Compliance issues found:")
    for issue in state["compliance_issues"]:
        print(f" - {issue}")
    print("=============================\n")
    # In a real deployment, this would be replaced with:
    # - Sending a Slack/email notification
    # - Writing the pending state to a database
    # - Returning control to the caller with status="waiting"
    # - Resuming when the reviewer submits their decision
    # Simulated decision:
    return {
        "human_feedback": "Reviewed and approved with redaction of PII",
        "is_approved": True
    }

def summarize_node(state: DocumentState) -> dict:
    """Generate a two-sentence summary of the document."""
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"Summarize this document in exactly two sentences:\n\n{state['document']}"
        }]
    )
    return {
        "summary": response.content[0].text,
        "is_approved": True
    }
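The two parsing blocks in extract_terms_node and check_compliance_node are nearly identical, so one option is to factor them into a helper. The sketch below is one way to do that; `parse_json_array` and the `FENCE` constant are hypothetical names, and raising ValueError on bad input (instead of silently returning a default) is a design choice of this sketch, not something the nodes above do.

```python
import json

FENCE = "`" * 3  # the Markdown code-fence marker, built without typing it literally


def parse_json_array(raw: str) -> list:
    """Parse a JSON array from model output, tolerating a Markdown code fence.

    Raises ValueError if the text is not valid JSON or is not an array.
    """
    text = raw.strip()
    if text.startswith(FENCE):
        # Keep what sits between the first pair of fences.
        text = text.split(FENCE)[1]
        if text.startswith("json"):
            text = text[len("json"):]
    value = json.loads(text.strip())  # json.JSONDecodeError is a ValueError
    if not isinstance(value, list):
        raise ValueError("expected a JSON array")
    return value


fenced_reply = FENCE + 'json\n["PII: SSN present"]\n' + FENCE
plain_reply = '["term one", "term two"]'
print(parse_json_array(fenced_reply))
print(parse_json_array(plain_reply))
```

Each node can then decide for itself what a parse failure means: a fallback list of terms may be fine for extraction, while a failed compliance parse probably deserves a human look.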

Adding Conditional Routing

The routing function reads compliance_issues and returns which node to go to next:

from typing import Literal

def route_after_compliance(state: DocumentState) -> Literal["human_review", "summarize"]:
    """Route to human review if issues exist, otherwise proceed to summarize."""
    if state.get("compliance_issues"):
        return "human_review"
    return "summarize"

Assembling the Graph

from langgraph.graph import StateGraph, END

workflow = StateGraph(DocumentState)

# Register nodes
workflow.add_node("extract", extract_terms_node)
workflow.add_node("check", check_compliance_node)
workflow.add_node("review", human_review_node)
workflow.add_node("summarize", summarize_node)

# Set entry point
workflow.set_entry_point("extract")

# Fixed transitions
workflow.add_edge("extract", "check")

# Conditional transition after compliance check
workflow.add_conditional_edges(
    "check",
    route_after_compliance,
    {
        "human_review": "review",
        "summarize": "summarize"
    }
)

# After human review, always proceed to summarize
workflow.add_edge("review", "summarize")
workflow.add_edge("summarize", END)

# Compile
graph = workflow.compile()

Running It

# Document with PII: will route to human review
result = graph.invoke({
    "document": (
        "Patient Jane Smith (SSN: 987-65-4321) presented with hypertension. "
        "Dr. Adams prescribed lisinopril 10mg daily. "
        "Contact: jane.smith@email.com, (555) 867-5309."
    ),
    "key_terms": [],
    "compliance_issues": [],
    "summary": "",
    "human_feedback": "",
    "is_approved": False,
})

print(f"Key terms: {result['key_terms']}")
print(f"Issues found: {result['compliance_issues']}")
print(f"Human feedback: {result['human_feedback']}")
print(f"Summary: {result['summary']}")
print(f"Approved: {result['is_approved']}")

The graph handles the routing automatically. If the document is clean, it skips the review node entirely. If it contains PII, it routes through review before reaching summarize.

Human Checkpoints and Interrupts

The human_review_node above simulates approval. In real deployments, you want to actually pause the graph, hand off to a human, and resume when they respond.

LangGraph supports this through interrupt points — nodes that pause execution and return control to the caller with the current state.

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Use a checkpointer that persists state between interrupts
checkpointer = MemorySaver()
graph = workflow.compile(
    checkpointer=checkpointer,
    interrupt_before=["review"]  # Pause before the review node
)

# First invocation: runs until the interrupt.
# initial_state is the same input dictionary used in the previous section.
thread_config = {"configurable": {"thread_id": "doc-review-001"}}
result = graph.invoke(initial_state, config=thread_config)
# If issues were found, execution is now paused before "review".
# A clean document never reaches "review" and simply runs to completion.

# Inspect state before resuming
current_state = graph.get_state(thread_config)
print("Next node(s):", current_state.next)
print("Issues:", current_state.values["compliance_issues"])

# Human makes a decision and updates state
graph.update_state(
    thread_config,
    {"human_feedback": "Approved after manual PII redaction", "is_approved": True}
)

# Resume: passing None continues from the checkpoint, starting with "review".
# The review node itself runs at this point, so a production version should
# read the reviewer's decision from state rather than overwrite it.
final_result = graph.invoke(None, config=thread_config)

The key properties of this pattern:

  1. State is fully preserved across the interrupt. The graph knows exactly where it paused and what data it had.
  2. The reviewer sees only what they need — you extract and display the relevant state fields.
  3. Resumption is explicit — nothing continues until your code calls invoke(None, ...) again.
  4. The thread ID ties the interrupt to a specific execution, so multiple documents can be in-flight simultaneously.

For production human-in-the-loop, the typical pattern is: interrupt → serialize state to database → notify reviewer via Slack or email → reviewer submits decision via your UI → resume from saved state.
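The "serialize, notify, resume" half of that pattern lives outside the graph. As a rough sketch of the bookkeeping a review UI might need, the example below keeps pending reviews in SQLite, keyed by thread ID. The table layout and the function names (`open_review_store`, `save_pending`, `load_pending`, `record_decision`) are assumptions made for illustration. A persistent checkpointer already stores the graph state itself, so this table only serves the reviewer-facing side.

```python
import json
import sqlite3
from typing import Optional


def open_review_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pending_reviews ("
        "thread_id TEXT PRIMARY KEY, state_json TEXT NOT NULL, decision_json TEXT)"
    )
    return conn


def save_pending(conn: sqlite3.Connection, thread_id: str, state: dict) -> None:
    """Record what the reviewer needs to see for a paused run."""
    conn.execute(
        "INSERT OR REPLACE INTO pending_reviews (thread_id, state_json) VALUES (?, ?)",
        (thread_id, json.dumps(state)),
    )
    conn.commit()


def load_pending(conn: sqlite3.Connection, thread_id: str) -> Optional[dict]:
    row = conn.execute(
        "SELECT state_json FROM pending_reviews WHERE thread_id = ?", (thread_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


def record_decision(conn: sqlite3.Connection, thread_id: str, decision: dict) -> dict:
    """Store the reviewer's decision and return it as the state update to apply."""
    if load_pending(conn, thread_id) is None:
        raise KeyError(f"no pending review for thread {thread_id!r}")
    conn.execute(
        "UPDATE pending_reviews SET decision_json = ? WHERE thread_id = ?",
        (json.dumps(decision), thread_id),
    )
    conn.commit()
    return decision


conn = open_review_store()
save_pending(conn, "doc-review-001", {"compliance_issues": ["SSN present"]})
pending = load_pending(conn, "doc-review-001")
update = record_decision(
    conn,
    "doc-review-001",
    {"human_feedback": "Approved after redaction", "is_approved": True},
)
```

The returned `update` is the dictionary you would hand to graph.update_state(thread_config, update) before resuming with graph.invoke(None, config=thread_config).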

Persistence and Production Deployment

State Checkpointing

MemorySaver stores state in memory — good for development, gone when the process restarts. For production, use a persistent backend:

from langgraph.checkpoint.postgres import PostgresSaver

# PostgreSQL checkpointer (installed separately as langgraph-checkpoint-postgres)
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # Creates the checkpoint tables; needed on first use
    graph = workflow.compile(checkpointer=checkpointer)
    result = graph.invoke(initial_state, config={"configurable": {"thread_id": "doc-001"}})

With a persistent checkpointer, any execution can be resumed after a crash, server restart, or deployment. The thread ID is your handle to a specific execution.

Error Recovery in Nodes

Nodes should catch errors and update state with failure information rather than crashing the entire graph:

def check_compliance_node(state: DocumentState) -> dict:
    try:
        response = client.messages.create(...)
        issues = parse_issues(response)
        return {"compliance_issues": issues}
    except anthropic.APIError as e:
        # Log the error, return an empty issues list so the graph continues
        print(f"Compliance check failed: {e}. Proceeding without check.")
        return {"compliance_issues": [], "human_feedback": f"Auto-check failed: {e}"}
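One caveat with returning an empty issues list: the router reads an empty list as "clean", so a failed check skips human review. If that is not acceptable for your compliance rules, a fail-closed variant records the failure as an issue instead. The sketch below shows the idea with a hypothetical `run_check` callable standing in for the model call; it is an alternative design, not the behaviour of the node above.

```python
from typing import Callable


def check_compliance_fail_closed(
    state: dict, run_check: Callable[[str], list[str]]
) -> dict:
    """Run a compliance check; on failure, report the failure as an issue."""
    try:
        return {"compliance_issues": run_check(state["document"])}
    except Exception as exc:  # narrow this to your client's error types
        # A non-empty issues list makes the router choose human review.
        return {"compliance_issues": [f"Automated check failed: {exc}"]}


def broken_check(document: str) -> list[str]:
    raise TimeoutError("upstream timed out")


def clean_check(document: str) -> list[str]:
    return []


failed = check_compliance_fail_closed({"document": "some text"}, broken_check)
passed = check_compliance_fail_closed({"document": "some text"}, clean_check)
print(failed["compliance_issues"])
print(passed["compliance_issues"])
```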

For nodes that call flaky external APIs, add retry logic at the node level:

import time

def fetch_with_retry(fn, retries=3, delay=2):
    for attempt in range(retries):
        try:
            return fn()
        except Exception:
            if attempt == retries - 1:
                raise  # Out of attempts: surface the last error
            time.sleep(delay * (attempt + 1))  # Wait longer after each failure
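A common refinement is to retry only errors that are likely to be transient and to grow the wait exponentially with a little random jitter. The sketch below is one such variant; `retry_with_backoff`, its parameters, and the choice of ConnectionError and TimeoutError as retryable are assumptions for illustration, so substitute the exception types your client actually raises.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    retry_on: tuple = (ConnectionError, TimeoutError),
) -> T:
    """Call fn, retrying the listed exceptions with exponential backoff and jitter."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    for attempt in range(retries):
        try:
            return fn()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of attempts: surface the original error
            # 1x, 2x, 4x ... the base delay, plus up to 10% random jitter
            delay = base_delay * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")


calls = {"count": 0}


def flaky() -> str:
    # Fails twice, then succeeds, to imitate a briefly unavailable service.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


value = retry_with_backoff(flaky, base_delay=0.01)
print(value, calls["count"])
```

Errors outside `retry_on`, such as a ValueError from bad input, propagate immediately instead of being retried.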

Async Execution

For high-throughput workloads, run graphs asynchronously:

import asyncio

async def process_document(doc: str) -> dict:
    result = await graph.ainvoke({
        "document": doc,
        "key_terms": [],
        "compliance_issues": [],
        "summary": "",
        "human_feedback": "",
        "is_approved": False,
    })
    return result

# Process multiple documents concurrently
async def batch_process(documents: list[str]) -> list[dict]:
    tasks = [process_document(doc) for doc in documents]
    return await asyncio.gather(*tasks)

results = asyncio.run(batch_process(["doc1...", "doc2...", "doc3..."]))

Each document runs as an independent graph invocation. LangGraph handles the async execution model internally.
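With a large batch, launching every invocation at once can run into API rate limits. One way to cap the number of in-flight documents is an asyncio.Semaphore. In the sketch below, `process_document` is a stand-in that sleeps instead of calling graph.ainvoke, and `batch_process_bounded` and `max_concurrent` are hypothetical names.

```python
import asyncio


async def process_document(doc: str) -> dict:
    # Stand-in for `await graph.ainvoke(...)`; sleeps instead of calling an API.
    await asyncio.sleep(0.01)
    return {"document": doc, "summary": doc.upper()}


async def batch_process_bounded(
    documents: list[str], max_concurrent: int = 2
) -> list[dict]:
    """Process documents concurrently, never more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(doc: str) -> dict:
        async with semaphore:
            return await process_document(doc)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(d) for d in documents))


results = asyncio.run(batch_process_bounded(["a", "b", "c", "d"]))
print([r["summary"] for r in results])
```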

Common Patterns and Pitfalls

Fan-Out / Fan-In

Run multiple nodes in parallel and merge their results. LangGraph supports this with Send:

from langgraph.types import Send

def dispatch_parallel(state: DocumentState):
    # Return Send objects to run nodes in parallel
    return [
        Send("check_pii", state),
        Send("check_claims", state),
        Send("check_confidential", state),
    ]

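Results from parallel nodes are merged back into state when all complete. A key written by more than one branch needs a reducer declared on the state schema; otherwise the concurrent writes conflict.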
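As a sketch of how a merge can work when parallel branches write the same key, the example below declares the field as `Annotated[list[str], operator.add]`, which is the annotation style LangGraph uses to attach a reducer to a state field. The merge itself is imitated in plain Python: `merge_branch_updates` is a hypothetical stand-in written for illustration, not the library's implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class ParallelCheckState(TypedDict):
    document: str
    # The second Annotated argument is the reducer used to combine updates.
    compliance_issues: Annotated[list[str], operator.add]


def merge_branch_updates(state: dict, updates: list[dict]) -> dict:
    """Apply each branch's update, using a reducer where one is declared."""
    hints = get_type_hints(ParallelCheckState, include_extras=True)
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            metadata = getattr(hints[key], "__metadata__", ())
            if metadata:
                merged[key] = metadata[0](merged[key], value)  # combine
            else:
                merged[key] = value  # plain overwrite
    return merged


start = {"document": "some text", "compliance_issues": []}
merged = merge_branch_updates(
    start,
    [
        {"compliance_issues": ["PII: email address"]},
        {"compliance_issues": ["Unverified claim"]},
    ],
)
print(merged["compliance_issues"])
```

With list concatenation as the reducer, every branch's findings survive the fan-in instead of the last writer winning.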

Conditional Branching Gone Wrong

The most common mistake is putting too much logic in routing functions. When a routing function has more than a few branches, it becomes hard to test and reason about. Instead, split complex workflows into subgraphs — separate StateGraph instances compiled into nodes of the parent graph.

State Bloat

Resist the temptation to store large artifacts in state (full document text, large API responses). State is serialized at every checkpoint. Instead, store references (database IDs, S3 keys) in state and load the actual data inside each node.
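A minimal sketch of the reference pattern, assuming a hypothetical `DOCUMENT_STORE` dictionary in place of a real database or object store: state carries only a short ID, and the node loads the text when it needs it.

```python
from typing import TypedDict

# Hypothetical in-memory stand-in for a database or object store.
DOCUMENT_STORE: dict[str, str] = {}


class SlimDocumentState(TypedDict, total=False):
    document_id: str  # small reference kept in state
    summary: str


def store_document(document_id: str, text: str) -> SlimDocumentState:
    """Save the full text outside the graph and return the initial state."""
    DOCUMENT_STORE[document_id] = text
    return {"document_id": document_id}


def summarize_by_reference(state: SlimDocumentState) -> dict:
    # Load the large artifact inside the node instead of carrying it in state.
    text = DOCUMENT_STORE[state["document_id"]]
    return {"summary": text[:60]}


initial = store_document("doc-001", "A very long contract body. " * 200)
update = summarize_by_reference(initial)
print(len(str(initial)), len(update["summary"]))
```

Only the ID and the short summary would be written at each checkpoint, regardless of how large the document is.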

Testing Nodes in Isolation

Because nodes are plain functions, you can unit-test them directly without compiling a graph:

def test_extract_terms_node():
    state = {
        "document": "The quick brown fox jumps over the lazy dog.",
        "key_terms": [],
        "compliance_issues": [],
        "summary": "",
        "human_feedback": "",
        "is_approved": False,
    }
    result = extract_terms_node(state)
    assert "key_terms" in result
    assert isinstance(result["key_terms"], list)
    assert len(result["key_terms"]) > 0

For integration tests, compile the graph with MemorySaver and run it against a test document.
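Note that the unit test above calls the real API through the module-level client. To keep unit tests fast and offline, one option is to pass the client into the node and substitute a fake in tests. The sketch below assumes that refactor: `extract_terms` is a simplified, hypothetical variant of extract_terms_node that takes its client as an argument, and `make_fake_client` builds an object with only the `messages.create` method the node uses.

```python
import json
from types import SimpleNamespace


def make_fake_client(reply_text: str):
    """Build an object exposing messages.create(...) with a canned reply."""
    def create(**kwargs):
        return SimpleNamespace(content=[SimpleNamespace(text=reply_text)])
    return SimpleNamespace(messages=SimpleNamespace(create=create))


def extract_terms(state: dict, client) -> dict:
    # Simplified variant of extract_terms_node with the client injected.
    response = client.messages.create(
        model="test-model",  # placeholder; the fake ignores it
        max_tokens=512,
        messages=[{"role": "user", "content": state["document"]}],
    )
    return {"key_terms": json.loads(response.content[0].text)}


def test_extract_terms_with_fake_client():
    fake = make_fake_client('["fox", "dog"]')
    result = extract_terms({"document": "The quick brown fox..."}, fake)
    assert result == {"key_terms": ["fox", "dog"]}


test_extract_terms_with_fake_client()
```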


LangGraph replaces ad-hoc loops with explicit, debuggable, resumable workflows. The mental shift is small — nodes are just functions, edges are just routing logic — but the operational benefits are large: you can test every stage independently, inspect state at any point, add human checkpoints without rewriting the loop, and resume any execution from a saved snapshot.

Start with a linear graph (A → B → C → END). Add one conditional edge when you need to branch. Add a human checkpoint when you need approval. Many production agent workflows need little more than these three patterns.

The LangGraph documentation covers advanced features — streaming, subgraphs, multi-agent coordination — once your basic graph is working. Build the first workflow; the second is much faster.