SYSTÈME D'ACCÈS INFORMATIQUE

Patterns Multi-Agents : Orchestrateurs, Workers et Pipelines


Les agents individuels sont efficaces pour les tâches bien délimitées. Dès qu’une tâche nécessite des connaissances spécialisées dans différents domaines, un travail parallèle sur de grandes entrées, ou des décisions à valider avant exécution, il vous faut plusieurs agents.

Les systèmes multi-agents ne sont pas intrinsèquement plus complexes — ils sont simplement structurés différemment. La clé est de choisir le bon pattern pour le problème. Trois patterns couvrent la plupart des cas d’usage : orchestrateur-worker, pipeline et fan-out parallèle.

Pourquoi Plusieurs Agents

Le cas pour les systèmes multi-agents se résume à trois raisons pratiques.

Spécialisation. Un agent unique avec 50 outils dans son contexte se perd. Un agent spécialisé avec 5 outils ciblés performe mieux. Divisez par domaine — recherche, rédaction, code, vérification — et chaque agent fait une seule chose bien.

Parallélisme. Certaines tâches se décomposent en sous-tâches indépendantes. Analyser 20 documents séquentiellement est lent ; les analyser simultanément avec des agents parallèles est rapide.

Vérification. Avoir un agent qui produit la sortie et un second qui la critique de manière indépendante détecte les erreurs que l’auto-révision manque. Le relecteur n’a aucun intérêt à défendre la réponse originale.

Pattern 1 : Orchestrateur-Worker

Un agent orchestrateur planifie et délègue. Les agents workers exécutent des tâches spécifiques et retournent des résultats. L’orchestrateur assemble la sortie finale.

C’est le pattern le plus flexible. L’orchestrateur peut adapter son plan en fonction des résultats intermédiaires, réessayer les tâches échouées, ou escalader vers un worker différent.

import anthropic
import json
client = anthropic.Anthropic()
def run_worker(system_prompt: str, task: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=system_prompt,
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
def orchestrator(user_request: str) -> str:
# Étape 1 : planifier le travail
plan_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
system="""Vous êtes un agent de planification. Étant donné une demande utilisateur, décomposez-la en
2-4 sous-tâches spécifiques. Retournez uniquement un tableau JSON de descriptions de tâches.
Exemple : ["Rechercher X", "Analyser Y", "Synthétiser les résultats"]""",
messages=[{"role": "user", "content": user_request}]
)
tasks = json.loads(plan_response.content[0].text)
# Étape 2 : exécuter chaque tâche avec un worker spécialisé
results = []
for task in tasks:
result = run_worker(
system_prompt="Vous êtes un agent d'exécution ciblé. Accomplissez la tâche assignée de manière approfondie.",
task=task
)
results.append({"task": task, "result": result})
# Étape 3 : synthétiser
synthesis_prompt = f"""Demande originale : {user_request}
Résultats des workers :
{json.dumps(results, indent=2)}
Synthétisez ces résultats en une réponse finale cohérente."""
final = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": synthesis_prompt}]
)
return final.content[0].text

Le pattern orchestrateur fonctionne mieux quand la structure de la tâche n’est pas connue à l’avance. Si vous ne savez pas combien de sous-tâches vous aurez besoin avant d’avoir analysé le problème, utilisez un orchestrateur.

Un écueil courant : les orchestrateurs peuvent halluciner des sous-tâches qui n’ont pas de sens. Contraignez le format de sortie (tableau JSON, liste numérotée) et validez-le avant d’exécuter les workers. Un try/except autour du parsing JSON avec une étape de replanification en fallback gère cela élégamment.

Pattern 2 : Pipeline

Les agents forment une chaîne séquentielle. Chaque agent transforme l’entrée et passe sa sortie au suivant. Aucun agent ne connaît les autres — ils reçoivent une entrée et produisent une sortie.

C’est le pattern le plus simple à implémenter et à raisonner. Il fonctionne bien pour les tâches de transformation avec des étapes bien définies.

def run_pipeline(input_text: str) -> str:
stages = [
{
"name": "Chercheur",
"system": "Extrayez et organisez tous les faits clés de l'entrée. "
"Formatez sous forme de liste structurée avec les sources notées si disponibles.",
},
{
"name": "Rédacteur",
"system": "Transformez les notes de recherche en prose claire et lisible. "
"Maintenez tout le contenu factuel. Ciblez un public technique.",
},
{
"name": "Éditeur",
"system": "Améliorez la clarté et la concision. Supprimez les redondances. "
"Ne modifiez pas les faits. Retournez uniquement le texte amélioré.",
},
{
"name": "Vérificateur de Faits",
"system": "Vérifiez la cohérence interne. Signalez toute affirmation qui "
"se contredit ou semble non étayée. "
"Si aucun problème, retournez 'VÉRIFIÉ : ' suivi du texte original.",
},
]
current = input_text
for stage in stages:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
system=stage["system"],
messages=[{"role": "user", "content": current}]
)
current = response.content[0].text
print(f"[{stage['name']}] terminé ({len(current)} chars)")
return current

Les pipelines accumulent les erreurs. Si le chercheur manque quelque chose, le rédacteur ne peut pas le rajouter. Concevez vos étapes pour qu’elles soient additives plutôt que destructives — évitez les étapes qui suppriment des informations dont l’étape suivante pourrait avoir besoin.

Un ajustement pratique : passez l’entrée originale avec la sortie de chaque étape quand les agents en aval ont besoin d’un contexte que les étapes précédentes ont pu compresser.

Pattern 3 : Fan-Out Parallèle

Divisez une grande entrée en fragments indépendants, traitez chacun simultanément avec des agents séparés, puis agrégez les résultats.

C’est le bon pattern quand vous traitez plus de données que ce qui tient confortablement dans une fenêtre de contexte, ou quand le temps de traitement compte.

import asyncio
async def analyze_document(doc: str, index: int) -> dict:
"""Analyse un seul document de manière asynchrone."""
system = """Analysez ce document et retournez un objet JSON avec :
- "sentiment": positive/negative/neutral
- "key_topics": liste de 3-5 sujets principaux
- "summary": résumé en 2-3 phrases
- "flags": liste des préoccupations (liste vide si aucune)"""
result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
system=system,
messages=[{"role": "user", "content": doc}]
).content[0].text
)
return {"index": index, **json.loads(result)}
async def parallel_analysis(documents: list[str]) -> dict:
# Fan out : analyser tous les documents simultanément
tasks = [analyze_document(doc, i) for i, doc in enumerate(documents)]
analyses = await asyncio.gather(*tasks)
# Agréger avec un agent de synthèse dédié
synthesis_input = json.dumps({
"document_count": len(documents),
"analyses": analyses
})
aggregate_result = await asyncio.to_thread(
lambda: client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system="Synthétisez les analyses de documents en : distribution globale du sentiment, "
"thèmes principaux dans tous les documents et patterns notables dans les signaux. "
"Retournez en JSON.",
messages=[{"role": "user", "content": synthesis_input}]
).content[0].text
)
return {
"individual": analyses,
"aggregate": json.loads(aggregate_result)
}

L’étape d’agrégation est là où la plupart des implémentations font des raccourcis. Ne concaténez pas les résultats — passez-les à un agent qui comprend la tâche d’agrégation. Une jointure de chaînes de 20 analyses n’est pas utile ; un résumé synthétisé l’est.

Choisir le Bon Pattern

SituationPattern
Structure de tâche inconnue avant analyseOrchestrateur-worker
Étapes de transformation bien définiesPipeline
Grande entrée, fragments indépendantsFan-out parallèle
Vérification indépendante nécessaireOrchestrateur ou pipeline avec étape de révision
Minimiser la latence sur de grandes entréesFan-out parallèle

Ces patterns se composent. Un système réel peut utiliser un orchestrateur qui distribue certaines tâches en parallèle tout en en exécutant d’autres via un pipeline. Commencez par le pattern le plus simple qui convient et ajoutez de la complexité seulement quand l’approche plus simple échoue.

Considérations Pratiques

Coût. Les systèmes multi-agents multiplient les appels API. Un pipeline à 4 étapes peut coûter 4× un seul appel plus l’overhead de synthèse. Mélangez les modèles stratégiquement : utilisez Opus pour l’orchestration et la planification où le jugement compte, Haiku pour les tâches d’exécution à haut volume.

Propagation des erreurs. Décidez à l’avance comment chaque agent gère les échecs. Options : propager l’erreur (arrêter), retourner un objet d’erreur (laisser l’orchestrateur décider), ou réessayer avec un prompt modifié (récupère élégamment, ajoute de la latence). Pour la plupart des systèmes en production, retourner des objets d’erreur structurés et laisser l’orchestrateur décider est le bon défaut.

Traçabilité. Un système multi-agents où vous ne pouvez pas voir ce que chaque agent a fait est un cauchemar à déboguer. Loggez chaque appel d’agent avec : entrée, sortie, modèle, latence et nombre de tokens. Étiquetez chaque appel avec un ID de trace pour pouvoir reconstruire le chemin d’exécution complet.

Passage de contexte. Soyez délibéré sur quel contexte chaque agent reçoit. Passer l’historique complet de conversation à chaque agent est coûteux et souvent déroutant — les agents se laissent distraire par un contexte préalable non pertinent. Ne passez que ce dont chaque agent a besoin pour faire son travail spécifique.

Quoi Construire Ensuite

Les patterns ici sont la fondation. Ce que vous construisez dessus dépend de votre problème :

  • Ajoutez l’utilisation d’outils aux workers — laissez les agents spécialisés appeler des APIs, interroger des bases de données, ou exécuter du code
  • Ajoutez des points de contrôle humain-dans-la-boucle où l’orchestrateur s’arrête avant des actions à enjeux élevés
  • Ajoutez de la mémoire en persistant les sorties des agents dans un vector store que les futurs agents peuvent interroger
  • Ajoutez de l’évaluation en routant les sorties à travers un agent juge avant de les retourner à l’utilisateur

Les systèmes multi-agents sont là où se passe l’ingénierie IA la plus intéressante en ce moment. Les patterns sont simples ; le jugement est dans leur application correcte à votre problème spécifique.


Articles Connexes