Agent Error Recovery: 5 Patterns for Production Reliability
Your agent worked perfectly in testing. Then in production it hit a rate limit at step 3 of a multi-step workflow, threw an uncaught exception, and left your system in an undefined state. No checkpoint. No retry. No fallback. Just silence—and a broken pipeline you have to restart by hand.
Agent error recovery is the difference between a demo and a production system. This article covers five patterns used in production agentic workflows: exponential backoff, circuit breakers, checkpoint-and-resume, fallback strategies, and escalation queues. Each pattern is implemented with the Anthropic SDK and works with any orchestration framework.
Prerequisites: You should be comfortable with Python and familiar with how the Claude API works. No prior experience with fault-tolerant systems is required.
Why Agents Fail Differently Than Traditional Software
Traditional software fails at clear boundaries: a database query returns an error, an HTTP call times out, a file is not found. You handle the exception and move on.
Agents fail in more complex ways:
- Partial progress: An agent completes steps 1–4 of an 8-step workflow, then fails. Without recovery, you either lose all progress or risk repeating work.
- Ambiguous state: The agent called a tool, but the tool’s response was malformed. Did the action happen? Should you retry?
- Cascading failure: One slow API call holds up the entire reasoning loop. The agent doesn’t fail outright—it just hangs.
- Soft failure: The LLM returns a response, but it doesn’t follow the expected format. The downstream parser breaks silently.
These failure modes require patterns beyond simple try/except blocks. Let’s build them.
Pattern 1: Exponential Backoff with Jitter
The most common agent failure is a transient error: rate limits, network blips, temporary service unavailability. The fix is to retry—but naive retries make rate limiting worse by sending bursts of requests.
Exponential backoff doubles the wait time between retries. Jitter adds randomness to prevent multiple agents from retrying simultaneously (the “thundering herd” problem).
```python
import anthropic
import time
import random
from typing import Optional

client = anthropic.Anthropic()

def call_with_backoff(
    messages: list,
    model: str = "claude-sonnet-4-6",
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
    """
    Call the Claude API with exponential backoff on transient errors.
    Retries on rate limits and server errors; raises immediately on client errors.
    """
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model=model,
                max_tokens=1024,
                messages=messages,
            )
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with full jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"Rate limited. Retrying in {jitter:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(jitter)
        except anthropic.APIStatusError as e:
            if e.status_code < 500:
                raise  # 4xx errors (except 429) are not retriable
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"Server error {e.status_code}. Retrying in {jitter:.1f}s")
            time.sleep(jitter)
        except anthropic.APIConnectionError:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            print(f"Connection error. Retrying in {delay:.1f}s")
            time.sleep(delay)
    return None

# Usage
response = call_with_backoff([
    {"role": "user", "content": "Summarize the state of autonomous agent frameworks."}
])
print(response.content[0].text)
```

When to use: Any API call inside an agent loop. This should be your default wrapper for all LLM calls.
When NOT to use: Don’t retry on validation errors (400), authentication failures (401), or 404s. These will not succeed on retry—they require code fixes.
Pattern 2: Circuit Breaker
Exponential backoff handles brief transient failures. But if a downstream service is down for 20 minutes, you don’t want your agent retrying every few seconds for the entire duration. A circuit breaker tracks failure rates and temporarily stops calling a failing service entirely.
The circuit breaker has three states:
- Closed (normal): Requests pass through.
- Open (failing): Requests fail immediately without calling the service.
- Half-open (recovering): One test request is allowed; if it succeeds, the circuit closes.
```python
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any

T = TypeVar("T")

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5       # failures before opening
    recovery_timeout: float = 60.0   # seconds before trying half-open
    success_threshold: int = 2       # successes in half-open before closing

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0.0, init=False)

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.time() - self._last_failure_time > self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._success_count = 0
        return self._state

    def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        if self.state == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker is OPEN — service is unavailable")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._state = CircuitState.CLOSED
                print("Circuit breaker CLOSED — service recovered")

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            print(f"Circuit breaker OPEN — {self._failure_count} consecutive failures")
```
```python
# Integration with the agent loop
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

def make_agent_call(user_message: str) -> str:
    def _call():
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": user_message}],
        )
        return response.content[0].text

    try:
        return breaker.call(_call)
    except RuntimeError:
        # Circuit is open — return a degraded response instead of crashing
        return "[Service unavailable. Cached response or fallback logic here.]"
```

When to use: When your agent calls external tools (web APIs, databases, third-party services) that could go down for extended periods.
For a deeper look at how circuit breakers fit into agent observability, see Debugging and Observability for AI Agents.
Pattern 3: Checkpoint and Resume
Long-running agents that complete work in stages need to be able to resume from the last successful step after a failure. Without checkpointing, a failure at step 7 of 10 means re-running steps 1–6 — wasting time and money.
Checkpoint-and-resume serializes agent state to durable storage at each step. On restart, the agent loads the last checkpoint and continues.
```python
import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional

@dataclass
class AgentCheckpoint:
    task_id: str
    current_step: int
    completed_steps: list[str] = field(default_factory=list)
    results: dict = field(default_factory=dict)
    messages: list[dict] = field(default_factory=list)

    def save(self, directory: str = ".checkpoints") -> None:
        """Persist checkpoint to disk."""
        os.makedirs(directory, exist_ok=True)
        path = os.path.join(directory, f"{self.task_id}.json")
        with open(path, "w") as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
        """Load checkpoint if it exists; return None if not found."""
        path = os.path.join(directory, f"{task_id}.json")
        if not os.path.exists(path):
            return None
        with open(path) as f:
            data = json.load(f)
        return cls(**data)

    def clear(self, directory: str = ".checkpoints") -> None:
        """Delete checkpoint after successful completion."""
        path = os.path.join(directory, f"{self.task_id}.json")
        if os.path.exists(path):
            os.remove(path)
```
```python
def run_research_pipeline(task_id: str, topic: str) -> dict:
    """
    Multi-step research pipeline with checkpoint-and-resume.
    Steps: outline → section_1 → section_2 → section_3 → summary
    """
    steps = ["outline", "section_1", "section_2", "section_3", "summary"]

    # The prompt for each step
    step_prompts = {
        "outline": "Create a detailed outline for a report on this topic.",
        "section_1": "Write the introduction section based on the outline.",
        "section_2": "Write the main findings section.",
        "section_3": "Write the analysis and implications section.",
        "summary": "Write a concise executive summary of the full report.",
    }

    # Load existing checkpoint or start fresh
    checkpoint = AgentCheckpoint.load(task_id) or AgentCheckpoint(
        task_id=task_id,
        current_step=0,
        messages=[{"role": "user", "content": f"Research topic: {topic}"}],
    )

    print(f"Starting from step {checkpoint.current_step} ({steps[checkpoint.current_step]})")

    for i in range(checkpoint.current_step, len(steps)):
        step = steps[i]
        print(f"Running step: {step}")

        checkpoint.messages.append({"role": "user", "content": step_prompts[step]})

        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=checkpoint.messages,
            )
            result_text = response.content[0].text
            checkpoint.messages.append({"role": "assistant", "content": result_text})
            checkpoint.results[step] = result_text
            checkpoint.completed_steps.append(step)
            checkpoint.current_step = i + 1

            # Save after every successful step
            checkpoint.save()
            print(f"  Step '{step}' complete — checkpoint saved")

        except Exception as e:
            print(f"  Step '{step}' failed: {e}")
            checkpoint.save()  # Save progress before raising
            raise

    # All steps complete — clean up
    checkpoint.clear()
    return checkpoint.results

# Run the pipeline — if it crashes halfway, restart with the same task_id
results = run_research_pipeline("report-001", "production patterns for autonomous agents")
print("Pipeline complete:", list(results.keys()))
```

When to use: Any multi-step agentic workflow that takes longer than a few seconds, especially those that call expensive APIs or perform irreversible actions.
This pattern pairs naturally with LangGraph’s built-in MemorySaver and PostgresSaver checkpointers. See State Machines and Agents: Building Reliable Workflows with LangGraph for the framework-level approach.
Pattern 4: Fallback Strategies
Some failures are not retriable. The API is down. The tool returned an unusable result. The model refused to answer. In these cases, you need a fallback: an alternative path that keeps the agent moving forward.
Common fallback patterns:
- Model fallback: Switch to a smaller, faster model if the primary fails
- Tool fallback: Use a different data source when the primary is unavailable
- Graceful degradation: Return a partial result instead of failing completely
```python
from typing import Callable

def with_fallback(
    primary: Callable[[], str],
    fallback: Callable[[], str],
    error_types: tuple = (Exception,),
) -> str:
    """
    Try the primary function; fall back to the secondary on specified errors.
    """
    try:
        return primary()
    except error_types as e:
        print(f"Primary failed ({type(e).__name__}): {e}. Using fallback.")
        return fallback()

def answer_question(question: str) -> str:
    def primary():
        # Primary: use the most capable model
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": question}],
        )
        return response.content[0].text

    def fallback():
        # Fallback: use a lighter, faster model
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            messages=[{"role": "user", "content": question}],
        )
        return f"[Fallback response] {response.content[0].text}"

    return with_fallback(
        primary,
        fallback,
        error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
    )
```
```python
# Tool fallback example: database → cache → static response
def get_user_data(user_id: str) -> dict:
    def from_database():
        # Simulate a database call that might fail
        raise ConnectionError("Database unavailable")

    def from_cache():
        # Try Redis or an in-memory cache
        return {"user_id": user_id, "name": "Cached User", "source": "cache"}

    def static_response():
        return {"user_id": user_id, "name": "Unknown", "source": "default"}

    try:
        return from_database()
    except ConnectionError:
        try:
            return from_cache()
        except Exception:
            return static_response()
```

When to use: When you have a clear primary/secondary hierarchy and partial results are acceptable. Good for latency-sensitive pipelines where waiting for a retry is too costly.
Pattern 5: Escalation Queue
Some failures genuinely cannot be resolved automatically. The model is stuck in a loop. The task requires information only a human has. The action is irreversible and confidence is low. For these cases, you need a structured escalation path.
An escalation queue captures failed tasks with enough context for a human (or a supervisor agent) to resolve them.
```python
import json
import time
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum

class EscalationReason(Enum):
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
    AMBIGUOUS_STATE = "ambiguous_state"
    LOW_CONFIDENCE = "low_confidence"
    REQUIRES_HUMAN = "requires_human"
    IRREVERSIBLE_ACTION = "irreversible_action"

@dataclass
class EscalationRecord:
    id: str
    task_id: str
    reason: str
    error_message: str
    agent_state: dict
    context: str
    timestamp: str
    resolved: bool = False

    def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
        with open(queue_file, "a") as f:
            f.write(json.dumps(asdict(self)) + "\n")

def escalate(
    task_id: str,
    reason: EscalationReason,
    error_message: str,
    agent_state: dict,
    context: str = "",
) -> EscalationRecord:
    """
    Add a failed task to the escalation queue for human or supervisor review.
    """
    record = EscalationRecord(
        id=str(uuid.uuid4()),
        task_id=task_id,
        reason=reason.value,
        error_message=error_message,
        agent_state=agent_state,
        context=context,
        timestamp=datetime.utcnow().isoformat(),
    )
    record.save()
    print(f"[ESCALATED] Task {task_id}: {reason.value}")
    return record
```
```python
def run_agent_with_escalation(task: dict, max_retries: int = 3) -> str:
    """
    Run an agent task with automatic escalation on persistent failure.
    """
    agent_state = {"task": task, "attempts": 0, "messages": []}

    for attempt in range(max_retries):
        agent_state["attempts"] = attempt + 1
        try:
            agent_state["messages"].append({
                "role": "user",
                "content": task["prompt"],
            })
            response = call_with_backoff(agent_state["messages"])
            return response.content[0].text

        except anthropic.RateLimitError as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            escalate(
                task_id=task["id"],
                reason=EscalationReason.MAX_RETRIES_EXCEEDED,
                error_message=str(e),
                agent_state=agent_state,
                context=f"Rate limited after {max_retries} attempts",
            )
            return "[Task escalated — rate limit exceeded]"

        except Exception as e:
            escalate(
                task_id=task["id"],
                reason=EscalationReason.MAX_RETRIES_EXCEEDED,
                error_message=str(e),
                agent_state=agent_state,
                context="Unexpected error during agent execution",
            )
            return f"[Task escalated — {type(e).__name__}]"

    return "[Max retries reached without success]"

# Usage
result = run_agent_with_escalation({
    "id": "task-042",
    "prompt": "Analyze the quarterly revenue data and generate a forecast.",
})
print(result)
```

When to use: For tasks involving irreversible actions (sending emails, making payments, deleting records), low-confidence decisions, or any case where getting it wrong has real consequences. See Multi-Agent Patterns for a supervisor agent that reads and resolves escalation queues.
Real-World Use Cases
Document Processing Pipeline
An agent that extracts data from PDFs, validates it against a schema, and writes records to a database. Failure modes: OCR errors, malformed documents, database timeouts. Solution: checkpoint-and-resume for each document; escalation queue for documents that fail validation repeatedly; circuit breaker on the database connection.
Customer Support Agent
An agent that reads support tickets, categorizes them, drafts responses, and routes them. Failure modes: classification ambiguity, edge-case tickets outside training distribution. Solution: low-confidence escalation to human agents; fallback to a simpler classification model when the primary times out.
Research and Synthesis Agent
An agent that queries multiple APIs (news, academic papers, databases), synthesizes findings, and writes a report. Failure modes: some APIs are temporarily unavailable; rate limits. Solution: tool fallback between data sources; exponential backoff on rate-limited APIs; partial results with a clear “data unavailable” note rather than failing the whole report.
Common Pitfalls
Pitfall 1: Retrying Non-Retriable Errors
The problem: Your retry loop catches all exceptions, including validation errors and authentication failures. The agent retries infinitely on a bad API key.
The fix: Classify errors before retrying. Only retry transient errors (5xx, 429, connection timeouts). Raise immediately on 4xx client errors (except 429).
```python
# Wrong: catches everything
except Exception:
    retry()

# Right: only retry retriable errors
except (anthropic.RateLimitError, anthropic.APIConnectionError):
    retry()
except anthropic.APIStatusError as e:
    if e.status_code >= 500:
        retry()
    else:
        raise  # Don't retry 4xx
```

Pitfall 2: Losing State on Retry
The problem: You retry the agent loop but reset messages each time. The agent loses context and repeats work already done.
The fix: Preserve message history across retries. Only retry the specific failed step, not the entire workflow.
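The fix above can be sketched as a small retry helper that appends the step's prompt once, keeps the accumulated history, and re-attempts only the failed call. This is an illustrative sketch: `run_step_with_history` and `flaky_llm_call` are hypothetical names, not part of the patterns above.

```python
# Hypothetical sketch: retry one step while preserving conversation history.
# `flaky_llm_call` stands in for any LLM call that may fail transiently.

def run_step_with_history(messages: list, prompt: str, flaky_llm_call, max_retries: int = 3) -> str:
    # Append the step's prompt ONCE, outside the retry loop,
    # so retries don't duplicate it in the history.
    messages.append({"role": "user", "content": prompt})
    last_error = None
    for attempt in range(max_retries):
        try:
            # Retry only this call; earlier turns in `messages` stay untouched.
            reply = flaky_llm_call(messages)
            messages.append({"role": "assistant", "content": reply})
            return reply
        except ConnectionError as e:
            last_error = e
    raise RuntimeError(f"Step failed after {max_retries} attempts") from last_error
```

Note that Pattern 5's loop appends the prompt inside the retry loop; for multi-attempt retries you generally want the append outside, as shown here, so the history holds one copy of each turn.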
Pitfall 3: Unbounded Retry Loops
The problem: No max_retries limit. An agent gets stuck retrying indefinitely, consuming tokens and blocking other tasks.
The fix: Always set a maximum retry count. After exhausting retries, escalate or fail fast.
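As a minimal sketch of that fix, the loop below spends a fixed retry budget and then fails fast; `run_with_budget` and `RetryBudgetExceeded` are illustrative names, and `TimeoutError` stands in for whatever transient error your task raises.

```python
# Hypothetical sketch: a hard cap on retries, then fail fast.

class RetryBudgetExceeded(Exception):
    pass

def run_with_budget(attempt_task, max_retries: int = 3):
    """Attempt a task at most max_retries times, then fail fast."""
    for attempt in range(max_retries):
        try:
            return attempt_task()
        except TimeoutError:
            continue  # transient failure: spend one unit of the retry budget
    # Budget exhausted: raise (or hand off to an escalation queue)
    raise RetryBudgetExceeded(f"Gave up after {max_retries} attempts")
```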
Pitfall 4: Silent Fallback
The problem: Your fallback silently returns degraded data. Downstream code assumes full data and breaks in unpredictable ways.
The fix: Mark fallback responses explicitly. Return a result object that includes a source field indicating whether the data came from the primary or fallback path.
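One way to implement that fix is a small result wrapper that carries a source field, so downstream code can check provenance before trusting the data. This is a sketch; `SourcedResult` and `fetch_profile` are hypothetical names, and the primary failure is simulated.

```python
from dataclasses import dataclass

@dataclass
class SourcedResult:
    """A result that carries its provenance so fallbacks are never silent."""
    data: dict
    source: str      # "primary" or "fallback"
    degraded: bool   # True when downstream code must not assume full data

def fetch_profile(user_id: str) -> SourcedResult:
    try:
        # Simulate a primary store outage
        raise ConnectionError("primary store down")
    except ConnectionError:
        # Degraded path: return a stub record, clearly labeled
        return SourcedResult(
            data={"user_id": user_id, "name": "Unknown"},
            source="fallback",
            degraded=True,
        )
```

Downstream code can then branch on `result.degraded` instead of discovering missing fields the hard way.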
Testing Your Error Recovery
```python
import pytest
from unittest.mock import patch, MagicMock

def test_backoff_retries_on_rate_limit():
    """Verify the agent retries up to max_retries on rate limit errors."""
    call_count = 0

    def mock_create(**kwargs):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise anthropic.RateLimitError("Rate limited", response=MagicMock(), body={})
        return MagicMock(content=[MagicMock(text="Success")])

    with patch.object(client.messages, "create", side_effect=mock_create):
        with patch("time.sleep"):  # Don't actually sleep in tests
            result = call_with_backoff(
                [{"role": "user", "content": "test"}],
                max_retries=5,
            )
    assert result.content[0].text == "Success"
    assert call_count == 3

def test_circuit_breaker_opens_after_threshold():
    """Verify the circuit opens after consecutive failures."""
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0)

    for _ in range(3):
        try:
            breaker.call(lambda: (_ for _ in ()).throw(ConnectionError("down")))
        except ConnectionError:
            pass

    assert breaker.state == CircuitState.OPEN

    with pytest.raises(RuntimeError, match="OPEN"):
        breaker.call(lambda: "should not reach here")

def test_checkpoint_resume_skips_completed_steps(tmp_path):
    """Verify a restarted pipeline skips already-completed steps."""
    checkpoint = AgentCheckpoint(
        task_id="test-001",
        current_step=2,  # Simulate a crash after step 2
        completed_steps=["outline", "section_1"],
        results={"outline": "...", "section_1": "..."},
        messages=[],
    )
    checkpoint.save(directory=str(tmp_path))

    loaded = AgentCheckpoint.load("test-001", directory=str(tmp_path))
    assert loaded.current_step == 2
    assert "outline" in loaded.completed_steps
    assert "section_1" in loaded.completed_steps
```

For observability in production — how to track retry rates, circuit breaker state, and escalation queue depth — see Debugging and Observability for AI Agents.
Production Deployment Checklist
Before shipping an agent to production, verify:
- All API calls wrapped with exponential backoff (base 1s, max 60s, jitter enabled)
- Circuit breakers on every external dependency (database, third-party APIs)
- Checkpoints saved to durable storage (not in-memory) after each significant step
- Fallback paths tested and marked in response metadata
- Escalation queue monitored and reviewed at least daily
- Retry counts bounded (`max_retries` ≤ 5 for most cases)
- Error classification logic reviewed: 4xx ≠ 5xx
- Tests for each recovery path with injected failures
- Alerts on escalation queue depth and circuit breaker state
Next Steps
You now have five concrete patterns to make your agents production-ready. Here’s what to tackle next:
- Start with backoff — Wrap every API call in `call_with_backoff` as a baseline.
- Add checkpointing to any workflow longer than 2–3 steps.
- Build an escalation queue for tasks involving irreversible actions.
- Write failure injection tests — Test your recovery paths by deliberately triggering each error type.
Ready to go deeper? These guides cover related ground:
- State Machines and Agents: Building Reliable Workflows with LangGraph — LangGraph’s built-in checkpointing and interrupt-before patterns
- Multi-Agent Patterns — Supervisor agents that handle escalation queues
- Debugging and Observability for AI Agents — Instrumentation and tracing for production agents
- Tool Use Patterns: Building Reliable Agent-Tool Interfaces
- Agent Memory Systems: Giving Your AI Persistent Context