Multi-Agent Patterns: Orchestrators, Workers, and Pipelines
Single agents are good at well-scoped tasks. The moment a task requires specialized knowledge across domains, parallel work over large inputs, or decisions that should be validated before execution, you need multiple agents.
Multi-agent systems aren’t inherently more complex — they’re just structured differently. The key is picking the right pattern for the problem. Three patterns cover most use cases: orchestrator-worker, pipeline, and parallel fan-out.
Why Multiple Agents
The case for multi-agent systems comes down to three practical reasons.
Specialization. A single agent with 50 tools in its context gets confused. A specialized agent with 5 focused tools performs better. Split by domain — research, writing, code, verification — and each agent does one thing well.
Parallelism. Some tasks decompose into independent subtasks. Analyzing 20 documents sequentially is slow; analyzing them concurrently with parallel agents is fast.
Verification. Having one agent produce output and a second agent independently critique it catches errors that self-review misses. The reviewer has no stake in defending the original answer.
Pattern 1: Orchestrator-Worker
One orchestrator agent plans and delegates. Worker agents execute specific tasks and return results. The orchestrator assembles the final output.
This is the most flexible pattern. The orchestrator can adapt its plan based on intermediate results, retry failed tasks, or escalate to a different worker.
```python
import anthropic
import json

client = anthropic.Anthropic()

def run_worker(system_prompt: str, task: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=system_prompt,
        messages=[{"role": "user", "content": task}],
    )
    return response.content[0].text

def orchestrator(user_request: str) -> str:
    # Step 1: plan the work
    plan_response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system="""You are a planning agent. Given a user request, break it into
2-4 specific subtasks. Return a JSON array of task descriptions only.
Example: ["Research X", "Analyze Y", "Synthesize findings"]""",
        messages=[{"role": "user", "content": user_request}],
    )
    tasks = json.loads(plan_response.content[0].text)

    # Step 2: run each task with a specialized worker
    results = []
    for task in tasks:
        result = run_worker(
            system_prompt="You are a focused execution agent. Complete the assigned task thoroughly.",
            task=task,
        )
        results.append({"task": task, "result": result})

    # Step 3: synthesize
    synthesis_prompt = f"""Original request: {user_request}

Worker results:
{json.dumps(results, indent=2)}

Synthesize these results into a cohesive final response."""

    final = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": synthesis_prompt}],
    )
    return final.content[0].text
```

The orchestrator pattern works best when the task structure isn’t known upfront. If you don’t know how many subtasks you’ll need until you’ve analyzed the problem, use an orchestrator.
One gotcha: orchestrators can hallucinate subtasks that don’t make sense. Constrain the output format (JSON array, numbered list) and validate it before running workers. A try/except around the JSON parse with a fallback replanning step handles this gracefully.
Pattern 2: Pipeline
Agents form a sequential chain. Each agent transforms the input and passes its output to the next. No agent knows about the others — they receive input and produce output.
This is the simplest pattern to implement and reason about. It works well for transformation tasks with well-defined stages.
```python
def run_pipeline(input_text: str) -> str:
    stages = [
        {
            "name": "Researcher",
            "system": "Extract and organize all key facts from the input. "
                      "Format as a structured list with sources noted where available.",
        },
        {
            "name": "Writer",
            "system": "Transform the research notes into clear, readable prose. "
                      "Maintain all factual content. Target a technical audience.",
        },
        {
            "name": "Editor",
            "system": "Improve clarity and concision. Remove redundancy. "
                      "Do not change facts. Return only the improved text.",
        },
        {
            "name": "Fact Checker",
            "system": "Review for internal consistency. Flag any claims that "
                      "contradict each other or seem unsupported. "
                      "If no issues, return 'VERIFIED: ' followed by the original text.",
        },
    ]

    current = input_text
    for stage in stages:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=stage["system"],
            messages=[{"role": "user", "content": current}],
        )
        current = response.content[0].text
        print(f"[{stage['name']}] complete ({len(current)} chars)")

    return current
```

Pipelines accumulate errors. If the researcher misses something, the writer can’t add it back. Design your stages to be additive rather than lossy — avoid stages that strip information the next stage might need.
A practical adjustment: pass the original input alongside each stage’s output when downstream agents need context that earlier stages may have compressed away.
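A minimal sketch of that adjustment (`build_stage_input` is a hypothetical helper; the labels are arbitrary, the point is that both pieces reach the downstream agent):

```python
def build_stage_input(original: str, previous_output: str) -> str:
    """Combine the untouched original input with the previous stage's
    output, so a downstream agent can recover details that earlier
    stages compressed away."""
    return (
        "ORIGINAL INPUT:\n" + original
        + "\n\nPREVIOUS STAGE OUTPUT:\n" + previous_output
    )
```

In the pipeline loop above, you would pass `build_stage_input(input_text, current)` instead of `current` to stages that benefit from the full context, at the cost of extra tokens per stage.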
Pattern 3: Parallel Fan-Out
Split a large input into independent chunks, process each concurrently with separate agents, then aggregate results.
This is the right pattern when you’re processing more data than fits comfortably in one context window, or when processing time matters.
```python
import asyncio

async def analyze_document(doc: str, index: int) -> dict:
    """Analyze a single document asynchronously."""
    system = """Analyze this document and return a JSON object with:
- "sentiment": positive/negative/neutral
- "key_topics": list of 3-5 main topics
- "summary": 2-3 sentence summary
- "flags": list of any concerns (empty list if none)"""

    # asyncio.to_thread lets you call synchronous code in a thread pool
    result = await asyncio.to_thread(
        lambda: client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            system=system,
            messages=[{"role": "user", "content": doc}],
        ).content[0].text
    )
    return {"index": index, **json.loads(result)}

async def parallel_analysis(documents: list[str]) -> dict:
    # Fan out: analyze all documents concurrently
    tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
    analyses = await asyncio.gather(*tasks)

    # Aggregate with a dedicated synthesis agent
    synthesis_input = json.dumps({
        "document_count": len(documents),
        "analyses": analyses,
    })

    aggregate_result = await asyncio.to_thread(
        lambda: client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system="Synthesize document analyses into: overall sentiment distribution, "
                   "top themes across all documents, and notable patterns in flags. "
                   "Return as JSON.",
            messages=[{"role": "user", "content": synthesis_input}],
        ).content[0].text
    )

    return {
        "individual": analyses,
        "aggregate": json.loads(aggregate_result),
    }
```

The aggregation step is where most implementations cut corners. Don’t concatenate results — pass them to an agent that understands the aggregation task. A string join of 20 analyses isn’t useful; a synthesized summary is.
Note the model choice: use claude-haiku-4-5-20251001 for the high-volume per-document analysis where speed matters and the task is straightforward, and claude-sonnet-4-6 for the synthesis where judgment matters more than throughput.
Choosing the Right Pattern
| Situation | Pattern |
|---|---|
| Task structure unknown until analyzed | Orchestrator-worker |
| Well-defined transformation stages | Pipeline |
| Large input, independent chunks | Parallel fan-out |
| Need independent verification | Orchestrator or pipeline with a review stage |
| Minimize latency on large inputs | Parallel fan-out |
These patterns compose. A real system might use an orchestrator that fans out some tasks in parallel while running others through a pipeline. Start with the simplest pattern that fits and add complexity only when the simpler approach fails.
Practical Considerations
Cost. Multi-agent systems multiply API calls. A 4-stage pipeline might cost 4× a single call plus synthesis overhead. Mix models strategically: use Opus for orchestration and planning where judgment matters, Haiku for high-volume execution tasks.
Error propagation. Decide upfront how each agent handles failures. Options: propagate the error (stop), return an error object (let the orchestrator decide), or retry with a modified prompt (recovers gracefully, adds latency). For most production systems, returning structured error objects and letting the orchestrator decide is the right default.
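A sketch of that default (`safe_worker` is a hypothetical wrapper; `run` stands in for whatever callable executes the agent):

```python
def safe_worker(run, task: str, max_retries: int = 1) -> dict:
    """Run a worker callable, returning a structured result instead of
    raising, so the orchestrator can decide whether to retry, skip,
    or escalate."""
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return {
                "ok": True,
                "task": task,
                "result": run(task),
                "attempts": attempt + 1,
            }
        except Exception as exc:
            last_error = str(exc)
    return {
        "ok": False,
        "task": task,
        "error": last_error,
        "attempts": max_retries + 1,
    }
```

The orchestrator inspects `ok` on each result and can replan around a failed subtask instead of aborting the whole run.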
Tracing. A multi-agent system where you can’t see what each agent did is a debugging nightmare. Log every agent call with: input, output, model, latency, and token count. Tag each call with a trace ID so you can reconstruct the full execution path.
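One lightweight way to get there (`traced_call` is an illustrative wrapper; in production you would likely log token counts from the API response and ship entries to a real tracing backend rather than an in-memory list):

```python
import time

def traced_call(trace_id: str, agent_name: str, run, prompt: str, log: list) -> str:
    """Wrap an agent call so every invocation is recorded with enough
    detail to reconstruct the full execution path later."""
    start = time.monotonic()
    output = run(prompt)
    log.append({
        "trace_id": trace_id,
        "agent": agent_name,
        "input_chars": len(prompt),
        "output_chars": len(output),
        "latency_s": round(time.monotonic() - start, 3),
    })
    return output
```

Every call in one user request shares the same `trace_id`, so filtering the log by that ID replays the run agent by agent.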
Context passing. Be deliberate about what context each agent receives. Passing the full conversation history to every agent is expensive and often confusing — agents get distracted by irrelevant prior context. Pass only what each agent needs to do its specific job.
What to Build Next
The patterns here are the foundation. What you build on them depends on your problem:
- Add tool use to workers — let specialized agents call APIs, query databases, or run code
- Add human-in-the-loop checkpoints where the orchestrator pauses before high-stakes actions
- Add memory by persisting agent outputs to a vector store that future agents can query
- Add evaluation by routing outputs through a judge agent before returning them to the user
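As one example of the last item, a judge-gated return might be sketched like this (`generate` and `judge` are hypothetical callables wrapping model calls; the `PASS`/`FAIL` convention is an assumption, not a standard):

```python
def judged_response(generate, judge, prompt: str, max_attempts: int = 2) -> str:
    """Generate a response, have a judge agent evaluate it, and retry
    if the judge rejects it. Returns the last candidate as a best
    effort once attempts are exhausted."""
    candidate = ""
    for _ in range(max_attempts):
        candidate = generate(prompt)
        verdict = judge(candidate)
        if verdict.startswith("PASS"):
            return candidate
    return candidate
```

A stricter variant would feed the judge's critique back into the next `generate` call instead of retrying with the same prompt.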
Multi-agent systems are where the interesting engineering in AI happens right now. The patterns are simple; the judgment is in applying them correctly to your specific problem.
Related Articles
- Tool Use Patterns: Building Reliable Agent-Tool Interfaces
- Agent Error Recovery: 5 Patterns for Production Reliability
- Agent Memory Systems: Giving Your AI Persistent Context
- Debugging and Observability in Autonomous Agent Systems