Streaming des réponses d'agents : sortie en temps réel pour les workflows multi-étapes
Streaming des réponses d’agents : sortie en temps réel pour les workflows multi-étapes
Votre agent met 20 secondes pour rechercher une information, appeler trois outils et synthétiser une réponse. L’utilisateur clique sur « Demander » et fixe un spinner pendant 20 secondes. Il ne sait pas si ça fonctionne. Il envisage de rafraîchir la page. Il se demande s’il doit tout recommencer.
Imaginez maintenant ceci : l’utilisateur clique sur « Demander » et, en moins de 500 millisecondes, il voit les premiers mots de l’agent apparaître. Il le regarde chercher des informations — « 🔍 Recherche dans la base de données des commandes… » — et voit les résultats arriver en temps réel. Il lit la réponse au fur et à mesure qu’elle s’écrit, phrase par phrase. Toujours 20 secondes. Une expérience radicalement différente.
Le streaming n’est pas un luxe pour les agents orientés utilisateur — c’est une exigence d’expérience utilisateur. Le temps avant le premier token est la métrique de latence la plus importante dans les interfaces d’agents. Les utilisateurs qui voient de la progression sont patients. Ceux qui ne voient rien abandonnent. Étude après étude sur les performances web, il est démontré que la latence perçue compte plus que la latence réelle, et le streaming est l’outil le plus puissant dont vous disposez pour réduire l’écart entre les deux.
Dans cet article, vous apprendrez à implémenter le streaming en temps réel pour des agents IA multi-étapes — de la diffusion token par token à la transparence des appels d’outils, aux mises à jour de statut, à la divulgation progressive, à la récupération d’erreurs et aux choix de couche de transport.
Section 1 : Les bases de l’API de streaming Claude
Avant de pouvoir streamer un agent, vous devez streamer une seule réponse Claude. Commençons par les fondamentaux.
Stream vs. Sans stream
Un appel API sans streaming bloque jusqu’à ce que la réponse entière soit générée, puis la retourne d’un seul coup. Un appel en streaming retourne une séquence d’événements au fur et à mesure que la réponse est produite, en commençant par le premier token.
La différence dans le code est minime. La différence dans l’expérience utilisateur est énorme.
Types d’événements
L’API de streaming Claude émet une séquence structurée d’événements server-sent :
message_start— Contient l’objetMessageinitial avec les métadonnées (modèle, rôle, utilisation).content_block_start— Signale le début d’un bloc de contenu (texte ou tool_use).content_block_delta— Contient du contenu incrémental : fragments de texte ou JSON partiel des entrées d’outils.content_block_stop— Signale la fin d’un bloc de contenu.message_delta— Mises à jour finales du message (raison d’arrêt, utilisation finale).message_stop— Le stream est terminé.
Pour les réponses textuelles, vous recevrez de nombreux événements content_block_delta, chacun transportant un petit morceau de texte (souvent quelques tokens).
Implémentation basique du streaming
Voici un exemple de streaming synchrone utilisant le SDK Python d’Anthropic :
import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str): """Stream a basic Claude response token by token.""" with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: for text in stream.text_stream: print(text, end="", flush=True) print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")Et la version asynchrone, que vous utiliserez dans les serveurs web en production :
import anthropicimport asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str): """Async streaming with the Anthropic SDK.""" async with async_client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: async for text in stream.text_stream: print(text, end="", flush=True) print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))Gestion des tokens partiels et des limites UTF-8
Le SDK gère le décodage UTF-8 pour vous, mais si vous travaillez avec le flux HTTP brut, sachez que les caractères multi-octets peuvent être divisés entre plusieurs chunks. Mettez toujours les octets bruts en mémoire tampon et décodez uniquement lorsque vous avez des séquences UTF-8 complètes. L’itérateur text_stream du SDK anthropic gère cela automatiquement — une raison supplémentaire de l’utiliser plutôt que d’analyser vous-même le flux SSE brut.
Section 2 : Streaming des appels d’outils
Le streaming de texte basique est le minimum requis. Le vrai défi — et la vraie valeur — apparaît lorsque votre agent utilise des outils. Un agent multi-étapes qui appelle trois outils en séquence peut sembler inactif pendant l’exécution des outils, sauf si vous montrez ce qui se passe.
Comment les appels d’outils apparaissent dans le stream
Lorsque Claude décide d’utiliser un outil, le stream émet un événement content_block_start avec type: "tool_use", suivi d’événements content_block_delta contenant des fragments du JSON d’entrée de l’outil. Une fois l’appel d’outil complet assemblé, vous exécutez l’outil, injectez le résultat et continuez la conversation.
L’idée clé : vous connaissez le nom de l’outil dès que content_block_start arrive, même avant que le JSON d’entrée soit complet. Cela signifie que vous pouvez immédiatement montrer à l’utilisateur quelque chose comme « 🔍 Recherche dans la base de données des commandes… » sans attendre l’appel d’outil complet.
La boucle complète de l’agent en streaming
Voici une boucle d’agent en streaming complète qui affiche les invocations d’outils en temps réel :
import anthropicimport json
client = anthropic.Anthropic()
# Define toolstools = [ { "name": "search_orders", "description": "Search customer orders by order ID or customer email.", "input_schema": { "type": "object", "properties": { "query": {"type": "string", "description": "Order ID or email"}, }, "required": ["query"], }, }, { "name": "get_shipping_status", "description": "Get real-time shipping status for an order.", "input_schema": { "type": "object", "properties": { "order_id": {"type": "string", "description": "The order ID"}, }, "required": ["order_id"], }, },]
def execute_tool(tool_name: str, tool_input: dict) -> str: """Execute a tool and return the result as a string.""" if tool_name == "search_orders": # Simulated database lookup return json.dumps({ "order_id": "ORD-12345", "customer": "jane@example.com", "items": ["Blue Widget x2", "Red Gadget x1"], "total": "$47.99", "status": "shipped", }) elif tool_name == "get_shipping_status": return json.dumps({ "order_id": tool_input["order_id"], "carrier": "FedEx", "tracking": "7891011", "estimated_delivery": "2026-03-10", "current_location": "Memphis, TN", }) return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str): """ Complete streaming agent loop with real-time tool call display. """ messages = [{"role": "user", "content": user_message}]
while True: # Stream the model response tool_calls = [] current_tool = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, tools=tools, messages=messages, ) as stream: response = None for event in stream: # The SDK exposes raw events via iteration pass
# Use the helper to collect text and tool use response = stream.get_final_message()
# Process the response content blocks for block in response.content: if block.type == "text": print(block.text, end="", flush=True) elif block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id
# Show the user what's happening print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool result = execute_tool(tool_name, tool_input) print(f"✅ Result received from {tool_name}")
tool_calls.append({ "tool_use_id": tool_id, "tool_name": tool_name, "tool_input": tool_input, "result": result, })
# If the model stopped because it wants to use tools, continue the loop if response.stop_reason == "tool_use": # Add assistant message with all content blocks messages.append({"role": "assistant", "content": response.content})
# Add tool results tool_results = [] for tc in tool_calls: tool_results.append({ "type": "tool_result", "tool_use_id": tc["tool_use_id"], "content": tc["result"], }) messages.append({"role": "user", "content": tool_results})
print("\n--- Continuing agent loop ---") else: # Model is done (stop_reason == "end_turn") print() break
# Run the agentstream_agent_response("Where is my order #ORD-12345? When will it arrive?")Pour un vrai streaming token par token avec détection des appels d’outils, vous pouvez itérer sur les événements bruts :
def stream_agent_with_live_tokens(user_message: str): """Stream text tokens live while also detecting tool calls.""" messages = [{"role": "user", "content": user_message}]
while True: collected_content = [] current_text = "" current_tool_name = None current_tool_input_json = "" current_tool_id = None stop_reason = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096
---
## Articles Connexes
- [Patterns d'Utilisation des Outils : Interfaces Agent-Outil Fiables](/fr/blog/agent-tool-use-patterns/)- [Patterns Multi-Agents : Orchestrateurs, Workers et Pipelines](/fr/blog/multi-agent-patterns/)- [Récupération d'Erreurs pour Agents : 5 Patrons pour la Fiabilité en Production](/fr/blog/agent-error-recovery-patterns/)- [Agents d'automatisation web : contrôle du navigateur avec Claude et Computer Use](/fr/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)