计算机访问系统

智能体错误恢复:5种生产环境可靠性模式


你的智能体在测试中完美运行。然后在生产环境中,它在多步骤工作流的第3步触发了速率限制,抛出了未捕获的异常,并让系统处于未定义状态。没有检查点,没有重试,没有备用方案。只有沉默——以及一个必须手动重启的损坏管道。

智能体错误恢复是演示与生产系统之间的差距。本文涵盖生产智能体工作流中使用的五种模式:指数退避、熔断器、检查点与恢复、降级策略以及升级队列。每种模式均使用Anthropic SDK实现,兼容任何编排框架。

前提条件: 你需要熟悉Python和Claude API的工作原理。


为什么智能体的失败与传统软件不同

智能体以更复杂的方式失败:

  • 部分进度: 智能体完成8步工作流中的1–4步后失败。没有恢复机制就会丢失所有进度。
  • 状态模糊: 智能体调用了工具,但响应格式错误。操作是否执行了?应该重试吗?
  • 级联故障: 一个慢速API调用阻塞了整个推理循环。
  • 静默故障: LLM返回了响应,但不符合预期格式,下游解析器静默失败。

模式1:带抖动的指数退避

指数退避在每次重试之间将等待时间翻倍。抖动添加随机性,防止多个智能体同时重试(“惊群效应”问题)。

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
使用指数退避调用Claude API处理瞬时错误。
对速率限制和服务器错误重试;对客户端错误立即抛出。
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"速率限制。{jitter:.1f}秒后重试(第{attempt + 1}/{max_retries}次)")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"服务器错误{e.status_code}{jitter:.1f}秒后重试")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

使用场景: 智能体循环中的任何API调用。这应该是所有LLM调用的默认包装器。

不使用的场景: 不要对验证错误(400)、认证失败(401)或404重试。


模式2:熔断器

熔断器跟踪故障率,并暂时停止调用失败的服务。它有三个状态:关闭(正常)、打开(故障)和半开(恢复中)。

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5 # 触发打开的连续失败次数
recovery_timeout: float = 60.0 # 尝试半开前的等待秒数
success_threshold: int = 2 # 半开状态下关闭所需成功次数
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("熔断器已打开——服务不可用")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

模式3:检查点与恢复

长时间运行的智能体需要在故障后从最后成功的步骤恢复。该模式在每个步骤后将智能体状态序列化到持久存储。

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

该模式与LangGraph内置的MemorySaverPostgresSaver检查点器自然集成。参见状态机与智能体:使用LangGraph构建可靠工作流


模式4:降级策略

某些故障不可重试。你需要一条备用路径让智能体继续前进。

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""尝试主函数;在指定错误时回退到备用函数。"""
try:
return primary()
except error_types as e:
print(f"主选项失败({type(e).__name__}):{e}。使用备用方案。")
return fallback()
def answer_question(question: str) -> str:
def primary():
# 主模型:最强大
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# 备用:更快更轻量的模型
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[备用响应] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

模式5:升级队列

某些故障真的无法自动解决。升级队列捕获失败的任务及足够的上下文,供人工(或监督智能体)解决。

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[已升级] 任务{task_id}{reason.value}")
return record

使用场景: 涉及不可逆操作、低置信度决策的任务。参见多智能体模式了解处理升级队列的监督智能体。


常见陷阱

陷阱1:对不可重试错误进行重试

解决方案: 在重试前对错误进行分类。只重试瞬时错误。

# 错误:捕获所有异常
except Exception:
retry()
# 正确:只重试可重试错误
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

陷阱2:重试时丢失状态

解决方案: 在重试之间保留消息历史。只重试具体失败的步骤。

陷阱3:无界重试循环

解决方案: 始终设置max_retries。耗尽后升级或快速失败。


生产部署清单

  • 所有API调用使用指数退避包装(基础1秒,最大60秒,启用抖动)
  • 每个外部依赖项配置熔断器
  • 每个重要步骤后将检查点保存到持久存储
  • 降级路径已测试并在响应元数据中标记
  • 升级队列每日监控和审查
  • 重试次数有界(max_retries ≤ 5)

下一步

  1. 从退避开始 — 用call_with_backoff包装每个API调用。
  2. 添加检查点 到任何超过2–3步的工作流。
  3. 构建升级队列 用于涉及不可逆操作的任务。
  4. 编写故障注入测试 — 通过主动触发每种错误类型来测试恢复路径。

相关指南:


相关文章