多代理模式:编排器、Worker 与管道
单个代理擅长范围明确的任务。一旦任务需要跨域专业知识、对大量输入进行并行处理,或者需要在执行前进行决策验证,你就需要多个代理了。
多代理系统并不是本质上更复杂——它们只是结构不同。关键在于为问题选择正确的模式。三种模式涵盖了大多数使用场景:编排器-Worker、管道和并行扇出。
为何需要多个代理
多代理系统的必要性归结为三个实际原因。
专业化。 单个代理拥有 50 个工具的上下文时会感到困惑。专注于 5 个工具的专业代理表现更好。按领域划分——研究、写作、代码、验证——让每个代理只做一件事,并做好。
并行性。 有些任务可以分解为独立的子任务。顺序分析 20 个文档很慢;用并行代理同时分析则很快。
验证。 让一个代理生成输出,再由另一个代理独立批评,能发现自我审查遗漏的错误。审核者对原始答案没有任何利益关联。
模式一:编排器-Worker
一个编排器代理负责规划和分派任务。Worker 代理执行具体任务并返回结果。编排器汇总最终输出。
这是最灵活的模式。编排器可以根据中间结果调整计划、重试失败的任务,或将任务升级到不同的 Worker。
import anthropicimport json
client = anthropic.Anthropic()
def run_worker(system_prompt: str, task: str) -> str: response = client.messages.create( model="claude-sonnet-4-6", max_tokens=2048, system=system_prompt, messages=[{"role": "user", "content": task}] ) return response.content[0].text
def orchestrator(user_request: str) -> str: # 第一步:规划工作 plan_response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, system="""你是一个规划代理。给定用户请求,将其分解为2-4 个具体子任务。仅返回任务描述的 JSON 数组。示例:["研究 X", "分析 Y", "综合发现"]""", messages=[{"role": "user", "content": user_request}] )
tasks = json.loads(plan_response.content[0].text)
# 第二步:用专业 Worker 执行每个任务 results = [] for task in tasks: result = run_worker( system_prompt="你是一个专注的执行代理。彻底完成分配的任务。", task=task ) results.append({"task": task, "result": result})
# 第三步:综合结果 synthesis_prompt = f"""原始请求:{user_request}
Worker 结果:{json.dumps(results, indent=2, ensure_ascii=False)}
将这些结果综合为一个连贯的最终回答。"""
final = client.messages.create( model="claude-opus-4-6", max_tokens=2048, messages=[{"role": "user", "content": synthesis_prompt}] ) return final.content[0].text编排器模式在任务结构事先未知时效果最佳。如果在分析问题之前不知道需要多少子任务,请使用编排器。
一个常见的陷阱:编排器可能会幻觉出没有意义的子任务。约束输出格式(JSON 数组、编号列表)并在运行 Worker 之前对其进行验证。在 JSON 解析周围加上 try/except,并带有重新规划步骤作为回退,可以优雅地处理这个问题。
模式二:管道
代理形成顺序链。每个代理转换输入并将输出传递给下一个代理。没有代理了解其他代理——它们只是接收输入并产生输出。
这是最简单的实现和推理模式。它适用于具有明确阶段的转换任务。
def run_pipeline(input_text: str) -> str: stages = [ { "name": "研究员", "system": "从输入中提取并组织所有关键事实。" "格式化为结构化列表,在可用时注明来源。", }, { "name": "写作者", "system": "将研究笔记转化为清晰、可读的散文。" "保持所有事实内容。面向技术受众。", }, { "name": "编辑", "system": "提高清晰度和简洁性。删除冗余内容。" "不要改变事实。只返回改进后的文本。", }, { "name": "事实核查员", "system": "审查内部一致性。标记任何相互矛盾或" "似乎缺乏支撑的声明。" "如果没有问题,返回'已验证:'后跟原始文本。", }, ]
current = input_text for stage in stages: response = client.messages.create( model="claude-sonnet-4-6", max_tokens=2048, system=stage["system"], messages=[{"role": "user", "content": current}] ) current = response.content[0].text print(f"[{stage['name']}] 完成({len(current)} 字符)")
return current管道会累积错误。如果研究员遗漏了某些内容,写作者无法将其补回。将阶段设计为加法性的而非有损的——避免去除下一阶段可能需要的信息的阶段。
一个实用的调整:当下游代理需要早期阶段可能压缩掉的上下文时,在每个阶段的输出中传递原始输入。
模式三:并行扇出
将大型输入拆分为独立的块,用独立的代理并发处理每个块,然后聚合结果。
当你处理的数据超过单个上下文窗口的舒适容量,或者处理时间很重要时,这是正确的模式。
import asyncio
async def analyze_document(doc: str, index: int) -> dict: """异步分析单个文档。""" system = """分析此文档并返回包含以下内容的 JSON 对象:- "sentiment":positive/negative/neutral- "key_topics":3-5 个主要话题的列表- "summary":2-3 句话的摘要- "flags":关注点列表(如果没有则为空列表)"""
result = await asyncio.to_thread( lambda: client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=512, system=system, messages=[{"role": "user", "content": doc}] ).content[0].text ) return {"index": index, **json.loads(result)}
async def parallel_analysis(documents: list[str]) -> dict: # 扇出:并发分析所有文档 tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)] analyses = await asyncio.gather(*tasks)
# 用专用综合代理聚合 synthesis_input = json.dumps({ "document_count": len(documents), "analyses": analyses }, ensure_ascii=False)
aggregate_result = await asyncio.to_thread( lambda: client.messages.create( model="claude-sonnet-4-6", max_tokens=1024, system="将文档分析综合为:总体情感分布、所有文档中的顶级主题," "以及标记中的显著模式。以 JSON 格式返回。", messages=[{"role": "user", "content": synthesis_input}] ).content[0].text )
return { "individual": analyses, "aggregate": json.loads(aggregate_result) }聚合步骤是大多数实现偷工减料的地方。不要拼接结果——将它们传递给理解聚合任务的代理。20 个分析的字符串连接没有用;综合摘要才有用。
选择正确的模式
| 情况 | 模式 |
|---|---|
| 分析之前任务结构未知 | 编排器-Worker |
| 定义明确的转换阶段 | 管道 |
| 大型输入,独立块 | 并行扇出 |
| 需要独立验证 | 带有审核阶段的编排器或管道 |
| 最小化大型输入的延迟 | 并行扇出 |
这些模式可以组合使用。真实系统可能使用一个编排器,将某些任务并行分发,同时通过管道运行其他任务。从最简单的适合模式开始,只有当简单方法失败时才添加复杂性。
实际考量
成本。 多代理系统会成倍增加 API 调用。一个 4 阶段管道的成本可能是单次调用的 4 倍加上综合开销。战略性地混合模型:将 Opus 用于判断很重要的编排和规划,将 Haiku 用于高量执行任务。
错误传播。 预先决定每个代理如何处理失败。选项:传播错误(停止)、返回错误对象(让编排器决定),或使用修改后的提示重试(优雅恢复,增加延迟)。对于大多数生产系统,返回结构化错误对象并让编排器决定是正确的默认行为。
追踪。 无法看到每个代理做了什么的多代理系统是调试噩梦。记录每次代理调用:输入、输出、模型、延迟和 token 数量。用追踪 ID 标记每次调用,以便重建完整的执行路径。
上下文传递。 对每个代理接收的上下文要有意识。将完整对话历史传递给每个代理既昂贵又常常令人困惑——代理会被不相关的先前上下文分散注意力。只传递每个代理完成其特定工作所需的内容。
下一步构建什么
这里的模式是基础。在其上构建什么取决于你的问题:
- 为 Worker 添加工具使用——让专业代理调用 API、查询数据库或运行代码
- 添加人机协作检查点——让编排器在高风险操作之前暂停
- 通过将代理输出持久化到向量存储中添加记忆,以便未来的代理可以查询
- 通过在将输出返回用户之前路由通过评判代理添加评估
多代理系统是目前 AI 工程中最有趣的地方。模式很简单;判断在于将它们正确应用于你的具体问题。