Multi-Agent Patterns: Orchestrators, Workers, and Pipelines


Single agents are good at well-scoped tasks. The moment a task requires specialized knowledge across domains, parallel work over large inputs, or decisions that should be validated before execution, you need multiple agents.

Multi-agent systems aren’t inherently more complex — they’re just structured differently. The key is picking the right pattern for the problem. Three patterns cover most use cases: orchestrator-worker, pipeline, and parallel fan-out.

Why Multiple Agents

The case for multi-agent systems comes down to three practical reasons.

Specialization. A single agent with a 50-tool context gets confused. A specialized agent with 5 focused tools performs better. Split by domain — research, writing, code, verification — and each agent does one thing well.

Parallelism. Some tasks decompose into independent subtasks. Analyzing 20 documents sequentially is slow; analyzing them concurrently with parallel agents is fast.

Verification. Having one agent produce output and a second agent independently critique it catches errors that self-review misses. The reviewer has no stake in defending the original answer.

Pattern 1: Orchestrator-Worker

One orchestrator agent plans and delegates. Worker agents execute specific tasks and return results. The orchestrator assembles the final output.

This is the most flexible pattern. The orchestrator can adapt its plan based on intermediate results, retry failed tasks, or escalate to a different worker.

import anthropic
import json

client = anthropic.Anthropic()

def run_worker(system_prompt: str, task: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=system_prompt,
        messages=[{"role": "user", "content": task}]
    )
    return response.content[0].text

def orchestrator(user_request: str) -> str:
    # Step 1: plan the work
    plan_response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system="""You are a planning agent. Given a user request, break it into
2-4 specific subtasks. Return a JSON array of task descriptions only.
Example: ["Research X", "Analyze Y", "Synthesize findings"]""",
        messages=[{"role": "user", "content": user_request}]
    )
    tasks = json.loads(plan_response.content[0].text)

    # Step 2: run each task with a specialized worker
    results = []
    for task in tasks:
        result = run_worker(
            system_prompt="You are a focused execution agent. Complete the assigned task thoroughly.",
            task=task
        )
        results.append({"task": task, "result": result})

    # Step 3: synthesize
    synthesis_prompt = f"""Original request: {user_request}

Worker results:
{json.dumps(results, indent=2)}

Synthesize these results into a cohesive final response."""
    final = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": synthesis_prompt}]
    )
    return final.content[0].text

The orchestrator pattern works best when the task structure isn’t known upfront. If you don’t know how many subtasks you’ll need until you’ve analyzed the problem, use an orchestrator.

One gotcha: orchestrators can hallucinate subtasks that don’t make sense. Constrain the output format (JSON array, numbered list) and validate it before running workers. A try/except around the JSON parse with a fallback replanning step handles this gracefully.

Pattern 2: Pipeline

Agents form a sequential chain. Each agent transforms the input and passes its output to the next. No agent knows about the others — they receive input and produce output.

This is the simplest pattern to implement and reason about. It works well for transformation tasks with well-defined stages.

def run_pipeline(input_text: str) -> str:
    stages = [
        {
            "name": "Researcher",
            "system": "Extract and organize all key facts from the input. "
                      "Format as a structured list with sources noted where available.",
        },
        {
            "name": "Writer",
            "system": "Transform the research notes into clear, readable prose. "
                      "Maintain all factual content. Target a technical audience.",
        },
        {
            "name": "Editor",
            "system": "Improve clarity and concision. Remove redundancy. "
                      "Do not change facts. Return only the improved text.",
        },
        {
            "name": "Fact Checker",
            "system": "Review for internal consistency. Flag any claims that "
                      "contradict each other or seem unsupported. "
                      "If no issues, return 'VERIFIED: ' followed by the original text.",
        },
    ]
    current = input_text
    for stage in stages:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=stage["system"],
            messages=[{"role": "user", "content": current}]
        )
        current = response.content[0].text
        print(f"[{stage['name']}] complete ({len(current)} chars)")
    return current

Pipelines accumulate errors. If the researcher misses something, the writer can’t add it back. Design your stages to be additive rather than lossy — avoid stages that strip information the next stage might need.

A practical adjustment: pass the original input alongside each stage’s output when downstream agents need context that earlier stages may have compressed away.
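One possible shape for that adjustment (a minimal sketch; `build_stage_input` and the section labels are hypothetical, not part of any API) is a helper that pairs the untouched original with the previous stage's output, so each agent can recover details an earlier stage compressed away:

```python
def build_stage_input(original: str, current: str) -> str:
    """Pair the untouched original input with the previous stage's
    output so a downstream agent can recover dropped details."""
    return (
        f"ORIGINAL INPUT:\n{original}\n\n"
        f"PREVIOUS STAGE OUTPUT:\n{current}"
    )

# Inside the pipeline loop, send both instead of only `current`:
#     messages=[{"role": "user",
#                "content": build_stage_input(input_text, current)}]
```

The cost is a larger prompt at every stage, so reserve it for pipelines where the early stages are genuinely lossy.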

Pattern 3: Parallel Fan-Out

Split a large input into independent chunks, process each concurrently with separate agents, then aggregate results.

This is the right pattern when you’re processing more data than fits comfortably in one context window, or when processing time matters.

import asyncio

async def analyze_document(doc: str, index: int) -> dict:
    """Analyze a single document asynchronously."""
    system = """Analyze this document and return a JSON object with:
- "sentiment": positive/negative/neutral
- "key_topics": list of 3-5 main topics
- "summary": 2-3 sentence summary
- "flags": list of any concerns (empty list if none)"""
    # asyncio.to_thread lets you call synchronous code in a thread pool
    result = await asyncio.to_thread(
        lambda: client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            system=system,
            messages=[{"role": "user", "content": doc}]
        ).content[0].text
    )
    return {"index": index, **json.loads(result)}

async def parallel_analysis(documents: list[str]) -> dict:
    # Fan out: analyze all documents concurrently
    tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
    analyses = await asyncio.gather(*tasks)

    # Aggregate with a dedicated synthesis agent
    synthesis_input = json.dumps({
        "document_count": len(documents),
        "analyses": analyses
    })
    aggregate_result = await asyncio.to_thread(
        lambda: client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system="Synthesize document analyses into: overall sentiment distribution, "
                   "top themes across all documents, and notable patterns in flags. "
                   "Return as JSON.",
            messages=[{"role": "user", "content": synthesis_input}]
        ).content[0].text
    )
    return {
        "individual": analyses,
        "aggregate": json.loads(aggregate_result)
    }

The aggregation step is where most implementations cut corners. Don’t concatenate results — pass them to an agent that understands the aggregation task. A string join of 20 analyses isn’t useful; a synthesized summary is.

Note the model choice: use claude-haiku-4-5-20251001 for the high-volume per-document analysis where speed matters and the task is straightforward, and claude-sonnet-4-6 for the synthesis where judgment matters more than throughput.

Choosing the Right Pattern

Situation → Pattern:

  • Task structure unknown until analyzed → Orchestrator-worker
  • Well-defined transformation stages → Pipeline
  • Large input, independent chunks → Parallel fan-out
  • Need independent verification → Orchestrator or pipeline with a review stage
  • Minimize latency on large inputs → Parallel fan-out

These patterns compose. A real system might use an orchestrator that fans out some tasks in parallel while running others through a pipeline. Start with the simplest pattern that fits and add complexity only when the simpler approach fails.
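A composed system can be sketched as a dispatcher that fans out the independent tasks and pipelines the dependent ones. Everything here is illustrative (`compose`, the "parallel" flag, and the task-dict shape are assumptions, not from the source): `run_task` stands in for whatever worker call you actually use.

```python
import asyncio

async def compose(tasks: list[dict], run_task) -> list:
    """Run tasks marked "parallel" concurrently and the rest in order.
    `run_task` is any async callable: task-dict -> result."""
    parallel = [t for t in tasks if t.get("parallel")]
    sequential = [t for t in tasks if not t.get("parallel")]
    # Fan out the independent tasks...
    parallel_results = await asyncio.gather(*(run_task(t) for t in parallel))
    # ...then pipeline the rest, feeding each result forward as input.
    carry = None
    sequential_results = []
    for t in sequential:
        carry = await run_task({**t, "input": carry})
        sequential_results.append(carry)
    return list(parallel_results) + sequential_results
```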

Practical Considerations

Cost. Multi-agent systems multiply API calls. A 4-stage pipeline might cost 4× a single call plus synthesis overhead. Mix models strategically: use Opus for orchestration and planning where judgment matters, Haiku for high-volume execution tasks.

Error propagation. Decide upfront how each agent handles failures. Options: propagate the error (stop), return an error object (let the orchestrator decide), or retry with a modified prompt (recovers gracefully, adds latency). For most production systems, returning structured error objects and letting the orchestrator decide is the right default.
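The structured-error-object default can be sketched as a wrapper around any worker callable (a minimal sketch; `run_safely` and the `"ok"` key are hypothetical names, not from the source). The orchestrator inspects `"ok"` and decides whether to retry, replan, or drop the subtask:

```python
def run_safely(worker, task: str, retries: int = 1) -> dict:
    """Run a worker, returning a structured result instead of raising.
    `worker` is any callable task -> str."""
    last_error = None
    for _attempt in range(retries + 1):
        try:
            return {"ok": True, "task": task, "result": worker(task)}
        except Exception as exc:
            last_error = exc
    # All attempts failed: hand the orchestrator a structured error.
    return {"ok": False, "task": task, "error": str(last_error)}
```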

Tracing. A multi-agent system where you can’t see what each agent did is a debugging nightmare. Log every agent call with: input, output, model, latency, and token count. Tag each call with a trace ID so you can reconstruct the full execution path.

Context passing. Be deliberate about what context each agent receives. Passing the full conversation history to every agent is expensive and often confusing — agents get distracted by irrelevant prior context. Pass only what each agent needs to do its specific job.

What to Build Next

The patterns here are the foundation. What you build on them depends on your problem:

  • Add tool use to workers — let specialized agents call APIs, query databases, or run code
  • Add human-in-the-loop checkpoints where the orchestrator pauses before high-stakes actions
  • Add memory by persisting agent outputs to a vector store that future agents can query
  • Add evaluation by routing outputs through a judge agent before returning them to the user

Multi-agent systems are where the interesting engineering in AI happens right now. The patterns are simple; the judgment is in applying them correctly to your specific problem.