컴퓨터 액세스 시스템

멀티에이전트 패턴: 오케스트레이터, 워커, 파이프라인


단일 에이전트는 범위가 명확한 작업에 적합합니다. 작업이 여러 도메인에 걸친 전문 지식, 대용량 입력에 대한 병렬 작업, 또는 실행 전 검증이 필요한 결정을 요구하는 순간, 여러 에이전트가 필요합니다.

멀티에이전트 시스템이 본질적으로 더 복잡한 것은 아닙니다. 단지 다르게 구조화되어 있을 뿐입니다. 핵심은 문제에 맞는 올바른 패턴을 선택하는 것입니다. 세 가지 패턴이 대부분의 사용 사례를 커버합니다: 오케스트레이터-워커, 파이프라인, 병렬 팬아웃.

여러 에이전트가 필요한 이유

멀티에이전트 시스템의 근거는 세 가지 실용적인 이유로 귀결됩니다.

전문화. 컨텍스트에 50개의 도구를 가진 단일 에이전트는 혼란스러워집니다. 5개의 집중된 도구를 가진 전문 에이전트가 더 잘 수행합니다. 도메인별로 분리하면(리서치, 라이팅, 코드, 검증), 각 에이전트가 한 가지를 잘 수행합니다.

병렬 처리. 일부 작업은 독립적인 하위 작업으로 분해됩니다. 20개의 문서를 순차적으로 분석하는 것은 느리지만, 병렬 에이전트로 동시에 분석하는 것은 빠릅니다.

검증. 하나의 에이전트가 출력을 생성하고 두 번째 에이전트가 독립적으로 비판하면 자기 검토가 놓치는 오류를 잡습니다. 검토자는 원래 답변을 옹호하는 데 이해관계가 없습니다.

패턴 1: 오케스트레이터-워커

하나의 오케스트레이터 에이전트가 계획하고 위임합니다. 워커 에이전트가 특정 작업을 실행하고 결과를 반환합니다. 오케스트레이터가 최종 출력을 조합합니다.

이것이 가장 유연한 패턴입니다. 오케스트레이터는 중간 결과에 따라 계획을 적응시키고, 실패한 작업을 재시도하거나 다른 워커로 에스컬레이션할 수 있습니다.

import anthropic
import json
client = anthropic.Anthropic()
def run_worker(system_prompt: str, task: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=system_prompt,
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
def orchestrator(user_request: str) -> str:
# Step 1: plan the work
plan_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
system="""You are a planning agent. Given a user request, break it into
2-4 specific subtasks. Return a JSON array of task descriptions only.
Example: ["Research X", "Analyze Y", "Synthesize findings"]""",
messages=[{"role": "user", "content": user_request}]
)
tasks = json.loads(plan_response.content[0].text)
# Step 2: run each task with a specialized worker
results = []
for task in tasks:
result = run_worker(
system_prompt="You are a focused execution agent. Complete the assigned task thoroughly.",
task=task
)
results.append({"task": task, "result": result})
# Step 3: synthesize
synthesis_prompt = f"""Original request: {user_request}
Worker results:
{json.dumps(results, indent=2)}
Synthesize these results into a cohesive final response."""
final = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": synthesis_prompt}]
)
return final.content[0].text

오케스트레이터 패턴은 작업 구조를 미리 알 수 없을 때 가장 잘 작동합니다. 문제를 분석하기 전까지 얼마나 많은 하위 작업이 필요한지 모른다면 오케스트레이터를 사용하세요.

한 가지 함정: 오케스트레이터는 말이 안 되는 하위 작업을 환각할 수 있습니다. 출력 형식(JSON 배열, 번호 매긴 목록)을 제약하고 워커를 실행하기 전에 검증합니다. 폴백 재계획 단계가 있는 JSON 파싱 주위의 try/except 블록이 이것을 우아하게 처리합니다.

패턴 2: 파이프라인

에이전트가 순차적인 체인을 형성합니다. 각 에이전트가 입력을 변환하고 출력을 다음 에이전트에게 전달합니다. 에이전트들은 서로에 대해 알지 못합니다. 입력을 받고 출력을 생성합니다.

이것은 구현하고 추론하기가 가장 단순한 패턴입니다. 잘 정의된 단계를 가진 변환 작업에 잘 작동합니다.

def run_pipeline(input_text: str) -> str:
stages = [
{
"name": "Researcher",
"system": "Extract and organize all key facts from the input. "
"Format as a structured list with sources noted where available.",
},
{
"name": "Writer",
"system": "Transform the research notes into clear, readable prose. "
"Maintain all factual content. Target a technical audience.",
},
{
"name": "Editor",
"system": "Improve clarity and concision. Remove redundancy. "
"Do not change facts. Return only the improved text.",
},
{
"name": "Fact Checker",
"system": "Review for internal consistency. Flag any claims that "
"contradict each other or seem unsupported. "
"If no issues, return 'VERIFIED: ' followed by the original text.",
},
]
current = input_text
for stage in stages:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=stage["system"],
messages=[{"role": "user", "content": current}]
)
current = response.content[0].text
print(f"[{stage['name']}] complete ({len(current)} chars)")
return current

파이프라인은 오류를 축적합니다. 연구자가 무언가를 놓치면 작가가 추가할 수 없습니다. 단계를 손실이 아닌 가산적으로 설계하세요. 다음 단계가 필요할 수 있는 정보를 삭제하는 단계를 피하세요.

실용적인 조정: 다운스트림 에이전트가 이전 단계가 압축했을 수 있는 컨텍스트가 필요할 때 각 단계의 출력과 함께 원본 입력을 전달합니다.

패턴 3: 병렬 팬아웃

대용량 입력을 독립적인 청크로 분할하고, 각각을 별도의 에이전트로 동시에 처리한 다음 결과를 집계합니다.

이것은 단일 컨텍스트 윈도우에 편안하게 들어가는 것보다 더 많은 데이터를 처리하거나 처리 시간이 중요할 때 올바른 패턴입니다.

import asyncio
async def analyze_document(doc: str, index: int) -> dict:
"""Analyze a single document asynchronously."""
system = """Analyze this document and return a JSON object with:
- "sentiment": positive/negative/neutral
- "key_topics": list of 3-5 main topics
- "summary": 2-3 sentence summary
- "flags": list of any concerns (empty list if none)"""
# asyncio.to_thread lets you call synchronous code in a thread pool
result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
system=system,
messages=[{"role": "user", "content": doc}]
).content[0].text
)
return {"index": index, **json.loads(result)}
async def parallel_analysis(documents: list[str]) -> dict:
# Fan out: analyze all documents concurrently
tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
analyses = await asyncio.gather(*tasks)
# Aggregate with a dedicated synthesis agent
synthesis_input = json.dumps({
"document_count": len(documents),
"analyses": analyses
})
aggregate_result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system="Synthesize document analyses into: overall sentiment distribution, "
"top themes across all documents, and notable patterns in flags. "
"Return as JSON.",
messages=[{"role": "user", "content": synthesis_input}]
).content[0].text
)
return {
"individual": analyses,
"aggregate": json.loads(aggregate_result)
}

집계 단계는 대부분의 구현이 모서리를 잘라내는 곳입니다. 결과를 연결하지 마세요. 집계 작업을 이해하는 에이전트에게 전달하세요. 20개의 분석을 문자열로 결합하는 것은 유용하지 않지만 합성된 요약은 유용합니다.

모델 선택에 주목하세요: 속도가 중요하고 작업이 간단한 대용량 문서별 분석에는 claude-haiku-4-5-20251001을 사용하고, 처리량보다 판단력이 더 중요한 합성에는 claude-sonnet-4-6을 사용합니다.

올바른 패턴 선택

상황패턴
분석하기 전까지 작업 구조를 모름오케스트레이터-워커
잘 정의된 변환 단계파이프라인
대용량 입력, 독립적인 청크병렬 팬아웃
독립적인 검증 필요오케스트레이터 또는 리뷰 단계가 있는 파이프라인
대용량 입력에서 지연 시간 최소화병렬 팬아웃

이러한 패턴은 합성됩니다. 실제 시스템은 일부 작업을 파이프라인으로 실행하면서 다른 작업을 병렬로 팬아웃하는 오케스트레이터를 사용할 수 있습니다. 적합한 가장 단순한 패턴으로 시작하고 더 단순한 접근 방식이 실패할 때만 복잡성을 추가합니다.

실용적인 고려 사항

비용. 멀티에이전트 시스템은 API 호출을 곱합니다. 4단계 파이프라인은 합성 오버헤드를 더해 단일 호출의 4배 비용이 들 수 있습니다. 전략적으로 모델을 혼합합니다: 판단력이 중요한 오케스트레이션과 계획에는 Opus를, 대용량 실행 작업에는 Haiku를 사용합니다.

오류 전파. 각 에이전트가 실패를 어떻게 처리할지 미리 결정합니다. 옵션: 오류 전파(중지), 오류 객체 반환(오케스트레이터가 결정하게 함), 또는 수정된 프롬프트로 재시도(우아하게 복구하지만 지연 추가). 대부분의 프로덕션 시스템에서는 구조화된 오류 객체를 반환하고 오케스트레이터가 결정하게 하는 것이 올바른 기본값입니다.

트레이싱. 각 에이전트가 무엇을 했는지 볼 수 없는 멀티에이전트 시스템은 디버깅의 악몽입니다. 각 에이전트 호출을 다음과 함께 기록합니다: 입력, 출력, 모델, 지연 시간, 토큰 수. 완전한 실행 경로를 재구성할 수 있도록 트레이스 ID로 각 호출에 태그를 붙입니다.

컨텍스트 전달. 각 에이전트가 받는 컨텍스트에 대해 의도적으로 생각합니다. 각 에이전트에게 전체 대화 기록을 전달하는 것은 비용이 많이 들고 종종 혼란스럽습니다. 에이전트들은 관련 없는 이전 컨텍스트로 인해 산만해집니다. 각 에이전트에게 특정 작업을 수행하는 데 필요한 것만 전달합니다.

다음에 구축할 것

여기의 패턴들이 기초입니다. 그 위에 무엇을 구축하느냐는 문제에 달려 있습니다:

  • 워커에 도구 사용을 추가합니다 — 전문 에이전트가 API를 호출하고, 데이터베이스를 쿼리하거나, 코드를 실행할 수 있게 합니다
  • 오케스트레이터가 고위험 행동 전에 일시 정지하는 휴먼 인 더 루프 체크포인트를 추가합니다
  • 미래의 에이전트가 쿼리할 수 있는 벡터 스토어에 에이전트 출력을 지속화하여 메모리를 추가합니다
  • 출력을 사용자에게 반환하기 전에 판사 에이전트를 통해 라우팅하여 평가를 추가합니다

멀티에이전트 시스템은 현재 AI에서 흥미로운 엔지니어링이 일어나는 곳입니다. 패턴은 단순합니다. 판단력은 특정 문제에 올바르게 적용하는 데 있습니다.


관련 기사