智能体错误恢复:5种生产环境可靠性模式
你的智能体在测试中完美运行。然后在生产环境中,它在多步骤工作流的第3步触发了速率限制,抛出了未捕获的异常,并让系统处于未定义状态。没有检查点,没有重试,没有备用方案。只有沉默——以及一个必须手动重启的损坏管道。
智能体错误恢复是演示与生产系统之间的差距。本文涵盖生产智能体工作流中使用的五种模式:指数退避、熔断器、检查点与恢复、降级策略以及升级队列。每种模式均使用Anthropic SDK实现,兼容任何编排框架。
前提条件: 你需要熟悉Python和Claude API的工作原理。
为什么智能体的失败与传统软件不同
智能体以更复杂的方式失败:
- 部分进度: 智能体完成8步工作流中的1–4步后失败。没有恢复机制就会丢失所有进度。
- 状态模糊: 智能体调用了工具,但响应格式错误。操作是否执行了?应该重试吗?
- 级联故障: 一个慢速API调用阻塞了整个推理循环。
- 静默故障: LLM返回了响应,但不符合预期格式,下游解析器静默失败。
模式1:带抖动的指数退避
指数退避在每次重试之间将等待时间翻倍。抖动添加随机性,防止多个智能体同时重试(“惊群效应”问题)。
import anthropicimport timeimport randomfrom typing import Optional
client = anthropic.Anthropic()
def call_with_backoff( messages: list, model: str = "claude-sonnet-4-6", max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,) -> Optional[anthropic.types.Message]: """ 使用指数退避调用Claude API处理瞬时错误。 对速率限制和服务器错误重试;对客户端错误立即抛出。 """ for attempt in range(max_retries): try: return client.messages.create( model=model, max_tokens=1024, messages=messages, ) except anthropic.RateLimitError as e: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"速率限制。{jitter:.1f}秒后重试(第{attempt + 1}/{max_retries}次)") time.sleep(jitter) except anthropic.APIStatusError as e: if e.status_code < 500: raise if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"服务器错误{e.status_code}。{jitter:.1f}秒后重试") time.sleep(jitter) except anthropic.APIConnectionError: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) return None使用场景: 智能体循环中的任何API调用。这应该是所有LLM调用的默认包装器。
不使用的场景: 不要对验证错误(400)、认证失败(401)或404重试。
模式2:熔断器
熔断器跟踪故障率,并暂时停止调用失败的服务。它有三个状态:关闭(正常)、打开(故障)和半开(恢复中)。
import timefrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"
@dataclassclass CircuitBreaker: failure_threshold: int = 5 # 触发打开的连续失败次数 recovery_timeout: float = 60.0 # 尝试半开前的等待秒数 success_threshold: int = 2 # 半开状态下关闭所需成功次数
_state: CircuitState = field(default=CircuitState.CLOSED, init=False) _failure_count: int = field(default=0, init=False) _success_count: int = field(default=0, init=False) _last_failure_time: float = field(default=0.0, init=False)
@property def state(self) -> CircuitState: if self._state == CircuitState.OPEN: if time.time() - self._last_failure_time > self.recovery_timeout: self._state = CircuitState.HALF_OPEN self._success_count = 0 return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: if self.state == CircuitState.OPEN: raise RuntimeError("熔断器已打开——服务不可用") try: result = func(*args, **kwargs) self._on_success() return result except Exception: self._on_failure() raise
def _on_success(self) -> None: self._failure_count = 0 if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED
def _on_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPEN模式3:检查点与恢复
长时间运行的智能体需要在故障后从最后成功的步骤恢复。该模式在每个步骤后将智能体状态序列化到持久存储。
import jsonimport osfrom dataclasses import dataclass, asdict, fieldfrom typing import Optional
@dataclassclass AgentCheckpoint: task_id: str current_step: int completed_steps: list[str] = field(default_factory=list) results: dict = field(default_factory=dict) messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None: os.makedirs(directory, exist_ok=True) path = os.path.join(directory, f"{self.task_id}.json") with open(path, "w") as f: json.dump(asdict(self), f, indent=2)
@classmethod def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]: path = os.path.join(directory, f"{task_id}.json") if not os.path.exists(path): return None with open(path) as f: data = json.load(f) return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None: path = os.path.join(directory, f"{self.task_id}.json") if os.path.exists(path): os.remove(path)该模式与LangGraph内置的MemorySaver和PostgresSaver检查点器自然集成。参见状态机与智能体:使用LangGraph构建可靠工作流。
模式4:降级策略
某些故障不可重试。你需要一条备用路径让智能体继续前进。
from typing import Callable
def with_fallback( primary: Callable[[], str], fallback: Callable[[], str], error_types: tuple = (Exception,),) -> str: """尝试主函数;在指定错误时回退到备用函数。""" try: return primary() except error_types as e: print(f"主选项失败({type(e).__name__}):{e}。使用备用方案。") return fallback()
def answer_question(question: str) -> str: def primary(): # 主模型:最强大 response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, messages=[{"role": "user", "content": question}], ) return response.content[0].text
def fallback(): # 备用:更快更轻量的模型 response = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, messages=[{"role": "user", "content": question}], ) return f"[备用响应] {response.content[0].text}"
return with_fallback( primary, fallback, error_types=(anthropic.RateLimitError, anthropic.APIStatusError), )模式5:升级队列
某些故障真的无法自动解决。升级队列捕获失败的任务及足够的上下文,供人工(或监督智能体)解决。
import jsonimport uuidfrom datetime import datetimefrom dataclasses import dataclass, asdictfrom enum import Enum
class EscalationReason(Enum): MAX_RETRIES_EXCEEDED = "max_retries_exceeded" AMBIGUOUS_STATE = "ambiguous_state" LOW_CONFIDENCE = "low_confidence" REQUIRES_HUMAN = "requires_human" IRREVERSIBLE_ACTION = "irreversible_action"
@dataclassclass EscalationRecord: id: str task_id: str reason: str error_message: str agent_state: dict context: str timestamp: str resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None: with open(queue_file, "a") as f: f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate( task_id: str, reason: EscalationReason, error_message: str, agent_state: dict, context: str = "",) -> EscalationRecord: record = EscalationRecord( id=str(uuid.uuid4()), task_id=task_id, reason=reason.value, error_message=error_message, agent_state=agent_state, context=context, timestamp=datetime.utcnow().isoformat(), ) record.save() print(f"[已升级] 任务{task_id}:{reason.value}") return record使用场景: 涉及不可逆操作、低置信度决策的任务。参见多智能体模式了解处理升级队列的监督智能体。
常见陷阱
陷阱1:对不可重试错误进行重试
解决方案: 在重试前对错误进行分类。只重试瞬时错误。
# 错误:捕获所有异常except Exception: retry()
# 正确:只重试可重试错误except (anthropic.RateLimitError, anthropic.APIConnectionError): retry()except anthropic.APIStatusError as e: if e.status_code >= 500: retry() else: raise陷阱2:重试时丢失状态
解决方案: 在重试之间保留消息历史。只重试具体失败的步骤。
陷阱3:无界重试循环
解决方案: 始终设置max_retries。耗尽后升级或快速失败。
生产部署清单
- 所有API调用使用指数退避包装(基础1秒,最大60秒,启用抖动)
- 每个外部依赖项配置熔断器
- 每个重要步骤后将检查点保存到持久存储
- 降级路径已测试并在响应元数据中标记
- 升级队列每日监控和审查
- 重试次数有界(
max_retries≤ 5)
下一步
- 从退避开始 — 用
call_with_backoff包装每个API调用。 - 添加检查点 到任何超过2–3步的工作流。
- 构建升级队列 用于涉及不可逆操作的任务。
- 编写故障注入测试 — 通过主动触发每种错误类型来测试恢复路径。
相关指南: