COMPUTERZUGRIFFSSYSTEM

Agent-Fehlerbehandlung: 5 Muster für Produktionszuverlässigkeit


Dein Agent hat in der Testumgebung perfekt funktioniert. Dann stieß er in der Produktion beim Schritt 3 eines mehrstufigen Workflows auf ein Rate-Limit, warf eine nicht abgefangene Ausnahme und ließ dein System in einem undefinierten Zustand. Kein Checkpoint. Kein Wiederholungsversuch. Kein Fallback. Nur Stille — und eine kaputte Pipeline, die manuell neu gestartet werden muss.

Fehlerbehandlung bei Agenten ist der Unterschied zwischen einer Demo und einem Produktionssystem. Dieser Artikel deckt fünf Muster ab: exponentielles Backoff, Sicherungsautomaten, Checkpoint-und-Fortsetzen, Fallback-Strategien und Eskalations-Warteschlangen. Jedes Muster ist mit dem Anthropic SDK implementiert.

Voraussetzungen: Du solltest Python kennen und wissen, wie die Claude API funktioniert.


Warum Agenten Anders Scheitern als Traditionelle Software

Agenten scheitern auf komplexere Weisen:

  • Teilfortschritt: Ein Agent schließt die Schritte 1–4 eines 8-Schritt-Workflows ab, scheitert dann. Ohne Wiederherstellung geht aller Fortschritt verloren.
  • Mehrdeutiger Zustand: Der Agent rief ein Tool auf, aber die Antwort war fehlerhaft. Hat die Aktion stattgefunden? Soll wiederholt werden?
  • Kaskadierende Ausfälle: Ein langsamer API-Aufruf blockiert die gesamte Reasoning-Schleife.
  • Stille Fehler: Das LLM gibt eine Antwort zurück, folgt aber nicht dem erwarteten Format. Der nachgelagerte Parser scheitert still.

Muster 1: Exponentielles Backoff mit Jitter

Exponentielles Backoff verdoppelt die Wartezeit zwischen Wiederholungsversuchen. Jitter fügt Zufälligkeit hinzu, um zu verhindern, dass mehrere Agenten gleichzeitig wiederholen.

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
Ruft die Claude API mit exponentiellem Backoff bei transienten Fehlern auf.
Wiederholt bei Rate-Limits und Server-Fehlern; löst sofort bei Client-Fehlern aus.
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Rate-Limit. Wiederhole in {jitter:.1f}s (Versuch {attempt + 1}/{max_retries})")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Serverfehler {e.status_code}. Wiederhole in {jitter:.1f}s")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

Wann verwenden: Bei jedem API-Aufruf innerhalb einer Agenten-Schleife. Dies sollte dein Standard-Wrapper für alle LLM-Aufrufe sein.


Muster 2: Sicherungsautomat

Ein Sicherungsautomat verfolgt Fehlerraten und stoppt vorübergehend Aufrufe an einen fehlschlagenden Dienst. Drei Zustände: Geschlossen (normal), Offen (fehlerhaft) und Halb-offen (Wiederherstellung).

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 60.0
success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("Sicherungsautomat OFFEN — Dienst nicht verfügbar")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

Muster 3: Checkpoint und Fortsetzen

Lang laufende Agenten müssen nach einem Ausfall ab dem letzten erfolgreichen Schritt fortsetzen können.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

Dieses Muster integriert sich natürlich mit LangGraphs eingebautem MemorySaver und PostgresSaver. Siehe Zustandsmaschinen und Agenten mit LangGraph.


Muster 4: Fallback-Strategien

Manche Fehler sind nicht wiederholbar. Du brauchst einen alternativen Pfad, der den Agenten weiterbewegt.

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""Versucht die primäre Funktion; Fallback bei angegebenen Fehlern."""
try:
return primary()
except error_types as e:
print(f"Primär fehlgeschlagen ({type(e).__name__}): {e}. Verwende Fallback.")
return fallback()
def answer_question(question: str) -> str:
def primary():
# Primäres Modell: das leistungsfähigste
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# Fallback: leichteres, schnelleres Modell
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[Fallback-Antwort] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

Muster 5: Eskalations-Warteschlange

Manche Fehler können nicht automatisch behoben werden. Eine Eskalations-Warteschlange erfasst fehlgeschlagene Aufgaben mit genug Kontext, damit ein Mensch sie lösen kann.

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self)) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[ESKALIERT] Aufgabe {task_id}: {reason.value}")
return record

Wann verwenden: Für Aufgaben mit unumkehrbaren Aktionen oder Entscheidungen mit geringer Konfidenz. Siehe Multi-Agenten-Muster für einen Supervisor-Agenten.


Häufige Fehler

Fehler 1: Nicht-wiederholbare Fehler wiederholen

Die Lösung: Klassifiziere Fehler vor dem Wiederholen. Nur transiente Fehler wiederholen.

# Falsch: fängt alles ab
except Exception:
retry()
# Richtig: nur wiederholbare Fehler
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

Fehler 2: Zustand bei Wiederholung verlieren

Die Lösung: Nachrichtenverlauf zwischen Versuchen bewahren. Nur den spezifisch fehlgeschlagenen Schritt wiederholen.

Fehler 3: Unbegrenzte Wiederholungsschleifen

Die Lösung: Immer max_retries setzen. Nach Erschöpfung eskalieren oder schnell scheitern.


Produktions-Checkliste

  • Alle API-Aufrufe mit exponentiellem Backoff umhüllt (Basis 1s, max 60s, Jitter aktiviert)
  • Sicherungsautomaten bei jeder externen Abhängigkeit
  • Checkpoints nach jedem wichtigen Schritt in dauerhaftem Speicher gespeichert
  • Fallback-Pfade getestet und in Antwort-Metadaten markiert
  • Eskalations-Warteschlange täglich überwacht und überprüft
  • Wiederholungsanzahl begrenzt (max_retries ≤ 5)

Nächste Schritte

  1. Mit Backoff beginnen — Jeden API-Aufruf in call_with_backoff einwickeln.
  2. Checkpoints hinzufügen zu jedem Workflow mit mehr als 2–3 Schritten.
  3. Eskalations-Warteschlange aufbauen für Aufgaben mit unumkehrbaren Aktionen.
  4. Fehlerinjektionstests schreiben — Wiederherstellungspfade durch absichtliches Auslösen jedes Fehlertyps testen.

Verwandte Anleitungen:


Verwandte Artikel