Recupero Errori degli Agenti: 5 Pattern per l'Affidabilità in Produzione
Il tuo agente funzionava perfettamente in test. Poi in produzione ha raggiunto un limite di frequenza al passo 3 di un flusso di lavoro in più fasi, ha lanciato un’eccezione non catturata e ha lasciato il sistema in uno stato indefinito. Nessun checkpoint. Nessun tentativo. Nessun fallback. Solo silenzio — e una pipeline rotta da riavviare manualmente.
Il recupero errori degli agenti è la differenza tra una demo e un sistema in produzione. Questo articolo copre cinque pattern usati nei flussi di lavoro agentici in produzione: backoff esponenziale, interruttori di circuito, checkpoint-e-ripresa, strategie di fallback e code di escalation. Ogni pattern è implementato con l’SDK Anthropic.
Prerequisiti: Devi conoscere Python e come funziona l’API Claude.
Perché gli Agenti Falliscono Diversamente dal Software Tradizionale
Gli agenti falliscono in modi più complessi:
- Progresso parziale: Un agente completa i passi 1–4 di un flusso di 8 passi, poi fallisce. Senza recupero, tutto il progresso va perso.
- Stato ambiguo: L’agente ha chiamato uno strumento, ma la risposta era malformata. L’azione è avvenuta? Si deve riprovare?
- Fallimento a cascata: Una chiamata API lenta blocca l’intero ciclo di ragionamento.
- Fallimento silenzioso: L’LLM restituisce una risposta, ma non segue il formato atteso. Il parser a valle fallisce silenziosamente.
Pattern 1: Backoff Esponenziale con Jitter
Il backoff esponenziale raddoppia il tempo di attesa tra i tentativi. Il jitter aggiunge casualità per evitare che più agenti riprovino contemporaneamente.
import anthropicimport timeimport randomfrom typing import Optional
client = anthropic.Anthropic()
def call_with_backoff( messages: list, model: str = "claude-sonnet-4-6", max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,) -> Optional[anthropic.types.Message]: """ Chiama l'API Claude con backoff esponenziale sugli errori transitori. Riprova su limiti di frequenza ed errori server; lancia immediatamente su errori client. """ for attempt in range(max_retries): try: return client.messages.create( model=model, max_tokens=1024, messages=messages, ) except anthropic.RateLimitError as e: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Limite di frequenza. Riprovo in {jitter:.1f}s (tentativo {attempt + 1}/{max_retries})") time.sleep(jitter) except anthropic.APIStatusError as e: if e.status_code < 500: raise if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Errore server {e.status_code}. Riprovo in {jitter:.1f}s") time.sleep(jitter) except anthropic.APIConnectionError: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) return NoneQuando usare: Per ogni chiamata API all’interno di un ciclo agente. Questo deve essere il tuo wrapper predefinito per tutte le chiamate LLM.
Pattern 2: Interruttore di Circuito
Un interruttore di circuito monitora i tassi di guasto e smette temporaneamente di chiamare un servizio che sta fallendo. Tre stati: Chiuso (normale), Aperto (in guasto) e Semi-aperto (in recupero).
import timefrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"
@dataclassclass CircuitBreaker: failure_threshold: int = 5 recovery_timeout: float = 60.0 success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False) _failure_count: int = field(default=0, init=False) _success_count: int = field(default=0, init=False) _last_failure_time: float = field(default=0.0, init=False)
@property def state(self) -> CircuitState: if self._state == CircuitState.OPEN: if time.time() - self._last_failure_time > self.recovery_timeout: self._state = CircuitState.HALF_OPEN self._success_count = 0 return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: if self.state == CircuitState.OPEN: raise RuntimeError("Interruttore di circuito APERTO — servizio non disponibile") try: result = func(*args, **kwargs) self._on_success() return result except Exception: self._on_failure() raise
def _on_success(self) -> None: self._failure_count = 0 if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED
def _on_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPENPattern 3: Checkpoint e Ripresa
Gli agenti a lunga esecuzione devono poter riprendere dall’ultimo passo riuscito dopo un guasto.
import jsonimport osfrom dataclasses import dataclass, asdict, fieldfrom typing import Optional
@dataclassclass AgentCheckpoint: task_id: str current_step: int completed_steps: list[str] = field(default_factory=list) results: dict = field(default_factory=dict) messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None: os.makedirs(directory, exist_ok=True) path = os.path.join(directory, f"{self.task_id}.json") with open(path, "w") as f: json.dump(asdict(self), f, indent=2)
@classmethod def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]: path = os.path.join(directory, f"{task_id}.json") if not os.path.exists(path): return None with open(path) as f: data = json.load(f) return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None: path = os.path.join(directory, f"{self.task_id}.json") if os.path.exists(path): os.remove(path)Questo pattern si integra naturalmente con MemorySaver e PostgresSaver di LangGraph. Vedi Macchine a Stati e Agenti con LangGraph.
Pattern 4: Strategie di Fallback
Alcuni guasti non sono riprovabili. Hai bisogno di un percorso alternativo che mantenga l’agente in movimento.
from typing import Callable
def with_fallback( primary: Callable[[], str], fallback: Callable[[], str], error_types: tuple = (Exception,),) -> str: """Prova la funzione primaria; ricade sulla secondaria sugli errori specificati.""" try: return primary() except error_types as e: print(f"Primario fallito ({type(e).__name__}): {e}. Uso il fallback.") return fallback()
def answer_question(question: str) -> str: def primary(): # Modello primario: il più capace response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, messages=[{"role": "user", "content": question}], ) return response.content[0].text
def fallback(): # Fallback: modello più leggero e veloce response = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, messages=[{"role": "user", "content": question}], ) return f"[Risposta di fallback] {response.content[0].text}"
return with_fallback( primary, fallback, error_types=(anthropic.RateLimitError, anthropic.APIStatusError), )Pattern 5: Coda di Escalation
Alcuni guasti genuinamente non possono essere risolti automaticamente. Una coda di escalation cattura le attività fallite con abbastanza contesto per essere risolte da un umano.
import jsonimport uuidfrom datetime import datetimefrom dataclasses import dataclass, asdictfrom enum import Enum
class EscalationReason(Enum): MAX_RETRIES_EXCEEDED = "max_retries_exceeded" AMBIGUOUS_STATE = "ambiguous_state" LOW_CONFIDENCE = "low_confidence" REQUIRES_HUMAN = "requires_human" IRREVERSIBLE_ACTION = "irreversible_action"
@dataclassclass EscalationRecord: id: str task_id: str reason: str error_message: str agent_state: dict context: str timestamp: str resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None: with open(queue_file, "a") as f: f.write(json.dumps(asdict(self)) + "\n")
def escalate( task_id: str, reason: EscalationReason, error_message: str, agent_state: dict, context: str = "",) -> EscalationRecord: record = EscalationRecord( id=str(uuid.uuid4()), task_id=task_id, reason=reason.value, error_message=error_message, agent_state=agent_state, context=context, timestamp=datetime.utcnow().isoformat(), ) record.save() print(f"[ESCALATO] Attività {task_id}: {reason.value}") return recordQuando usare: Per attività con azioni irreversibili o decisioni a bassa confidenza. Vedi Pattern Multi-Agente.
Errori Comuni
Errore 1: Riprovare Errori Non Riprovabili
La soluzione: Classifica gli errori prima di riprovare.
# Sbagliato: cattura tuttoexcept Exception: retry()
# Giusto: solo errori riprovabiliexcept (anthropic.RateLimitError, anthropic.APIConnectionError): retry()except anthropic.APIStatusError as e: if e.status_code >= 500: retry() else: raiseErrore 2: Perdere lo Stato al Nuovo Tentativo
La soluzione: Preserva la cronologia dei messaggi tra i tentativi. Riprova solo il passo specifico fallito.
Errore 3: Cicli di Nuovo Tentativo Illimitati
La soluzione: Imposta sempre max_retries. Dopo l’esaurimento, esegui l’escalation o fallisci rapidamente.
Lista di Controllo per la Produzione
- Tutte le chiamate API avvolte con backoff esponenziale (base 1s, max 60s, jitter attivo)
- Interruttori di circuito su ogni dipendenza esterna
- Checkpoint salvati in storage durevole dopo ogni passo significativo
- Percorsi di fallback testati e marcati nei metadati di risposta
- Coda di escalation monitorata e revisionata almeno quotidianamente
- Conteggi di tentativi limitati (
max_retries≤ 5)
Prossimi Passi
- Inizia con il backoff — Avvolgi ogni chiamata API in
call_with_backoff. - Aggiungi checkpoint a qualsiasi flusso con più di 2–3 passi.
- Costruisci una coda di escalation per attività con azioni irreversibili.
- Scrivi test di iniezione di guasti — Testa i percorsi di recupero attivando deliberatamente ogni tipo di errore.
Guide correlate:
Articoli Correlati
- Pattern di Utilizzo degli Strumenti: Interfacce Agente-Strumento Affidabili
- Sistemi di Memoria per Agenti: Dare Contesto Persistente alla Tua AI