Pemulihan Error Agen: 5 Pola untuk Keandalan Produksi
Agenmu bekerja sempurna di lingkungan pengujian. Lalu di produksi, ia mencapai batas rate di langkah 3 dari alur kerja multi-langkah, melempar exception yang tidak tertangani, dan meninggalkan sistemmu dalam keadaan tidak terdefinisi. Tidak ada checkpoint. Tidak ada percobaan ulang. Tidak ada fallback. Hanya keheningan — dan pipeline yang rusak yang harus di-restart secara manual.
Pemulihan error agen adalah perbedaan antara demo dan sistem produksi. Artikel ini mencakup lima pola yang digunakan dalam alur kerja agentik produksi: exponential backoff, circuit breaker, checkpoint-and-resume, strategi fallback, dan antrian eskalasi. Setiap pola diimplementasikan dengan Anthropic SDK.
Prasyarat: Kamu perlu familiar dengan Python dan cara kerja API Claude.
Mengapa Agen Gagal Berbeda dari Perangkat Lunak Tradisional
Agen gagal dengan cara yang lebih kompleks:
- Kemajuan parsial: Agen menyelesaikan langkah 1–4 dari alur 8 langkah, lalu gagal. Tanpa pemulihan, semua kemajuan hilang.
- Status ambigu: Agen memanggil sebuah alat, tapi responsnya malformed. Apakah tindakan terjadi? Haruskah mencoba ulang?
- Kegagalan berantai: Satu panggilan API yang lambat memblokir seluruh loop penalaran.
- Kegagalan diam: LLM mengembalikan respons, tapi tidak mengikuti format yang diharapkan. Parser downstream gagal secara diam-diam.
Pola 1: Exponential Backoff dengan Jitter
Exponential backoff menggandakan waktu tunggu antar percobaan ulang. Jitter menambahkan keacakan untuk mencegah beberapa agen mencoba ulang secara bersamaan.
import anthropicimport timeimport randomfrom typing import Optional
client = anthropic.Anthropic()
def call_with_backoff( messages: list, model: str = "claude-sonnet-4-6", max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,) -> Optional[anthropic.types.Message]: """ Memanggil API Claude dengan exponential backoff pada error transien. Mencoba ulang pada rate limit dan error server; langsung raise pada error klien. """ for attempt in range(max_retries): try: return client.messages.create( model=model, max_tokens=1024, messages=messages, ) except anthropic.RateLimitError as e: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Rate limit. Mencoba ulang dalam {jitter:.1f}s (percobaan {attempt + 1}/{max_retries})") time.sleep(jitter) except anthropic.APIStatusError as e: if e.status_code < 500: raise if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Error server {e.status_code}. Mencoba ulang dalam {jitter:.1f}s") time.sleep(jitter) except anthropic.APIConnectionError: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) return NoneKapan digunakan: Untuk setiap panggilan API di dalam loop agen. Ini harus menjadi wrapper default untuk semua panggilan LLM.
Pola 2: Circuit Breaker
Circuit breaker melacak tingkat kegagalan dan sementara berhenti memanggil layanan yang gagal. Tiga status: Tertutup (normal), Terbuka (gagal), dan Setengah terbuka (memulihkan).
import timefrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"
@dataclassclass CircuitBreaker: failure_threshold: int = 5 recovery_timeout: float = 60.0 success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False) _failure_count: int = field(default=0, init=False) _success_count: int = field(default=0, init=False) _last_failure_time: float = field(default=0.0, init=False)
@property def state(self) -> CircuitState: if self._state == CircuitState.OPEN: if time.time() - self._last_failure_time > self.recovery_timeout: self._state = CircuitState.HALF_OPEN self._success_count = 0 return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: if self.state == CircuitState.OPEN: raise RuntimeError("Circuit breaker TERBUKA — layanan tidak tersedia") try: result = func(*args, **kwargs) self._on_success() return result except Exception: self._on_failure() raise
def _on_success(self) -> None: self._failure_count = 0 if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED
def _on_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPENPola 3: Checkpoint dan Lanjutkan
Agen yang berjalan lama perlu dapat melanjutkan dari langkah terakhir yang berhasil setelah kegagalan.
import jsonimport osfrom dataclasses import dataclass, asdict, fieldfrom typing import Optional
@dataclassclass AgentCheckpoint: task_id: str current_step: int completed_steps: list[str] = field(default_factory=list) results: dict = field(default_factory=dict) messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None: os.makedirs(directory, exist_ok=True) path = os.path.join(directory, f"{self.task_id}.json") with open(path, "w") as f: json.dump(asdict(self), f, indent=2)
@classmethod def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]: path = os.path.join(directory, f"{task_id}.json") if not os.path.exists(path): return None with open(path) as f: data = json.load(f) return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None: path = os.path.join(directory, f"{self.task_id}.json") if os.path.exists(path): os.remove(path)Pola ini terintegrasi secara alami dengan MemorySaver dan PostgresSaver bawaan LangGraph. Lihat Mesin Status dan Agen dengan LangGraph.
Pola 4: Strategi Fallback
Beberapa kegagalan tidak dapat dicoba ulang. Kamu butuh jalur alternatif yang membuat agen tetap bergerak.
from typing import Callable
def with_fallback( primary: Callable[[], str], fallback: Callable[[], str], error_types: tuple = (Exception,),) -> str: """Mencoba fungsi utama; beralih ke sekunder pada error yang ditentukan.""" try: return primary() except error_types as e: print(f"Utama gagal ({type(e).__name__}): {e}. Menggunakan fallback.") return fallback()
def answer_question(question: str) -> str: def primary(): # Model utama: paling mumpuni response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, messages=[{"role": "user", "content": question}], ) return response.content[0].text
def fallback(): # Fallback: model lebih ringan dan cepat response = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, messages=[{"role": "user", "content": question}], ) return f"[Respons fallback] {response.content[0].text}"
return with_fallback( primary, fallback, error_types=(anthropic.RateLimitError, anthropic.APIStatusError), )Pola 5: Antrian Eskalasi
Beberapa kegagalan benar-benar tidak bisa diselesaikan secara otomatis. Antrian eskalasi menangkap tugas yang gagal dengan cukup konteks untuk diselesaikan oleh manusia.
import jsonimport uuidfrom datetime import datetimefrom dataclasses import dataclass, asdictfrom enum import Enum
class EscalationReason(Enum): MAX_RETRIES_EXCEEDED = "max_retries_exceeded" AMBIGUOUS_STATE = "ambiguous_state" LOW_CONFIDENCE = "low_confidence" REQUIRES_HUMAN = "requires_human" IRREVERSIBLE_ACTION = "irreversible_action"
@dataclassclass EscalationRecord: id: str task_id: str reason: str error_message: str agent_state: dict context: str timestamp: str resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None: with open(queue_file, "a") as f: f.write(json.dumps(asdict(self)) + "\n")
def escalate( task_id: str, reason: EscalationReason, error_message: str, agent_state: dict, context: str = "",) -> EscalationRecord: record = EscalationRecord( id=str(uuid.uuid4()), task_id=task_id, reason=reason.value, error_message=error_message, agent_state=agent_state, context=context, timestamp=datetime.utcnow().isoformat(), ) record.save() print(f"[DIESKALASI] Tugas {task_id}: {reason.value}") return recordKapan digunakan: Untuk tugas dengan tindakan tidak dapat dibatalkan atau keputusan kepercayaan rendah. Lihat Pola Multi-Agen untuk agen supervisor.
Kesalahan Umum
Kesalahan 1: Mencoba Ulang Error yang Tidak Bisa Dicoba Ulang
Solusi: Klasifikasikan error sebelum mencoba ulang. Hanya coba ulang error transien.
# Salah: menangkap semuaexcept Exception: retry()
# Benar: hanya error yang bisa dicoba ulangexcept (anthropic.RateLimitError, anthropic.APIConnectionError): retry()except anthropic.APIStatusError as e: if e.status_code >= 500: retry() else: raiseKesalahan 2: Kehilangan Status saat Mencoba Ulang
Solusi: Pertahankan riwayat pesan antar percobaan. Coba ulang hanya langkah yang gagal.
Kesalahan 3: Loop Percobaan Ulang Tak Terbatas
Solusi: Selalu tetapkan max_retries. Setelah habis, eskalasikan atau gagal cepat.
Daftar Periksa Produksi
- Semua panggilan API dibungkus dengan exponential backoff (basis 1s, maks 60s, jitter aktif)
- Circuit breaker pada setiap dependensi eksternal
- Checkpoint disimpan ke penyimpanan tahan lama setelah setiap langkah penting
- Jalur fallback diuji dan ditandai dalam metadata respons
- Antrian eskalasi dipantau dan ditinjau setidaknya setiap hari
- Jumlah percobaan ulang dibatasi (
max_retries≤ 5)
Langkah Selanjutnya
- Mulai dengan backoff — Bungkus setiap panggilan API dengan
call_with_backoff. - Tambahkan checkpoint ke alur kerja dengan lebih dari 2–3 langkah.
- Bangun antrian eskalasi untuk tugas dengan tindakan tidak dapat dibatalkan.
- Tulis tes injeksi kegagalan — Uji jalur pemulihan dengan memicu setiap jenis error.
Panduan terkait:
Artikel Terkait
- Pola Penggunaan Alat: Membangun Antarmuka Agen-Alat yang Andal
- Sistem Memori Agen: Memberi Konteks Persisten pada AI Anda