Agent Error Recovery: 5 Patterns for Production Reliability


Your agent worked perfectly in testing. Then in production it hit a rate limit at step 3 of a multi-step workflow, threw an uncaught exception, and left your system in an undefined state. No checkpoint. No retry. No fallback. Just silence—and a broken pipeline you have to restart by hand.

Agent error recovery is the difference between a demo and a production system. This article covers five patterns used in production agentic workflows: exponential backoff, circuit breakers, checkpoint-and-resume, fallback strategies, and escalation queues. Each pattern is implemented with the Anthropic SDK and works with any orchestration framework.

Prerequisites: You should be comfortable with Python and familiar with how the Claude API works. No prior experience with fault-tolerant systems is required.


Why Agents Fail Differently Than Traditional Software

Traditional software fails at clear boundaries: a database query returns an error, an HTTP call times out, a file is not found. You handle the exception and move on.

Agents fail in more complex ways:

  • Partial progress: An agent completes steps 1–4 of an 8-step workflow, then fails. Without recovery, you either lose all progress or risk repeating work.
  • Ambiguous state: The agent called a tool, but the tool’s response was malformed. Did the action happen? Should you retry?
  • Cascading failure: One slow API call holds up the entire reasoning loop. The agent doesn’t fail outright—it just hangs.
  • Soft failure: The LLM returns a response, but it doesn’t follow the expected format. The downstream parser breaks silently.

These failure modes require patterns beyond simple try/except blocks. Let’s build them.


Pattern 1: Exponential Backoff with Jitter

The most common agent failure is a transient error: rate limits, network blips, temporary service unavailability. The fix is to retry—but naive retries make rate limiting worse by sending bursts of requests.

Exponential backoff doubles the wait time between retries. Jitter adds randomness to prevent multiple agents from retrying simultaneously (the “thundering herd” problem).

import anthropic
import time
import random
from typing import Optional

client = anthropic.Anthropic()

def call_with_backoff(
    messages: list,
    model: str = "claude-sonnet-4-6",
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
    """
    Call the Claude API with exponential backoff on transient errors.
    Retries on rate limits and server errors; raises immediately on client errors.
    """
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model=model,
                max_tokens=1024,
                messages=messages,
            )
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with full jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"Rate limited. Retrying in {jitter:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(jitter)
        except anthropic.APIStatusError as e:
            if e.status_code < 500:
                raise  # 4xx errors (except 429) are not retriable
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"Server error {e.status_code}. Retrying in {jitter:.1f}s")
            time.sleep(jitter)
        except anthropic.APIConnectionError:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            print(f"Connection error. Retrying in {delay:.1f}s")
            time.sleep(delay)
    return None

# Usage
response = call_with_backoff([
    {"role": "user", "content": "Summarize the state of autonomous agent frameworks."}
])
print(response.content[0].text)

When to use: Any API call inside an agent loop. This should be your default wrapper for all LLM calls.

When NOT to use: Don’t retry on validation errors (400), authentication failures (401), or 404s. These will not succeed on retry—they require code fixes.
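If you find yourself wrapping many different functions this way, the same full-jitter logic can be packaged as a decorator. The sketch below is a framework-free illustration of the technique; `retriable` and its `retry_on` parameter are names invented here, not part of any SDK.

```python
import random
import time
from functools import wraps

def retriable(max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,
              retry_on: tuple = (ConnectionError, TimeoutError)):
    """Decorator form of exponential backoff with full jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries - 1:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    time.sleep(random.uniform(0, delay))  # full jitter
        return wrapper
    return decorator

# Demo: a flaky function that succeeds on the third call
attempts = 0

@retriable(max_retries=5, base_delay=0.01)
def flaky():
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient blip")
    return "ok"

print(flaky(), "after", attempts, "attempts")
```

In a real agent you would pass the Anthropic exception classes in `retry_on` and decorate the function that wraps your API call.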


Pattern 2: Circuit Breaker

Exponential backoff handles brief transient failures. But if a downstream service is down for 20 minutes, you don’t want your agent retrying every few seconds for the entire duration. A circuit breaker tracks failure rates and temporarily stops calling a failing service entirely.

The circuit breaker has three states:

  • Closed (normal): Requests pass through.
  • Open (failing): Requests fail immediately without calling the service.
  • Half-open (recovering): One test request is allowed; if it succeeds, the circuit closes.
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any

T = TypeVar("T")

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5       # failures before opening
    recovery_timeout: float = 60.0   # seconds before trying half-open
    success_threshold: int = 2       # successes in half-open before closing

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0.0, init=False)

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.time() - self._last_failure_time > self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._success_count = 0
        return self._state

    def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        if self.state == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker is OPEN — service is unavailable")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._state = CircuitState.CLOSED
                print("Circuit breaker CLOSED — service recovered")

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            print(f"Circuit breaker OPEN — {self._failure_count} consecutive failures")

# Integration with the agent loop
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

def make_agent_call(user_message: str) -> str:
    def _call():
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": user_message}],
        )
        return response.content[0].text
    try:
        return breaker.call(_call)
    except RuntimeError:
        # Circuit is open — return a degraded response instead of crashing
        return "[Service unavailable. Cached response or fallback logic here.]"

When to use: When your agent calls external tools (web APIs, databases, third-party services) that could go down for extended periods.

For a deeper look at how circuit breakers fit into agent observability, see Debugging and Observability for AI Agents.


Pattern 3: Checkpoint and Resume

Long-running agents that complete work in stages need to be able to resume from the last successful step after a failure. Without checkpointing, a failure at step 7 of 10 means re-running steps 1–6 — wasting time and money.

Checkpoint-and-resume serializes agent state to durable storage at each step. On restart, the agent loads the last checkpoint and continues.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional

@dataclass
class AgentCheckpoint:
    task_id: str
    current_step: int
    completed_steps: list[str] = field(default_factory=list)
    results: dict = field(default_factory=dict)
    messages: list[dict] = field(default_factory=list)

    def save(self, directory: str = ".checkpoints") -> None:
        """Persist checkpoint to disk."""
        os.makedirs(directory, exist_ok=True)
        path = os.path.join(directory, f"{self.task_id}.json")
        with open(path, "w") as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
        """Load checkpoint if it exists; return None if not found."""
        path = os.path.join(directory, f"{task_id}.json")
        if not os.path.exists(path):
            return None
        with open(path) as f:
            data = json.load(f)
        return cls(**data)

    def clear(self, directory: str = ".checkpoints") -> None:
        """Delete checkpoint after successful completion."""
        path = os.path.join(directory, f"{self.task_id}.json")
        if os.path.exists(path):
            os.remove(path)

def run_research_pipeline(task_id: str, topic: str) -> dict:
    """
    Multi-step research pipeline with checkpoint-and-resume.
    Steps: outline → section_1 → section_2 → section_3 → summary
    """
    steps = ["outline", "section_1", "section_2", "section_3", "summary"]
    step_prompts = {
        "outline": "Create a detailed outline for a report on this topic.",
        "section_1": "Write the introduction section based on the outline.",
        "section_2": "Write the main findings section.",
        "section_3": "Write the analysis and implications section.",
        "summary": "Write a concise executive summary of the full report.",
    }

    # Load existing checkpoint or start fresh
    checkpoint = AgentCheckpoint.load(task_id) or AgentCheckpoint(
        task_id=task_id,
        current_step=0,
        messages=[{"role": "user", "content": f"Research topic: {topic}"}],
    )
    print(f"Starting from step {checkpoint.current_step} ({steps[checkpoint.current_step]})")

    for i in range(checkpoint.current_step, len(steps)):
        step = steps[i]
        print(f"Running step: {step}")
        # Build the prompt for this step
        checkpoint.messages.append({"role": "user", "content": step_prompts[step]})
        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=checkpoint.messages,
            )
            result_text = response.content[0].text
            checkpoint.messages.append({"role": "assistant", "content": result_text})
            checkpoint.results[step] = result_text
            checkpoint.completed_steps.append(step)
            checkpoint.current_step = i + 1
            # Save after every successful step
            checkpoint.save()
            print(f"  Step '{step}' complete — checkpoint saved")
        except Exception as e:
            print(f"  Step '{step}' failed: {e}")
            checkpoint.save()  # Save progress before raising
            raise

    # All steps complete — clean up
    checkpoint.clear()
    return checkpoint.results

# Run the pipeline — if it crashes halfway, restart with the same task_id
results = run_research_pipeline("report-001", "production patterns for autonomous agents")
print("Pipeline complete:", list(results.keys()))

When to use: Any multi-step agentic workflow that takes longer than a few seconds, especially those that call expensive APIs or perform irreversible actions.

This pattern pairs naturally with LangGraph’s built-in MemorySaver and PostgresSaver checkpointers. See State Machines and Agents: Building Reliable Workflows with LangGraph for the framework-level approach.


Pattern 4: Fallback Strategies

Some failures are not retriable. The API is down. The tool returned an unusable result. The model refused to answer. In these cases, you need a fallback: an alternative path that keeps the agent moving forward.

Common fallback patterns:

  • Model fallback: Switch to a smaller, faster model if the primary fails
  • Tool fallback: Use a different data source when the primary is unavailable
  • Graceful degradation: Return a partial result instead of failing completely
from typing import Callable

def with_fallback(
    primary: Callable[[], str],
    fallback: Callable[[], str],
    error_types: tuple = (Exception,),
) -> str:
    """
    Try primary function; fall back to secondary on specified errors.
    """
    try:
        return primary()
    except error_types as e:
        print(f"Primary failed ({type(e).__name__}): {e}. Using fallback.")
        return fallback()

def answer_question(question: str) -> str:
    def primary():
        # Primary: use the most capable model
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": question}],
        )
        return response.content[0].text

    def fallback():
        # Fallback: use a lighter, faster model
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            messages=[{"role": "user", "content": question}],
        )
        return f"[Fallback response] {response.content[0].text}"

    return with_fallback(
        primary,
        fallback,
        error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
    )

# Tool fallback example: database → cache → static response
def get_user_data(user_id: str) -> dict:
    def from_database():
        # Simulate database call that might fail
        raise ConnectionError("Database unavailable")

    def from_cache():
        # Try Redis or in-memory cache
        return {"user_id": user_id, "name": "Cached User", "source": "cache"}

    def static_response():
        return {"user_id": user_id, "name": "Unknown", "source": "default"}

    try:
        return from_database()
    except ConnectionError:
        try:
            return from_cache()
        except Exception:
            return static_response()

When to use: When you have a clear primary/secondary hierarchy and partial results are acceptable. Good for latency-sensitive pipelines where waiting for a retry is too costly.
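The nested try/except in the tool fallback example gets unwieldy beyond two levels. One way to flatten it is a generic chain helper that tries candidates in priority order; `first_success` below is an illustrative name, not a library function.

```python
from typing import Callable, Sequence

def first_success(candidates: Sequence[Callable[[], dict]]) -> dict:
    """Return the result of the first candidate that doesn't raise."""
    errors = []
    for candidate in candidates:
        try:
            return candidate()
        except Exception as e:
            errors.append(f"{candidate.__name__}: {e}")
    raise RuntimeError("All fallbacks exhausted: " + "; ".join(errors))

# Same hierarchy as above: database → cache → static default
def from_database() -> dict:
    raise ConnectionError("Database unavailable")

def from_cache() -> dict:
    return {"name": "Cached User", "source": "cache"}

def static_default() -> dict:
    return {"name": "Unknown", "source": "default"}

data = first_success([from_database, from_cache, static_default])
print(data)  # the cache result, since the database raises
```

Because the chain is just a list, adding or reordering fallbacks is a one-line change, and the aggregated error message preserves why each earlier candidate failed.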


Pattern 5: Escalation Queue

Some failures genuinely cannot be resolved automatically. The model is stuck in a loop. The task requires information only a human has. The action is irreversible and confidence is low. For these cases, you need a structured escalation path.

An escalation queue captures failed tasks with enough context for a human (or a supervisor agent) to resolve them.

import json
import time
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from enum import Enum

class EscalationReason(Enum):
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
    AMBIGUOUS_STATE = "ambiguous_state"
    LOW_CONFIDENCE = "low_confidence"
    REQUIRES_HUMAN = "requires_human"
    IRREVERSIBLE_ACTION = "irreversible_action"

@dataclass
class EscalationRecord:
    id: str
    task_id: str
    reason: str
    error_message: str
    agent_state: dict
    context: str
    timestamp: str
    resolved: bool = False

    def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
        with open(queue_file, "a") as f:
            f.write(json.dumps(asdict(self)) + "\n")

def escalate(
    task_id: str,
    reason: EscalationReason,
    error_message: str,
    agent_state: dict,
    context: str = "",
) -> EscalationRecord:
    """
    Add a failed task to the escalation queue for human or supervisor review.
    """
    record = EscalationRecord(
        id=str(uuid.uuid4()),
        task_id=task_id,
        reason=reason.value,
        error_message=error_message,
        agent_state=agent_state,
        context=context,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    record.save()
    print(f"[ESCALATED] Task {task_id}: {reason.value}")
    return record

def run_agent_with_escalation(task: dict, max_retries: int = 3) -> str:
    """
    Run an agent task with automatic escalation on persistent failure.
    Relies on call_with_backoff from Pattern 1.
    """
    agent_state = {"task": task, "attempts": 0, "messages": []}
    for attempt in range(max_retries):
        agent_state["attempts"] = attempt + 1
        try:
            agent_state["messages"].append({
                "role": "user",
                "content": task["prompt"],
            })
            response = call_with_backoff(agent_state["messages"])
            return response.content[0].text
        except anthropic.RateLimitError as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            escalate(
                task_id=task["id"],
                reason=EscalationReason.MAX_RETRIES_EXCEEDED,
                error_message=str(e),
                agent_state=agent_state,
                context=f"Rate limited after {max_retries} attempts",
            )
            return "[Task escalated — rate limit exceeded]"
        except Exception as e:
            # Unexpected errors are escalated as ambiguous, not as retry exhaustion
            escalate(
                task_id=task["id"],
                reason=EscalationReason.AMBIGUOUS_STATE,
                error_message=str(e),
                agent_state=agent_state,
                context="Unexpected error during agent execution",
            )
            return f"[Task escalated — {type(e).__name__}]"
    return "[Max retries reached without success]"

# Usage
result = run_agent_with_escalation({
    "id": "task-042",
    "prompt": "Analyze the quarterly revenue data and generate a forecast.",
})
print(result)

When to use: For tasks involving irreversible actions (sending emails, making payments, deleting records), low-confidence decisions, or any case where getting it wrong has real consequences. See Multi-Agent Patterns for a supervisor agent that reads and resolves escalation queues.
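The queue is only useful if someone drains it. A minimal triage sketch, assuming the JSONL record shape written by EscalationRecord above; the file path and seed records here are illustrative.

```python
import json
import os
import tempfile

QUEUE_FILE = os.path.join(tempfile.gettempdir(), "escalation_queue_demo.jsonl")

# Seed the queue with two records in the same shape EscalationRecord writes
records = [
    {"id": "1", "task_id": "task-042", "reason": "max_retries_exceeded",
     "error_message": "rate limited", "agent_state": {}, "context": "",
     "timestamp": "2025-01-01T00:00:00", "resolved": False},
    {"id": "2", "task_id": "task-043", "reason": "low_confidence",
     "error_message": "", "agent_state": {}, "context": "",
     "timestamp": "2025-01-01T00:05:00", "resolved": True},
]
with open(QUEUE_FILE, "w") as f:
    for r in records:
        f.write(json.dumps(r) + "\n")

def pending_escalations(queue_file: str) -> list[dict]:
    """Return unresolved records, oldest first, for a human or supervisor to triage."""
    with open(queue_file) as f:
        rows = [json.loads(line) for line in f if line.strip()]
    return sorted((r for r in rows if not r["resolved"]), key=lambda r: r["timestamp"])

for rec in pending_escalations(QUEUE_FILE):
    print(f"{rec['task_id']}: {rec['reason']}")
```

The same function works as the intake step for a supervisor agent: feed each pending record's agent_state and context back into a resolution prompt, then mark the record resolved.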


Real-World Use Cases

Document Processing Pipeline

An agent that extracts data from PDFs, validates it against a schema, and writes records to a database. Failure modes: OCR errors, malformed documents, database timeouts. Solution: checkpoint-and-resume for each document; escalation queue for documents that fail validation repeatedly; circuit breaker on the database connection.

Customer Support Agent

An agent that reads support tickets, categorizes them, drafts responses, and routes them. Failure modes: classification ambiguity, edge-case tickets outside training distribution. Solution: low-confidence escalation to human agents; fallback to a simpler classification model when the primary times out.

Research and Synthesis Agent

An agent that queries multiple APIs (news, academic papers, databases), synthesizes findings, and writes a report. Failure modes: some APIs are temporarily unavailable; rate limits. Solution: tool fallback between data sources; exponential backoff on rate-limited APIs; partial results with a clear “data unavailable” note rather than failing the whole report.


Common Pitfalls

Pitfall 1: Retrying Non-Retriable Errors

The problem: Your retry loop catches all exceptions, including validation errors and authentication failures. The agent retries infinitely on a bad API key.

The fix: Classify errors before retrying. Only retry transient errors (5xx, 429, connection timeouts). Raise immediately on 4xx client errors (except 429).

# Wrong: catches everything
except Exception:
    retry()

# Right: only retry retriable errors
except (anthropic.RateLimitError, anthropic.APIConnectionError):
    retry()
except anthropic.APIStatusError as e:
    if e.status_code >= 500:
        retry()
    else:
        raise  # Don't retry 4xx

Pitfall 2: Losing State on Retry

The problem: You retry the agent loop but reset messages each time. The agent loses context and repeats work already done.

The fix: Preserve message history across retries. Only retry the specific failed step, not the entire workflow.
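A minimal sketch of step-level retry with preserved history; `run_step` is a stand-in for a real LLM call and is rigged to fail once so the retry path runs.

```python
def run_step(step: str, messages: list[dict], attempts: dict) -> str:
    """Stand-in for an LLM call; fails the first time 'analyze' runs."""
    attempts[step] = attempts.get(step, 0) + 1
    if step == "analyze" and attempts[step] == 1:
        raise TimeoutError("transient failure")
    return f"result of {step}"

def run_workflow(steps: list[str], max_step_retries: int = 3) -> list[dict]:
    messages: list[dict] = []  # preserved across retries, never reset
    attempts: dict = {}
    for step in steps:
        messages.append({"role": "user", "content": step})
        for attempt in range(max_step_retries):
            try:
                result = run_step(step, messages, attempts)
                messages.append({"role": "assistant", "content": result})
                break  # step succeeded; move on with history intact
            except TimeoutError:
                if attempt == max_step_retries - 1:
                    raise
                # Retry ONLY this step; earlier messages stay in place

    return messages

history = run_workflow(["gather", "analyze", "report"])
print(len(history))  # one user + one assistant message per step
```

The key detail is where `messages` lives: outside the retry loop. Resetting it inside the loop is the bug this pitfall describes.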

Pitfall 3: Unbounded Retry Loops

The problem: No max_retries limit. An agent gets stuck retrying indefinitely, consuming tokens and blocking other tasks.

The fix: Always set a maximum retry count. After exhausting retries, escalate or fail fast.
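Beyond a retry count, a wall-clock budget guards against slow failures where each attempt takes minutes. A sketch combining both bounds; `with_deadline` and its parameters are names invented for this example.

```python
import time

def with_deadline(func, max_retries: int = 5, budget_s: float = 30.0):
    """Retry with both an attempt cap and a total wall-clock budget."""
    start = time.monotonic()
    for attempt in range(max_retries):
        try:
            return func()
        except ConnectionError:
            if attempt == max_retries - 1 or time.monotonic() - start > budget_s:
                raise  # fail fast so the caller can escalate
            time.sleep(min(0.01 * (2 ** attempt), 1.0))

# Demo: fails once, then succeeds
calls = 0

def flaky():
    global calls
    calls += 1
    if calls < 2:
        raise ConnectionError("blip")
    return "done"

print(with_deadline(flaky))
```

Whichever bound trips first wins, so a service that fails slowly cannot consume the whole budget one attempt at a time.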

Pitfall 4: Silent Fallback

The problem: Your fallback silently returns degraded data. Downstream code assumes full data and breaks in unpredictable ways.

The fix: Mark fallback responses explicitly. Return a result object that includes a source field indicating whether the data came from the primary or fallback path.
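A minimal sketch of such a result object; `AgentResult` and the `primary_up` flag are illustrative, and in practice the flag would be replaced by the actual primary call and its exception handling.

```python
from dataclasses import dataclass

@dataclass
class AgentResult:
    """Wrap every response so downstream code can see which path produced it."""
    text: str
    source: str     # "primary", "fallback", or "cache"
    degraded: bool  # True whenever the data is not full-fidelity

def answer(question: str, primary_up: bool) -> AgentResult:
    if primary_up:
        return AgentResult(text=f"Full answer to: {question}", source="primary", degraded=False)
    # Fallback path is explicitly marked, never silent
    return AgentResult(text=f"Partial answer to: {question}", source="fallback", degraded=True)

result = answer("What changed this quarter?", primary_up=False)
if result.degraded:
    print(f"[{result.source}] {result.text}")  # downstream can warn, log, or skip
```

Downstream consumers now branch on `result.degraded` instead of guessing from the payload, which is exactly what the silent-fallback pitfall breaks.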


Testing Your Error Recovery

import pytest
from unittest.mock import patch, MagicMock

def test_backoff_retries_on_rate_limit():
    """Verify the agent retries up to max_retries on rate limit errors."""
    call_count = 0

    def mock_create(**kwargs):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise anthropic.RateLimitError("Rate limited", response=MagicMock(), body={})
        return MagicMock(content=[MagicMock(text="Success")])

    with patch.object(client.messages, "create", side_effect=mock_create):
        with patch("time.sleep"):  # Don't actually sleep in tests
            result = call_with_backoff(
                [{"role": "user", "content": "test"}],
                max_retries=5,
            )
    assert result.content[0].text == "Success"
    assert call_count == 3

def test_circuit_breaker_opens_after_threshold():
    """Verify the circuit opens after consecutive failures."""
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0)
    for _ in range(3):
        try:
            breaker.call(lambda: (_ for _ in ()).throw(ConnectionError("down")))
        except ConnectionError:
            pass
    assert breaker.state == CircuitState.OPEN
    with pytest.raises(RuntimeError, match="OPEN"):
        breaker.call(lambda: "should not reach here")

def test_checkpoint_resume_skips_completed_steps(tmp_path):
    """Verify a restarted pipeline skips already-completed steps."""
    checkpoint = AgentCheckpoint(
        task_id="test-001",
        current_step=2,  # Simulate crash after step 2
        completed_steps=["outline", "section_1"],
        results={"outline": "...", "section_1": "..."},
        messages=[],
    )
    checkpoint.save(directory=str(tmp_path))

    loaded = AgentCheckpoint.load("test-001", directory=str(tmp_path))
    assert loaded.current_step == 2
    assert "outline" in loaded.completed_steps
    assert "section_1" in loaded.completed_steps

For observability in production — how to track retry rates, circuit breaker state, and escalation queue depth — see Debugging and Observability for AI Agents.
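As a starting point before full observability tooling, the signals worth tracking fit in a tiny in-process recorder. The sketch below is illustrative; in production these counters would feed a metrics backend such as Prometheus or StatsD rather than a dict.

```python
from collections import Counter

class RecoveryMetrics:
    """Minimal in-process recorder for the recovery signals worth alerting on."""
    def __init__(self):
        self.counters = Counter()
        self.breaker_state = "closed"
        self.escalation_depth = 0

    def record_retry(self, reason: str) -> None:
        self.counters[f"retry.{reason}"] += 1

    def record_breaker(self, state: str) -> None:
        self.breaker_state = state
        self.counters[f"breaker.{state}"] += 1

    def record_escalation(self) -> None:
        self.escalation_depth += 1
        self.counters["escalations"] += 1

metrics = RecoveryMetrics()
metrics.record_retry("rate_limit")
metrics.record_retry("rate_limit")
metrics.record_breaker("open")
metrics.record_escalation()
print(dict(metrics.counters))
```

Hook `record_retry` into the backoff loop, `record_breaker` into the state transitions, and `record_escalation` into `escalate`, then alert on rising retry rates, any breaker open event, and escalation queue depth.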


Production Deployment Checklist

Before shipping an agent to production, verify:

  • All API calls wrapped with exponential backoff (base 1s, max 60s, jitter enabled)
  • Circuit breakers on every external dependency (database, third-party APIs)
  • Checkpoints saved to durable storage (not in-memory) after each significant step
  • Fallback paths tested and marked in response metadata
  • Escalation queue monitored and reviewed at least daily
  • Retry counts bounded (max_retries ≤ 5 for most cases)
  • Error classification logic reviewed (retry 5xx, 429, and connection errors; fail fast on other 4xx)
  • Tests for each recovery path with injected failures
  • Alerts on escalation queue depth and circuit breaker state

Next Steps

You now have five concrete patterns to make your agents production-ready. Here’s what to tackle next:

  1. Start with backoff — Wrap every API call in call_with_backoff as a baseline.
  2. Add checkpointing to any workflow longer than 2–3 steps.
  3. Build an escalation queue for tasks involving irreversible actions.
  4. Write failure injection tests — Test your recovery paths by deliberately triggering each error type.

Ready to go deeper? These guides cover related ground: