Восстановление Агентов После Ошибок: 5 Паттернов для Продакшн-Надёжности
Ваш агент отлично работал в тестах. Затем в продакшне он столкнулся с ограничением частоты запросов на шаге 3 многоэтапного рабочего процесса, выбросил необработанное исключение и оставил систему в неопределённом состоянии. Никакого чекпоинта. Никаких повторных попыток. Никакого запасного варианта. Только тишина — и сломанный пайплайн, который нужно перезапускать вручную.
Восстановление агентов после ошибок — это разница между демо и продакшн-системой. В этой статье рассматриваются пять паттернов: экспоненциальный откат, прерыватель цепи, чекпоинт и возобновление, запасные стратегии и очередь эскалации. Каждый паттерн реализован с помощью Anthropic SDK.
Предварительные требования: Знание Python и понимание работы API Claude.
Почему Агенты Отказывают Иначе, чем Традиционный Софт
Агенты отказывают более сложными способами:
- Частичный прогресс: Агент завершает шаги 1–4 из 8, затем отказывает. Без восстановления весь прогресс теряется.
- Неоднозначное состояние: Агент вызвал инструмент, но ответ был некорректным. Действие произошло? Нужно повторить?
- Каскадный отказ: Медленный вызов API блокирует весь цикл рассуждений.
- Скрытый отказ: LLM возвращает ответ, но не следует ожидаемому формату. Нижестоящий парсер тихо ломается.
Паттерн 1: Экспоненциальный Откат с Джиттером
Экспоненциальный откат удваивает время ожидания между попытками. Джиттер добавляет случайность, предотвращая одновременные повторные попытки нескольких агентов.
import anthropicimport timeimport randomfrom typing import Optional
client = anthropic.Anthropic()
def call_with_backoff( messages: list, model: str = "claude-sonnet-4-6", max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0,) -> Optional[anthropic.types.Message]: """ Вызывает API Claude с экспоненциальным откатом при транзиентных ошибках. Повторяет при лимитах частоты и ошибках сервера; немедленно выбрасывает при ошибках клиента. """ for attempt in range(max_retries): try: return client.messages.create( model=model, max_tokens=1024, messages=messages, ) except anthropic.RateLimitError as e: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Лимит запросов. Повтор через {jitter:.1f}с (попытка {attempt + 1}/{max_retries})") time.sleep(jitter) except anthropic.APIStatusError as e: if e.status_code < 500: raise if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay) print(f"Ошибка сервера {e.status_code}. Повтор через {jitter:.1f}с") time.sleep(jitter) except anthropic.APIConnectionError: if attempt == max_retries - 1: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) return NoneКогда использовать: Для любого вызова API внутри цикла агента. Это должна быть обёртка по умолчанию для всех вызовов LLM.
Паттерн 2: Прерыватель Цепи
Прерыватель цепи отслеживает частоту сбоев и временно прекращает вызовы неисправного сервиса. Три состояния: Закрыт (нормальный), Открыт (сбой) и Полуоткрыт (восстановление).
import timefrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"
@dataclassclass CircuitBreaker: failure_threshold: int = 5 recovery_timeout: float = 60.0 success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False) _failure_count: int = field(default=0, init=False) _success_count: int = field(default=0, init=False) _last_failure_time: float = field(default=0.0, init=False)
@property def state(self) -> CircuitState: if self._state == CircuitState.OPEN: if time.time() - self._last_failure_time > self.recovery_timeout: self._state = CircuitState.HALF_OPEN self._success_count = 0 return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: if self.state == CircuitState.OPEN: raise RuntimeError("Прерыватель цепи ОТКРЫТ — сервис недоступен") try: result = func(*args, **kwargs) self._on_success() return result except Exception: self._on_failure() raise
def _on_success(self) -> None: self._failure_count = 0 if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED
def _on_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPENПаттерн 3: Чекпоинт и Возобновление
Долго работающие агенты должны уметь возобновлять работу с последнего успешного шага после сбоя.
import jsonimport osfrom dataclasses import dataclass, asdict, fieldfrom typing import Optional
@dataclassclass AgentCheckpoint: task_id: str current_step: int completed_steps: list[str] = field(default_factory=list) results: dict = field(default_factory=dict) messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None: os.makedirs(directory, exist_ok=True) path = os.path.join(directory, f"{self.task_id}.json") with open(path, "w") as f: json.dump(asdict(self), f, indent=2)
@classmethod def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]: path = os.path.join(directory, f"{task_id}.json") if not os.path.exists(path): return None with open(path) as f: data = json.load(f) return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None: path = os.path.join(directory, f"{self.task_id}.json") if os.path.exists(path): os.remove(path)Этот паттерн естественно интегрируется с MemorySaver и PostgresSaver в LangGraph. См. Автоматы состояний и агенты с LangGraph.
Паттерн 4: Запасные Стратегии
Некоторые сбои не поддаются повторным попыткам. Нужен альтернативный путь, удерживающий агента в движении.
from typing import Callable
def with_fallback( primary: Callable[[], str], fallback: Callable[[], str], error_types: tuple = (Exception,),) -> str: """Пробует основную функцию; переходит к запасной при указанных ошибках.""" try: return primary() except error_types as e: print(f"Основной вариант не сработал ({type(e).__name__}): {e}. Используем запасной.") return fallback()
def answer_question(question: str) -> str: def primary(): # Основная модель: наиболее мощная response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, messages=[{"role": "user", "content": question}], ) return response.content[0].text
def fallback(): # Запасной вариант: более лёгкая и быстрая модель response = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, messages=[{"role": "user", "content": question}], ) return f"[Запасной ответ] {response.content[0].text}"
return with_fallback( primary, fallback, error_types=(anthropic.RateLimitError, anthropic.APIStatusError), )Паттерн 5: Очередь Эскалации
Некоторые сбои действительно не могут быть устранены автоматически. Очередь эскалации захватывает невыполненные задачи с достаточным контекстом для их разрешения человеком.
import jsonimport uuidfrom datetime import datetimefrom dataclasses import dataclass, asdictfrom enum import Enum
class EscalationReason(Enum): MAX_RETRIES_EXCEEDED = "max_retries_exceeded" AMBIGUOUS_STATE = "ambiguous_state" LOW_CONFIDENCE = "low_confidence" REQUIRES_HUMAN = "requires_human" IRREVERSIBLE_ACTION = "irreversible_action"
@dataclassclass EscalationRecord: id: str task_id: str reason: str error_message: str agent_state: dict context: str timestamp: str resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None: with open(queue_file, "a") as f: f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate( task_id: str, reason: EscalationReason, error_message: str, agent_state: dict, context: str = "",) -> EscalationRecord: record = EscalationRecord( id=str(uuid.uuid4()), task_id=task_id, reason=reason.value, error_message=error_message, agent_state=agent_state, context=context, timestamp=datetime.utcnow().isoformat(), ) record.save() print(f"[ЭСКАЛИРОВАНО] Задача {task_id}: {reason.value}") return recordКогда использовать: Для задач с необратимыми действиями или решениями с низкой уверенностью. См. Мультиагентные паттерны.
Распространённые Ошибки
Ошибка 1: Повтор Неповторяемых Ошибок
Решение: Классифицируйте ошибки перед повтором. Повторяйте только транзиентные ошибки.
# Неправильно: перехватывает всёexcept Exception: retry()
# Правильно: только повторяемые ошибкиexcept (anthropic.RateLimitError, anthropic.APIConnectionError): retry()except anthropic.APIStatusError as e: if e.status_code >= 500: retry() else: raiseОшибка 2: Потеря Состояния при Повторе
Решение: Сохраняйте историю сообщений между попытками. Повторяйте только конкретный неудавшийся шаг.
Ошибка 3: Неограниченные Циклы Повторов
Решение: Всегда устанавливайте max_retries. После исчерпания — эскалируйте или быстро завершайте.
Контрольный Список для Продакшна
- Все API-вызовы обёрнуты экспоненциальным откатом (база 1с, макс 60с, джиттер включён)
- Прерыватели цепи на каждой внешней зависимости
- Чекпоинты сохраняются в долговременное хранилище после каждого важного шага
- Запасные пути протестированы и помечены в метаданных ответа
- Очередь эскалации отслеживается и проверяется не реже раза в день
- Счётчики повторов ограничены (
max_retries≤ 5)
Следующие Шаги
- Начните с отката — Оберните каждый API-вызов в
call_with_backoff. - Добавьте чекпоинты к любому рабочему процессу длиннее 2–3 шагов.
- Создайте очередь эскалации для задач с необратимыми действиями.
- Пишите тесты с инжекцией ошибок — Тестируйте пути восстановления, намеренно вызывая каждый тип ошибки.
Связанные руководства:
Связанные статьи
- Паттерны использования инструментов: надёжные интерфейсы агент-инструмент
- Системы памяти агентов: постоянный контекст для вашего ИИ