エージェントのエラー回復:本番環境の信頼性のための5つのパターン
エージェントはテスト環境で完璧に動いていた。しかし本番環境で、マルチステップワークフローのステップ3でレート制限に達し、未捕捉の例外を投げ、システムを未定義の状態に置き去りにした。チェックポイントなし。リトライなし。フォールバックなし。ただ沈黙——そして手動で再起動しなければならない壊れたパイプライン。
エージェントのエラー回復は、デモと本番システムの差だ。この記事では本番エージェントワークフローで使われる5つのパターンを取り上げる:指数バックオフ、サーキットブレーカー、チェックポイントと再開、フォールバック戦略、エスカレーションキュー。各パターンはAnthropic SDKで実装されており、どのオーケストレーションフレームワークとも動作する。
前提条件: PythonとClaude APIの動作に慣れていること。
エージェントが従来のソフトウェアと異なる形で失敗する理由
エージェントはより複雑な形で失敗する:
- 部分的進捗: エージェントが8ステップワークフローのステップ1–4を完了して失敗する。回復なしでは全進捗が失われる。
- 曖昧な状態: エージェントがツールを呼び出したが、レスポンスが不正形式だった。アクションは実行されたのか?リトライすべきか?
- カスケード障害: 一つの遅いAPI呼び出しが推論ループ全体をブロックする。
- サイレント障害: LLMがレスポンスを返したが、期待されるフォーマットに従っていない。下流のパーサーが静かに壊れる。
パターン1:ジッター付き指数バックオフ
指数バックオフはリトライ間の待機時間を毎回2倍にする。ジッターは複数のエージェントが同時にリトライするのを防ぐためのランダム性を加える。
import anthropicimport timeimport randomfrom typing import Optional
client = anthropic.Anthropic()
def call_with_backoff( messages: list, model: str = "claude-sonnet-4-6", max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,) -> Optional[anthropic.types.Message]: """ 一時的なエラーに対して指数バックオフでClaude APIを呼び出す。 レート制限とサーバーエラーでリトライ;クライアントエラーは即座に raise。 """ for attempt in range(max_retries): try: return client.messages.create( model=model, max_tokens=1024, messages=messages, ) except anthropic.RateLimitError as e: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"レート制限。{jitter:.1f}秒後にリトライ(試行 {attempt + 1}/{max_retries})") time.sleep(jitter) except anthropic.APIStatusError as e: if e.status_code < 500: raise if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"サーバーエラー {e.status_code}。{jitter:.1f}秒後にリトライ") time.sleep(jitter) except anthropic.APIConnectionError: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) return None使用場面: エージェントループ内のあらゆるAPI呼び出し。全LLM呼び出しのデフォルトラッパーにすべきだ。
パターン2:サーキットブレーカー
サーキットブレーカーは障害率を追跡し、失敗しているサービスへの呼び出しを一時的に止める。3つの状態:クローズド(正常)、オープン(障害中)、ハーフオープン(回復中)。
import timefrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"
@dataclassclass CircuitBreaker: failure_threshold: int = 5 # オープンになる前の連続失敗回数 recovery_timeout: float = 60.0 # ハーフオープンを試みるまでの秒数 success_threshold: int = 2 # ハーフオープンからクローズドに戻る成功回数
_state: CircuitState = field(default=CircuitState.CLOSED, init=False) _failure_count: int = field(default=0, init=False) _success_count: int = field(default=0, init=False) _last_failure_time: float = field(default=0.0, init=False)
@property def state(self) -> CircuitState: if self._state == CircuitState.OPEN: if time.time() - self._last_failure_time > self.recovery_timeout: self._state = CircuitState.HALF_OPEN self._success_count = 0 return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: if self.state == CircuitState.OPEN: raise RuntimeError("サーキットブレーカーがオープン — サービス利用不可") try: result = func(*args, **kwargs) self._on_success() return result except Exception: self._on_failure() raise
def _on_success(self) -> None: self._failure_count = 0 if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED
def _on_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPENパターン3:チェックポイントと再開
長時間実行されるエージェントは、障害後に最後に成功したステップから再開できる必要がある。
import jsonimport osfrom dataclasses import dataclass, asdict, fieldfrom typing import Optional
@dataclassclass AgentCheckpoint: task_id: str current_step: int completed_steps: list[str] = field(default_factory=list) results: dict = field(default_factory=dict) messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None: os.makedirs(directory, exist_ok=True) path = os.path.join(directory, f"{self.task_id}.json") with open(path, "w") as f: json.dump(asdict(self), f, indent=2)
@classmethod def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]: path = os.path.join(directory, f"{task_id}.json") if not os.path.exists(path): return None with open(path) as f: data = json.load(f) return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None: path = os.path.join(directory, f"{self.task_id}.json") if os.path.exists(path): os.remove(path)このパターンはLangGraphの組み込みMemorySaverとPostgresSaverと自然に統合する。LangGraphによるステートマシンとエージェントを参照。
パターン4:フォールバック戦略
リトライできない障害もある。エージェントを前進させる代替パスが必要だ。
from typing import Callable
def with_fallback( primary: Callable[[], str], fallback: Callable[[], str], error_types: tuple = (Exception,),) -> str: """プライマリ関数を試み、指定エラーでフォールバックに切り替える。""" try: return primary() except error_types as e: print(f"プライマリ失敗({type(e).__name__}):{e}。フォールバックを使用。") return fallback()
def answer_question(question: str) -> str: def primary(): # プライマリモデル:最も有能 response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, messages=[{"role": "user", "content": question}], ) return response.content[0].text
def fallback(): # フォールバック:より軽量で高速なモデル response = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, messages=[{"role": "user", "content": question}], ) return f"[フォールバック応答] {response.content[0].text}"
return with_fallback( primary, fallback, error_types=(anthropic.RateLimitError, anthropic.APIStatusError), )パターン5:エスカレーションキュー
一部の障害は自動では解決できない。エスカレーションキューは、人間(またはスーパーバイザーエージェント)が解決できるよう十分なコンテキストとともに失敗したタスクを記録する。
import jsonimport uuidfrom datetime import datetimefrom dataclasses import dataclass, asdictfrom enum import Enum
class EscalationReason(Enum): MAX_RETRIES_EXCEEDED = "max_retries_exceeded" AMBIGUOUS_STATE = "ambiguous_state" LOW_CONFIDENCE = "low_confidence" REQUIRES_HUMAN = "requires_human" IRREVERSIBLE_ACTION = "irreversible_action"
@dataclassclass EscalationRecord: id: str task_id: str reason: str error_message: str agent_state: dict context: str timestamp: str resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None: with open(queue_file, "a") as f: f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate( task_id: str, reason: EscalationReason, error_message: str, agent_state: dict, context: str = "",) -> EscalationRecord: record = EscalationRecord( id=str(uuid.uuid4()), task_id=task_id, reason=reason.value, error_message=error_message, agent_state=agent_state, context=context, timestamp=datetime.utcnow().isoformat(), ) record.save() print(f"[エスカレート済み] タスク {task_id}:{reason.value}") return record使用場面: 不可逆なアクションや低信頼度の意思決定を含むタスク。エスカレーションキューを処理するスーパーバイザーエージェントについてはマルチエージェントパターンを参照。
よくある落とし穴
落とし穴1:リトライ不可能なエラーをリトライする
解決策: リトライ前にエラーを分類する。一時的なエラーのみリトライする。
# 誤り:全てをキャッチexcept Exception: retry()
# 正しい:リトライ可能なエラーのみexcept (anthropic.RateLimitError, anthropic.APIConnectionError): retry()except anthropic.APIStatusError as e: if e.status_code >= 500: retry() else: raise落とし穴2:リトライ時に状態を失う
解決策: リトライ間でメッセージ履歴を保持する。失敗した特定のステップのみリトライする。
落とし穴3:無制限のリトライループ
解決策: 常にmax_retriesを設定する。使い果たした後はエスカレートするか素早く失敗する。
本番デプロイチェックリスト
- 全API呼び出しを指数バックオフでラップ(ベース1秒、最大60秒、ジッター有効)
- 全外部依存関係にサーキットブレーカー
- 各重要ステップ後に永続ストレージへチェックポイントを保存
- フォールバックパスをテストし、レスポンスメタデータでマーク
- エスカレーションキューを少なくとも毎日監視・レビュー
- リトライ回数を制限(
max_retries≤ 5)
次のステップ
- バックオフから始めよう — 全API呼び出しを
call_with_backoffでラップ。 - チェックポイントを追加しよう — 2–3ステップを超えるワークフローに。
- エスカレーションキューを構築しよう — 不可逆なアクションを含むタスク用に。
- 障害注入テストを書こう — 各エラータイプを意図的に発生させて回復パスをテスト。
関連ガイド: