SISTEMA DI ACCESSO COMPUTER

Recupero Errori degli Agenti: 5 Pattern per l'Affidabilità in Produzione


Il tuo agente funzionava perfettamente in test. Poi in produzione ha raggiunto un limite di frequenza al passo 3 di un flusso di lavoro in più fasi, ha lanciato un’eccezione non catturata e ha lasciato il sistema in uno stato indefinito. Nessun checkpoint. Nessun tentativo. Nessun fallback. Solo silenzio — e una pipeline rotta da riavviare manualmente.

Il recupero errori degli agenti è la differenza tra una demo e un sistema in produzione. Questo articolo copre cinque pattern usati nei flussi di lavoro agentici in produzione: backoff esponenziale, interruttori di circuito, checkpoint-e-ripresa, strategie di fallback e code di escalation. Ogni pattern è implementato con l’SDK Anthropic.

Prerequisiti: Devi conoscere Python e come funziona l’API Claude.


Perché gli Agenti Falliscono Diversamente dal Software Tradizionale

Gli agenti falliscono in modi più complessi:

  • Progresso parziale: Un agente completa i passi 1–4 di un flusso di 8 passi, poi fallisce. Senza recupero, tutto il progresso va perso.
  • Stato ambiguo: L’agente ha chiamato uno strumento, ma la risposta era malformata. L’azione è avvenuta? Si deve riprovare?
  • Fallimento a cascata: Una chiamata API lenta blocca l’intero ciclo di ragionamento.
  • Fallimento silenzioso: L’LLM restituisce una risposta, ma non segue il formato atteso. Il parser a valle fallisce silenziosamente.

Pattern 1: Backoff Esponenziale con Jitter

Il backoff esponenziale raddoppia il tempo di attesa tra i tentativi. Il jitter aggiunge casualità per evitare che più agenti riprovino contemporaneamente.

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
Chiama l'API Claude con backoff esponenziale sugli errori transitori.
Riprova su limiti di frequenza ed errori server;
lancia immediatamente su errori client.
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Limite di frequenza. Riprovo in {jitter:.1f}s (tentativo {attempt + 1}/{max_retries})")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Errore server {e.status_code}. Riprovo in {jitter:.1f}s")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

Quando usare: Per ogni chiamata API all’interno di un ciclo agente. Questo deve essere il tuo wrapper predefinito per tutte le chiamate LLM.


Pattern 2: Interruttore di Circuito

Un interruttore di circuito monitora i tassi di guasto e smette temporaneamente di chiamare un servizio che sta fallendo. Tre stati: Chiuso (normale), Aperto (in guasto) e Semi-aperto (in recupero).

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 60.0
success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("Interruttore di circuito APERTO — servizio non disponibile")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

Pattern 3: Checkpoint e Ripresa

Gli agenti a lunga esecuzione devono poter riprendere dall’ultimo passo riuscito dopo un guasto.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

Questo pattern si integra naturalmente con MemorySaver e PostgresSaver di LangGraph. Vedi Macchine a Stati e Agenti con LangGraph.


Pattern 4: Strategie di Fallback

Alcuni guasti non sono riprovabili. Hai bisogno di un percorso alternativo che mantenga l’agente in movimento.

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""Prova la funzione primaria; ricade sulla secondaria sugli errori specificati."""
try:
return primary()
except error_types as e:
print(f"Primario fallito ({type(e).__name__}): {e}. Uso il fallback.")
return fallback()
def answer_question(question: str) -> str:
def primary():
# Modello primario: il più capace
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# Fallback: modello più leggero e veloce
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[Risposta di fallback] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

Pattern 5: Coda di Escalation

Alcuni guasti genuinamente non possono essere risolti automaticamente. Una coda di escalation cattura le attività fallite con abbastanza contesto per essere risolte da un umano.

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self)) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[ESCALATO] Attività {task_id}: {reason.value}")
return record

Quando usare: Per attività con azioni irreversibili o decisioni a bassa confidenza. Vedi Pattern Multi-Agente.


Errori Comuni

Errore 1: Riprovare Errori Non Riprovabili

La soluzione: Classifica gli errori prima di riprovare.

# Sbagliato: cattura tutto
except Exception:
retry()
# Giusto: solo errori riprovabili
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

Errore 2: Perdere lo Stato al Nuovo Tentativo

La soluzione: Preserva la cronologia dei messaggi tra i tentativi. Riprova solo il passo specifico fallito.

Errore 3: Cicli di Nuovo Tentativo Illimitati

La soluzione: Imposta sempre max_retries. Dopo l’esaurimento, esegui l’escalation o fallisci rapidamente.


Lista di Controllo per la Produzione

  • Tutte le chiamate API avvolte con backoff esponenziale (base 1s, max 60s, jitter attivo)
  • Interruttori di circuito su ogni dipendenza esterna
  • Checkpoint salvati in storage durevole dopo ogni passo significativo
  • Percorsi di fallback testati e marcati nei metadati di risposta
  • Coda di escalation monitorata e revisionata almeno quotidianamente
  • Conteggi di tentativi limitati (max_retries ≤ 5)

Prossimi Passi

  1. Inizia con il backoff — Avvolgi ogni chiamata API in call_with_backoff.
  2. Aggiungi checkpoint a qualsiasi flusso con più di 2–3 passi.
  3. Costruisci una coda di escalation per attività con azioni irreversibili.
  4. Scrivi test di iniezione di guasti — Testa i percorsi di recupero attivando deliberatamente ogni tipo di errore.

Guide correlate:


Articoli Correlati