SISTEM AKSES KOMPUTER

Pemulihan Error Agen: 5 Pola untuk Keandalan Produksi


Agenmu bekerja sempurna di lingkungan pengujian. Lalu di produksi, ia mencapai batas rate di langkah 3 dari alur kerja multi-langkah, melempar exception yang tidak tertangani, dan meninggalkan sistemmu dalam keadaan tidak terdefinisi. Tidak ada checkpoint. Tidak ada percobaan ulang. Tidak ada fallback. Hanya keheningan — dan pipeline yang rusak yang harus di-restart secara manual.

Pemulihan error agen adalah perbedaan antara demo dan sistem produksi. Artikel ini mencakup lima pola yang digunakan dalam alur kerja agentik produksi: exponential backoff, circuit breaker, checkpoint-and-resume, strategi fallback, dan antrian eskalasi. Setiap pola diimplementasikan dengan Anthropic SDK.

Prasyarat: Kamu perlu familiar dengan Python dan cara kerja API Claude.


Mengapa Agen Gagal Berbeda dari Perangkat Lunak Tradisional

Agen gagal dengan cara yang lebih kompleks:

  • Kemajuan parsial: Agen menyelesaikan langkah 1–4 dari alur 8 langkah, lalu gagal. Tanpa pemulihan, semua kemajuan hilang.
  • Status ambigu: Agen memanggil sebuah alat, tapi responsnya malformed. Apakah tindakan terjadi? Haruskah mencoba ulang?
  • Kegagalan berantai: Satu panggilan API yang lambat memblokir seluruh loop penalaran.
  • Kegagalan diam: LLM mengembalikan respons, tapi tidak mengikuti format yang diharapkan. Parser downstream gagal secara diam-diam.

Pola 1: Exponential Backoff dengan Jitter

Exponential backoff menggandakan waktu tunggu antar percobaan ulang. Jitter menambahkan keacakan untuk mencegah beberapa agen mencoba ulang secara bersamaan.

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
Memanggil API Claude dengan exponential backoff pada error transien.
Mencoba ulang pada rate limit dan error server;
langsung raise pada error klien.
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Rate limit. Mencoba ulang dalam {jitter:.1f}s (percobaan {attempt + 1}/{max_retries})")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Error server {e.status_code}. Mencoba ulang dalam {jitter:.1f}s")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

Kapan digunakan: Untuk setiap panggilan API di dalam loop agen. Ini harus menjadi wrapper default untuk semua panggilan LLM.


Pola 2: Circuit Breaker

Circuit breaker melacak tingkat kegagalan dan sementara berhenti memanggil layanan yang gagal. Tiga status: Tertutup (normal), Terbuka (gagal), dan Setengah terbuka (memulihkan).

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 60.0
success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("Circuit breaker TERBUKA — layanan tidak tersedia")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

Pola 3: Checkpoint dan Lanjutkan

Agen yang berjalan lama perlu dapat melanjutkan dari langkah terakhir yang berhasil setelah kegagalan.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

Pola ini terintegrasi secara alami dengan MemorySaver dan PostgresSaver bawaan LangGraph. Lihat Mesin Status dan Agen dengan LangGraph.


Pola 4: Strategi Fallback

Beberapa kegagalan tidak dapat dicoba ulang. Kamu butuh jalur alternatif yang membuat agen tetap bergerak.

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""Mencoba fungsi utama; beralih ke sekunder pada error yang ditentukan."""
try:
return primary()
except error_types as e:
print(f"Utama gagal ({type(e).__name__}): {e}. Menggunakan fallback.")
return fallback()
def answer_question(question: str) -> str:
def primary():
# Model utama: paling mumpuni
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# Fallback: model lebih ringan dan cepat
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[Respons fallback] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

Pola 5: Antrian Eskalasi

Beberapa kegagalan benar-benar tidak bisa diselesaikan secara otomatis. Antrian eskalasi menangkap tugas yang gagal dengan cukup konteks untuk diselesaikan oleh manusia.

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self)) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[DIESKALASI] Tugas {task_id}: {reason.value}")
return record

Kapan digunakan: Untuk tugas dengan tindakan tidak dapat dibatalkan atau keputusan kepercayaan rendah. Lihat Pola Multi-Agen untuk agen supervisor.


Kesalahan Umum

Kesalahan 1: Mencoba Ulang Error yang Tidak Bisa Dicoba Ulang

Solusi: Klasifikasikan error sebelum mencoba ulang. Hanya coba ulang error transien.

# Salah: menangkap semua
except Exception:
retry()
# Benar: hanya error yang bisa dicoba ulang
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

Kesalahan 2: Kehilangan Status saat Mencoba Ulang

Solusi: Pertahankan riwayat pesan antar percobaan. Coba ulang hanya langkah yang gagal.

Kesalahan 3: Loop Percobaan Ulang Tak Terbatas

Solusi: Selalu tetapkan max_retries. Setelah habis, eskalasikan atau gagal cepat.


Daftar Periksa Produksi

  • Semua panggilan API dibungkus dengan exponential backoff (basis 1s, maks 60s, jitter aktif)
  • Circuit breaker pada setiap dependensi eksternal
  • Checkpoint disimpan ke penyimpanan tahan lama setelah setiap langkah penting
  • Jalur fallback diuji dan ditandai dalam metadata respons
  • Antrian eskalasi dipantau dan ditinjau setidaknya setiap hari
  • Jumlah percobaan ulang dibatasi (max_retries ≤ 5)

Langkah Selanjutnya

  1. Mulai dengan backoff — Bungkus setiap panggilan API dengan call_with_backoff.
  2. Tambahkan checkpoint ke alur kerja dengan lebih dari 2–3 langkah.
  3. Bangun antrian eskalasi untuk tugas dengan tindakan tidak dapat dibatalkan.
  4. Tulis tes injeksi kegagalan — Uji jalur pemulihan dengan memicu setiap jenis error.

Panduan terkait:


Artikel Terkait