SISTEMA DE ACESSO COMPUTACIONAL

Padrões Multi-Agente: Orquestradores, Workers e Pipelines


Agentes individuais são bons em tarefas bem delimitadas. No momento em que uma tarefa requer conhecimento especializado em diferentes domínios, trabalho paralelo sobre entradas grandes, ou decisões que devem ser validadas antes da execução, você precisa de múltiplos agentes.

Sistemas multi-agente não são intrinsecamente mais complexos — são apenas estruturados de forma diferente. A chave é escolher o padrão certo para o problema. Três padrões cobrem a maioria dos casos de uso: orquestrador-worker, pipeline e fan-out paralelo.

Por Que Múltiplos Agentes

O caso para sistemas multi-agente se resume a três razões práticas.

Especialização. Um agente único com 50 ferramentas no contexto fica confuso. Um agente especializado com 5 ferramentas focadas performa melhor. Divida por domínio — pesquisa, escrita, código, verificação — e cada agente faz uma coisa bem.

Paralelismo. Algumas tarefas se decompõem em subtarefas independentes. Analisar 20 documentos sequencialmente é lento; analisá-los de forma concorrente com agentes paralelos é rápido.

Verificação. Ter um agente que produz a saída e um segundo agente que a critica de forma independente detecta erros que a auto-revisão perde. O revisor não tem interesse em defender a resposta original.

Padrão 1: Orquestrador-Worker

Um agente orquestrador planeja e delega. Os agentes workers executam tarefas específicas e retornam resultados. O orquestrador monta a saída final.

Este é o padrão mais flexível. O orquestrador pode adaptar seu plano com base nos resultados intermediários, retentar tarefas falhadas, ou escalar para um worker diferente.

import anthropic
import json
client = anthropic.Anthropic()
def run_worker(system_prompt: str, task: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=system_prompt,
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
def orchestrator(user_request: str) -> str:
# Passo 1: planejar o trabalho
plan_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
system="""Você é um agente de planejamento. Dado um pedido do usuário, divida-o em
2-4 subtarefas específicas. Retorne apenas um array JSON de descrições de tarefas.
Exemplo: ["Pesquisar X", "Analisar Y", "Sintetizar descobertas"]""",
messages=[{"role": "user", "content": user_request}]
)
tasks = json.loads(plan_response.content[0].text)
# Passo 2: executar cada tarefa com um worker especializado
results = []
for task in tasks:
result = run_worker(
system_prompt="Você é um agente de execução focado. Complete a tarefa atribuída de forma completa.",
task=task
)
results.append({"task": task, "result": result})
# Passo 3: sintetizar
synthesis_prompt = f"""Solicitação original: {user_request}
Resultados dos workers:
{json.dumps(results, indent=2)}
Sintetize esses resultados em uma resposta final coesa."""
final = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": synthesis_prompt}]
)
return final.content[0].text

O padrão orquestrador funciona melhor quando a estrutura da tarefa não é conhecida antecipadamente. Se você não sabe quantas subtarefas precisará até analisar o problema, use um orquestrador.

Um erro comum: orquestradores podem alucinar subtarefas sem sentido. Restrinja o formato de saída (array JSON, lista numerada) e valide-o antes de executar os workers. Um try/except em torno do parsing JSON com uma etapa de replanejamento como fallback lida com isso elegantemente.

Padrão 2: Pipeline

Os agentes formam uma cadeia sequencial. Cada agente transforma a entrada e passa sua saída para o próximo. Nenhum agente conhece os outros — eles recebem entrada e produzem saída.

Este é o padrão mais simples de implementar e raciocinar. Funciona bem para tarefas de transformação com etapas bem definidas.

def run_pipeline(input_text: str) -> str:
stages = [
{
"name": "Pesquisador",
"system": "Extraia e organize todos os fatos-chave da entrada. "
"Formate como uma lista estruturada com fontes anotadas quando disponíveis.",
},
{
"name": "Escritor",
"system": "Transforme as notas de pesquisa em prosa clara e legível. "
"Mantenha todo o conteúdo factual. Dirija-se a um público técnico.",
},
{
"name": "Editor",
"system": "Melhore a clareza e concisão. Remova redundâncias. "
"Não altere os fatos. Retorne apenas o texto melhorado.",
},
{
"name": "Verificador de Fatos",
"system": "Revise a consistência interna. Sinalize quaisquer afirmações que "
"se contradigam ou pareçam sem suporte. "
"Se não houver problemas, retorne 'VERIFICADO: ' seguido do texto original.",
},
]
current = input_text
for stage in stages:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=stage["system"],
messages=[{"role": "user", "content": current}]
)
current = response.content[0].text
print(f"[{stage['name']}] completo ({len(current)} chars)")
return current

Pipelines acumulam erros. Se o pesquisador perder algo, o escritor não pode adicioná-lo de volta. Projete suas etapas para serem aditivas em vez de com perda — evite etapas que removam informações que a próxima etapa pode precisar.

Um ajuste prático: passe a entrada original junto com a saída de cada etapa quando agentes posteriores precisarem de contexto que etapas anteriores possam ter comprimido.

Padrão 3: Fan-Out Paralelo

Divida uma entrada grande em fragmentos independentes, processe cada um de forma concorrente com agentes separados e depois agregue os resultados.

Este é o padrão certo quando você está processando mais dados do que cabe confortavelmente em uma janela de contexto, ou quando o tempo de processamento importa.

import asyncio
async def analyze_document(doc: str, index: int) -> dict:
"""Analisa um único documento de forma assíncrona."""
system = """Analise este documento e retorne um objeto JSON com:
- "sentiment": positive/negative/neutral
- "key_topics": lista de 3-5 tópicos principais
- "summary": resumo em 2-3 frases
- "flags": lista de preocupações (lista vazia se nenhuma)"""
result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
system=system,
messages=[{"role": "user", "content": doc}]
).content[0].text
)
return {"index": index, **json.loads(result)}
async def parallel_analysis(documents: list[str]) -> dict:
# Fan out: analisar todos os documentos de forma concorrente
tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
analyses = await asyncio.gather(*tasks)
# Agregar com um agente de síntese dedicado
synthesis_input = json.dumps({
"document_count": len(documents),
"analyses": analyses
})
aggregate_result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system="Sintetize as análises de documentos em: distribuição geral de sentimento, "
"temas principais em todos os documentos e padrões notáveis nos sinais. "
"Retorne como JSON.",
messages=[{"role": "user", "content": synthesis_input}]
).content[0].text
)
return {
"individual": analyses,
"aggregate": json.loads(aggregate_result)
}

A etapa de agregação é onde a maioria das implementações corta caminho. Não concatene resultados — passe-os para um agente que entenda a tarefa de agregação. Uma junção de strings de 20 análises não é útil; um resumo sintetizado é.

Escolhendo o Padrão Certo

SituaçãoPadrão
Estrutura de tarefa desconhecida até analisarOrquestrador-worker
Etapas de transformação bem definidasPipeline
Entrada grande, fragmentos independentesFan-out paralelo
Verificação independente necessáriaOrquestrador ou pipeline com etapa de revisão
Minimizar latência em entradas grandesFan-out paralelo

Esses padrões se compõem. Um sistema real pode usar um orquestrador que distribui algumas tarefas em paralelo enquanto executa outras através de um pipeline. Comece com o padrão mais simples que se encaixa e adicione complexidade apenas quando a abordagem mais simples falhar.

Considerações Práticas

Custo. Sistemas multi-agente multiplicam as chamadas de API. Um pipeline de 4 etapas pode custar 4× uma única chamada mais o overhead de síntese. Misture modelos estrategicamente: use Opus para orquestração e planejamento onde o julgamento importa, Haiku para tarefas de execução de alto volume.

Propagação de erros. Decida antecipadamente como cada agente lida com falhas. Opções: propagar o erro (parar), retornar um objeto de erro (deixar o orquestrador decidir), ou retentar com um prompt modificado (recupera-se elegantemente, adiciona latência). Para a maioria dos sistemas em produção, retornar objetos de erro estruturados e deixar o orquestrador decidir é o padrão correto.

Rastreabilidade. Um sistema multi-agente onde você não consegue ver o que cada agente fez é um pesadelo para depurar. Registre cada chamada de agente com: entrada, saída, modelo, latência e contagem de tokens. Etiquete cada chamada com um ID de rastreamento para poder reconstruir o caminho de execução completo.

Passagem de contexto. Seja deliberado sobre qual contexto cada agente recebe. Passar o histórico completo de conversa para cada agente é caro e frequentemente confuso — agentes se distraem com contexto anterior irrelevante. Passe apenas o que cada agente precisa para fazer seu trabalho específico.

O Que Construir a Seguir

Os padrões aqui são a fundação. O que você constrói sobre eles depende do seu problema:

  • Adicione uso de ferramentas aos workers — deixe agentes especializados chamar APIs, consultar bancos de dados, ou executar código
  • Adicione pontos de controle humano-no-loop onde o orquestrador pausa antes de ações de alto risco
  • Adicione memória persistindo saídas de agentes em um vector store que futuros agentes possam consultar
  • Adicione avaliação roteando saídas através de um agente juiz antes de retorná-las ao usuário

Sistemas multi-agente são onde a engenharia de IA mais interessante acontece agora. Os padrões são simples; o julgamento está em aplicá-los corretamente ao seu problema específico.


Artigos Relacionados