컴퓨터 액세스 시스템

에이전트 오류 복구: 프로덕션 신뢰성을 위한 5가지 패턴


에이전트는 테스트 환경에서 완벽하게 작동했다. 그러다 프로덕션에서 다단계 워크플로우의 3단계에서 속도 제한에 걸렸고, 잡히지 않은 예외를 던지며 시스템을 정의되지 않은 상태로 남겨놨다. 체크포인트도, 재시도도, 폴백도 없이. 그냥 침묵 — 그리고 수동으로 재시작해야 하는 망가진 파이프라인.

에이전트 오류 복구는 데모와 프로덕션 시스템의 차이다. 이 글은 프로덕션 에이전트 워크플로우에서 사용하는 다섯 가지 패턴을 다룬다: 지수 백오프, 서킷 브레이커, 체크포인트-재개, 폴백 전략, 에스컬레이션 큐. 각 패턴은 Anthropic SDK로 구현되어 있다.

전제 조건: Python과 Claude API 작동 방식에 익숙해야 한다.


에이전트가 전통적인 소프트웨어와 다르게 실패하는 이유

에이전트는 더 복잡한 방식으로 실패한다:

  • 부분적 진행: 에이전트가 8단계 워크플로우의 1–4단계를 완료한 후 실패한다. 복구 없이는 모든 진행이 손실된다.
  • 모호한 상태: 에이전트가 도구를 호출했지만 응답이 잘못된 형식이었다. 작업이 실행됐나? 재시도해야 하나?
  • 연쇄 실패: 느린 API 호출 하나가 전체 추론 루프를 막는다.
  • 묵묵한 실패: LLM이 응답을 반환했지만 예상 형식을 따르지 않는다. 다운스트림 파서가 조용히 망가진다.

패턴 1: 지터를 활용한 지수 백오프

지수 백오프는 재시도 사이의 대기 시간을 매번 두 배로 늘린다. 지터는 여러 에이전트가 동시에 재시도하는 것을 막기 위해 무작위성을 추가한다.

import anthropic
import time
import random
from typing import Optional
client = anthropic.Anthropic()
def call_with_backoff(
messages: list,
model: str = "claude-sonnet-4-6",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Optional[anthropic.types.Message]:
"""
일시적 오류에 지수 백오프로 Claude API를 호출한다.
속도 제한과 서버 오류는 재시도; 클라이언트 오류는 즉시 raise.
"""
for attempt in range(max_retries):
try:
return client.messages.create(
model=model,
max_tokens=1024,
messages=messages,
)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"속도 제한. {jitter:.1f}초 후 재시도 (시도 {attempt + 1}/{max_retries})")
time.sleep(jitter)
except anthropic.APIStatusError as e:
if e.status_code < 500:
raise
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
print(f"서버 오류 {e.status_code}. {jitter:.1f}초 후 재시도")
time.sleep(jitter)
except anthropic.APIConnectionError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
return None

언제 사용하나: 에이전트 루프 내의 모든 API 호출. 모든 LLM 호출의 기본 래퍼가 되어야 한다.


패턴 2: 서킷 브레이커

서킷 브레이커는 장애율을 추적하고 실패하는 서비스 호출을 일시적으로 중단한다. 세 가지 상태: 닫힘 (정상), 열림 (장애), 반열림 (복구 중).

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Any
T = TypeVar("T")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5 # 열리기 전 연속 실패 횟수
recovery_timeout: float = 60.0 # 반열림 시도 전 대기 초
success_threshold: int = 2 # 반열림에서 닫힘으로 가는 성공 횟수
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
if self.state == CircuitState.OPEN:
raise RuntimeError("서킷 브레이커 열림 — 서비스 불가")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN

패턴 3: 체크포인트와 재개

장시간 실행되는 에이전트는 장애 후 마지막 성공한 단계부터 재개할 수 있어야 한다.

import json
import os
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class AgentCheckpoint:
task_id: str
current_step: int
completed_steps: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
def save(self, directory: str = ".checkpoints") -> None:
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, f"{self.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(self), f, indent=2)
@classmethod
def load(cls, task_id: str, directory: str = ".checkpoints") -> Optional["AgentCheckpoint"]:
path = os.path.join(directory, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return cls(**data)
def clear(self, directory: str = ".checkpoints") -> None:
path = os.path.join(directory, f"{self.task_id}.json")
if os.path.exists(path):
os.remove(path)

이 패턴은 LangGraph의 내장 MemorySaverPostgresSaver와 자연스럽게 통합된다. LangGraph로 상태 머신과 에이전트 구축하기를 참조.


패턴 4: 폴백 전략

일부 장애는 재시도할 수 없다. 에이전트를 계속 앞으로 나아가게 하는 대안 경로가 필요하다.

from typing import Callable
def with_fallback(
primary: Callable[[], str],
fallback: Callable[[], str],
error_types: tuple = (Exception,),
) -> str:
"""기본 함수를 시도하고, 지정된 오류에서 보조로 전환한다."""
try:
return primary()
except error_types as e:
print(f"기본 실패 ({type(e).__name__}): {e}. 폴백 사용.")
return fallback()
def answer_question(question: str) -> str:
def primary():
# 기본 모델: 가장 강력한
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return response.content[0].text
def fallback():
# 폴백: 더 가볍고 빠른 모델
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": question}],
)
return f"[폴백 응답] {response.content[0].text}"
return with_fallback(
primary, fallback,
error_types=(anthropic.RateLimitError, anthropic.APIStatusError),
)

패턴 5: 에스컬레이션 큐

일부 장애는 자동으로 해결할 수 없다. 에스컬레이션 큐는 사람이 해결할 수 있도록 충분한 컨텍스트와 함께 실패한 작업을 기록한다.

import json
import uuid
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
class EscalationReason(Enum):
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
AMBIGUOUS_STATE = "ambiguous_state"
LOW_CONFIDENCE = "low_confidence"
REQUIRES_HUMAN = "requires_human"
IRREVERSIBLE_ACTION = "irreversible_action"
@dataclass
class EscalationRecord:
id: str
task_id: str
reason: str
error_message: str
agent_state: dict
context: str
timestamp: str
resolved: bool = False
def save(self, queue_file: str = "escalation_queue.jsonl") -> None:
with open(queue_file, "a") as f:
f.write(json.dumps(asdict(self), ensure_ascii=False) + "\n")
def escalate(
task_id: str,
reason: EscalationReason,
error_message: str,
agent_state: dict,
context: str = "",
) -> EscalationRecord:
record = EscalationRecord(
id=str(uuid.uuid4()),
task_id=task_id,
reason=reason.value,
error_message=error_message,
agent_state=agent_state,
context=context,
timestamp=datetime.utcnow().isoformat(),
)
record.save()
print(f"[에스컬레이트됨] 작업 {task_id}: {reason.value}")
return record

언제 사용하나: 되돌릴 수 없는 작업이나 낮은 신뢰도 결정을 포함하는 태스크. 에스컬레이션 큐를 처리하는 슈퍼바이저 에이전트는 멀티 에이전트 패턴 참조.


흔한 실수들

실수 1: 재시도 불가능한 오류를 재시도하기

해결책: 재시도 전에 오류를 분류하라. 일시적 오류만 재시도하라.

# 잘못됨: 모든 것을 잡음
except Exception:
retry()
# 올바름: 재시도 가능한 오류만
except (anthropic.RateLimitError, anthropic.APIConnectionError):
retry()
except anthropic.APIStatusError as e:
if e.status_code >= 500:
retry()
else:
raise

실수 2: 재시도 시 상태 잃기

해결책: 재시도 사이에 메시지 히스토리를 보존하라. 실패한 특정 단계만 재시도하라.

실수 3: 무한 재시도 루프

해결책: 항상 max_retries를 설정하라. 소진 후 에스컬레이션하거나 빠르게 실패하라.


프로덕션 배포 체크리스트

  • 모든 API 호출을 지수 백오프로 래핑 (기본 1초, 최대 60초, 지터 활성화)
  • 모든 외부 의존성에 서킷 브레이커
  • 각 중요 단계 후 영구 스토리지에 체크포인트 저장
  • 폴백 경로 테스트 및 응답 메타데이터에 표시
  • 에스컬레이션 큐 최소 매일 모니터링 및 검토
  • 재시도 횟수 제한 (max_retries ≤ 5)

다음 단계

  1. 백오프부터 시작하자 — 모든 API 호출을 call_with_backoff로 래핑.
  2. 체크포인트를 추가하자 — 2–3단계를 넘는 모든 워크플로우에.
  3. 에스컬레이션 큐를 만들자 — 되돌릴 수 없는 작업을 포함하는 태스크용.
  4. 장애 주입 테스트를 작성하자 — 각 오류 유형을 의도적으로 발생시켜 복구 경로 테스트.

관련 가이드:


관련 기사