コンピュータアクセスシステム

マルチエージェントパターン:オーケストレーター、ワーカー、パイプライン


単一エージェントは、スコープが明確なタスクに適しています。タスクが複数のドメインにわたる専門知識を必要とする場合、大量の入力に対する並列作業が必要な場合、または実行前に検証すべき決定が必要な場合、複数のエージェントが必要です。

マルチエージェントシステムは本質的に複雑なわけではありません。ただ異なる方法で構造化されているだけです。鍵は問題に合った正しいパターンを選ぶことです。3つのパターンがほとんどのユースケースをカバーします: オーケストレーター・ワーカーパイプライン並列ファンアウト

なぜ複数のエージェントが必要か

マルチエージェントシステムの根拠は3つの実用的な理由に集約されます。

専門化。 コンテキストに50のツールを持つ単一エージェントは混乱します。5つの集中したツールを持つ専門エージェントの方がパフォーマンスが優れています。ドメインごとに分割すると(リサーチ、ライティング、コード、検証)、各エージェントは一つのことをうまく行います。

並列処理。 タスクの中には独立したサブタスクに分解できるものがあります。20のドキュメントを順次分析するのは遅いですが、並列エージェントで同時に分析するのは速いです。

検証。 一つのエージェントが出力を生成し、二番目のエージェントが独立してそれを批評すると、自己レビューが見逃すエラーを捉えられます。レビュアーは元の回答を擁護する利害関係を持ちません。

パターン1: オーケストレーター・ワーカー

一つのオーケストレーターエージェントが計画し、委任します。ワーカーエージェントが特定のタスクを実行して結果を返します。オーケストレーターが最終出力を組み立てます。

これは最も柔軟なパターンです。オーケストレーターは中間結果に基づいて計画を適応させ、失敗したタスクを再試行し、または別のワーカーにエスカレーションできます。

import anthropic
import json
client = anthropic.Anthropic()
def run_worker(system_prompt: str, task: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=system_prompt,
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
def orchestrator(user_request: str) -> str:
# Step 1: plan the work
plan_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
system="""You are a planning agent. Given a user request, break it into
2-4 specific subtasks. Return a JSON array of task descriptions only.
Example: ["Research X", "Analyze Y", "Synthesize findings"]""",
messages=[{"role": "user", "content": user_request}]
)
tasks = json.loads(plan_response.content[0].text)
# Step 2: run each task with a specialized worker
results = []
for task in tasks:
result = run_worker(
system_prompt="You are a focused execution agent. Complete the assigned task thoroughly.",
task=task
)
results.append({"task": task, "result": result})
# Step 3: synthesize
synthesis_prompt = f"""Original request: {user_request}
Worker results:
{json.dumps(results, indent=2)}
Synthesize these results into a cohesive final response."""
final = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": synthesis_prompt}]
)
return final.content[0].text

オーケストレーターパターンは、タスクの構造が事前にわからない場合に最もうまく機能します。問題を分析するまでいくつのサブタスクが必要かわからない場合は、オーケストレーターを使用します。

一つの落とし穴:オーケストレーターは意味のないサブタスクを幻覚することがあります。出力形式(JSON配列、番号付きリスト)を制約し、ワーカーを実行する前に検証します。フォールバックの再計画ステップを持つJSON解析の周りのtry/exceptブロックがこれを優雅に処理します。

パターン2: パイプライン

エージェントが逐次チェーンを形成します。各エージェントが入力を変換し、その出力を次のエージェントに渡します。エージェントはお互いについて知りません。入力を受け取り、出力を生成します。

これは実装と推論が最も簡単なパターンです。明確に定義されたステージを持つ変換タスクにうまく機能します。

def run_pipeline(input_text: str) -> str:
stages = [
{
"name": "Researcher",
"system": "Extract and organize all key facts from the input. "
"Format as a structured list with sources noted where available.",
},
{
"name": "Writer",
"system": "Transform the research notes into clear, readable prose. "
"Maintain all factual content. Target a technical audience.",
},
{
"name": "Editor",
"system": "Improve clarity and concision. Remove redundancy. "
"Do not change facts. Return only the improved text.",
},
{
"name": "Fact Checker",
"system": "Review for internal consistency. Flag any claims that "
"contradict each other or seem unsupported. "
"If no issues, return 'VERIFIED: ' followed by the original text.",
},
]
current = input_text
for stage in stages:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=stage["system"],
messages=[{"role": "user", "content": current}]
)
current = response.content[0].text
print(f"[{stage['name']}] complete ({len(current)} chars)")
return current

パイプラインはエラーを蓄積します。リサーチャーが何かを見逃すと、ライターはそれを追加できません。ステージをロッシーではなく、加算的になるように設計します。次のステージが必要とする情報を削除するステージは避けてください。

実際的な調整:下流エージェントが以前のステージが圧縮したかもしれないコンテキストを必要とする場合、各ステージの出力とともに元の入力を渡します。

パターン3: 並列ファンアウト

大きな入力を独立したチャンクに分割し、それぞれを別々のエージェントで同時に処理し、結果を集約します。

これは、一つのコンテキストウィンドウに快適に収まる以上のデータを処理しているとき、または処理時間が重要なときに適したパターンです。

import asyncio
async def analyze_document(doc: str, index: int) -> dict:
"""Analyze a single document asynchronously."""
system = """Analyze this document and return a JSON object with:
- "sentiment": positive/negative/neutral
- "key_topics": list of 3-5 main topics
- "summary": 2-3 sentence summary
- "flags": list of any concerns (empty list if none)"""
# asyncio.to_thread lets you call synchronous code in a thread pool
result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
system=system,
messages=[{"role": "user", "content": doc}]
).content[0].text
)
return {"index": index, **json.loads(result)}
async def parallel_analysis(documents: list[str]) -> dict:
# Fan out: analyze all documents concurrently
tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
analyses = await asyncio.gather(*tasks)
# Aggregate with a dedicated synthesis agent
synthesis_input = json.dumps({
"document_count": len(documents),
"analyses": analyses
})
aggregate_result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system="Synthesize document analyses into: overall sentiment distribution, "
"top themes across all documents, and notable patterns in flags. "
"Return as JSON.",
messages=[{"role": "user", "content": synthesis_input}]
).content[0].text
)
return {
"individual": analyses,
"aggregate": json.loads(aggregate_result)
}

集約ステップは、ほとんどの実装がショートカットを切るところです。結果を連結しないでください。集約タスクを理解するエージェントに渡します。20の分析の文字列結合は有用ではありませんが、合成された要約は有用です。

モデルの選択に注意してください。速度が重要でタスクが単純な大量のドキュメント単位の分析にはclaude-haiku-4-5-20251001を使用し、スループットよりも判断力が重要な合成にはclaude-sonnet-4-6を使用します。

適切なパターンの選択

状況パターン
タスク構造が分析するまで不明オーケストレーター・ワーカー
明確に定義された変換ステージパイプライン
大きな入力、独立したチャンク並列ファンアウト
独立した検証が必要オーケストレーターまたはレビューステージ付きパイプライン
大きな入力でのレイテンシを最小化並列ファンアウト

これらのパターンは合成できます。実際のシステムでは、他のタスクをパイプラインで実行しながら、いくつかのタスクを並列にファンアウトするオーケストレーターを使用することがあります。適合する最も単純なパターンから始め、より単純なアプローチが失敗した場合にのみ複雑さを追加します。

実際的な考慮事項

コスト。 マルチエージェントシステムはAPIコールを乗算します。4ステージのパイプラインは、合成オーバーヘッドを加えて単一コールの4倍のコストがかかる場合があります。モデルを戦略的に混在させます。判断力が重要なオーケストレーションと計画にはOpusを使用し、大量の実行タスクにはHaikuを使用します。

エラーの伝播。 各エージェントが失敗をどのように処理するかを事前に決定します。オプション:エラーを伝播する(停止)、エラーオブジェクトを返す(オーケストレーターに決定させる)、または変更されたプロンプトで再試行する(優雅に回復するがレイテンシが追加される)。ほとんどの本番システムでは、構造化されたエラーオブジェクトを返してオーケストレーターに決定させることが正しいデフォルトです。

トレーシング。 各エージェントが何をしたかを見ることができないマルチエージェントシステムはデバッグの悪夢です。各エージェントコールを次の情報とともにログに記録します:入力、出力、モデル、レイテンシ、トークン数。完全な実行パスを再構築できるようにトレースIDで各コールにタグを付けます。

コンテキストの受け渡し。 各エージェントが受け取るコンテキストについて意図的に考えます。完全な会話履歴を各エージェントに渡すことは高コストで、しばしば混乱を招きます。エージェントは無関係な以前のコンテキストによって気が散ります。各エージェントにその特定の仕事をするために必要なものだけを渡してください。

次に構築すること

ここでのパターンは基盤です。その上に何を構築するかはあなたの問題によります:

  • ワーカーにツール使用を追加する — 専門エージェントがAPIを呼び出し、データベースをクエリし、コードを実行できるようにする
  • オーケストレーターがリスクの高いアクションの前に一時停止するヒューマンインザループチェックポイントを追加する
  • 将来のエージェントがクエリできるベクターストアにエージェントの出力を永続化することでメモリを追加する
  • 出力をユーザーに返す前に判定エージェントを通じてルーティングすることで評価を追加する

マルチエージェントシステムは、今AIで興味深いエンジニアリングが行われている場所です。パターンは単純です。判断力はあなたの特定の問題にそれらを正しく適用することにあります。


関連記事