СИСТЕМА КОМПЬЮТЕРНОГО ДОСТУПА

Восстановление Агентов После Ошибок: 5 Паттернов для Продакшн-Надёжности


Ваш агент отлично работал в тестах. Затем в продакшне он столкнулся с ограничением частоты запросов на шаге 3 многоэтапного рабочего процесса, выбросил необработанное исключение и оставил систему в неопределённом состоянии. Никакого чекпоинта. Никаких повторных попыток. Никакого запасного варианта. Только тишина — и сломанный пайплайн, который нужно перезапускать вручную.

Восстановление агентов после ошибок — это разница между демо и продакшн-системой. В этой статье рассматриваются пять паттернов: экспоненциальный откат, прерыватель цепи, чекпоинт и возобновление, запасные стратегии и очередь эскалации. Каждый паттерн реализован с помощью Anthropic SDK.

Предварительные требования: Знание Python и понимание работы API Claude.


Почему Агенты Отказывают Иначе, чем Традиционный Софт

Агенты отказывают более сложными способами:

  • Частичный прогресс: Агент завершает шаги 1–4 из 8, затем отказывает. Без восстановления весь прогресс теряется.
  • Неоднозначное состояние: Агент вызвал инструмент, но ответ был некорректным. Действие произошло? Нужно повторить?
  • Каскадный отказ: Медленный вызов API блокирует весь цикл рассуждений.
  • Скрытый отказ: LLM возвращает ответ, но не следует ожидаемому формату. Нижестоящий парсер тихо ломается.

Паттерн 1: Экспоненциальный Откат с Джиттером

Экспоненциальный откат удваивает время ожидания между попытками. Джиттер добавляет случайность, предотвращая одновременные повторные попытки нескольких агентов.

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
Вызывает API Claude с экспоненциальным откатом при транзиентных ошибках.
Повторяет при лимитах частоты и ошибках сервера;
немедленно выбрасывает при ошибках клиента.
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Лимит запросов. Повтор через {jitter:.1f}с (попытка {attempt + 1}/{max_retries})")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"Ошибка сервера {e.status_code}. Повтор через {jitter:.1f}с")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

Когда использовать: Для любого вызова API внутри цикла агента. Это должна быть обёртка по умолчанию для всех вызовов LLM.


Паттерн 2: Прерыватель Цепи

Прерыватель цепи отслеживает частоту сбоев и временно прекращает вызовы неисправного сервиса. Три состояния: Закрыт (нормальный), Открыт (сбой) и Полуоткрыт (восстановление).

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 60.0
success_threshold: int = 2
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("Прерыватель цепи ОТКРЫТ — сервис недоступен")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

Паттерн 3: Чекпоинт и Возобновление

Долго работающие агенты должны уметь возобновлять работу с последнего успешного шага после сбоя.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

Этот паттерн естественно интегрируется с MemorySaver и PostgresSaver в LangGraph. См. Автоматы состояний и агенты с LangGraph.


Паттерн 4: Запасные Стратегии

Некоторые сбои не поддаются повторным попыткам. Нужен альтернативный путь, удерживающий агента в движении.

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""Пробует основную функцию; переходит к запасной при указанных ошибках."""
try:
return primary()
except error_types as e:
print(f"Основной вариант не сработал ({type(e).__name__}): {e}. Используем запасной.")
return fallback()
def answer_question(question: str) -> str:
def primary():
# Основная модель: наиболее мощная
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# Запасной вариант: более лёгкая и быстрая модель
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[Запасной ответ] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

Паттерн 5: Очередь Эскалации

Некоторые сбои действительно не могут быть устранены автоматически. Очередь эскалации захватывает невыполненные задачи с достаточным контекстом для их разрешения человеком.

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[ЭСКАЛИРОВАНО] Задача {task_id}: {reason.value}")
return record

Когда использовать: Для задач с необратимыми действиями или решениями с низкой уверенностью. См. Мультиагентные паттерны.


Распространённые Ошибки

Ошибка 1: Повтор Неповторяемых Ошибок

Решение: Классифицируйте ошибки перед повтором. Повторяйте только транзиентные ошибки.

# Неправильно: перехватывает всё
except Exception:
retry()
# Правильно: только повторяемые ошибки
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

Ошибка 2: Потеря Состояния при Повторе

Решение: Сохраняйте историю сообщений между попытками. Повторяйте только конкретный неудавшийся шаг.

Ошибка 3: Неограниченные Циклы Повторов

Решение: Всегда устанавливайте max_retries. После исчерпания — эскалируйте или быстро завершайте.


Контрольный Список для Продакшна

  • Все API-вызовы обёрнуты экспоненциальным откатом (база 1с, макс 60с, джиттер включён)
  • Прерыватели цепи на каждой внешней зависимости
  • Чекпоинты сохраняются в долговременное хранилище после каждого важного шага
  • Запасные пути протестированы и помечены в метаданных ответа
  • Очередь эскалации отслеживается и проверяется не реже раза в день
  • Счётчики повторов ограничены (max_retries ≤ 5)

Следующие Шаги

  1. Начните с отката — Оберните каждый API-вызов в call_with_backoff.
  2. Добавьте чекпоинты к любому рабочему процессу длиннее 2–3 шагов.
  3. Создайте очередь эскалации для задач с необратимыми действиями.
  4. Пишите тесты с инжекцией ошибок — Тестируйте пути восстановления, намеренно вызывая каждый тип ошибки.

Связанные руководства:


Связанные статьи