Streaming von Agent-Antworten: Echtzeit-Ausgabe für mehrstufige Workflows
Streaming von Agent-Antworten: Echtzeit-Ausgabe für mehrstufige Workflows
Dein Agent braucht 20 Sekunden, um eine Frage zu recherchieren, drei Tools aufzurufen und eine Antwort zu formulieren. Der Nutzer klickt auf „Fragen” und starrt 20 Sekunden lang auf einen Ladekreisel. Er ist sich nicht sicher, ob es funktioniert. Er überlegt, die Seite neu zu laden. Er fragt sich, ob er von vorne anfangen soll.
Stell dir stattdessen vor: Der Nutzer klickt auf „Fragen” und sieht innerhalb von 500 Millisekunden die ersten Wörter des Agenten erscheinen. Er beobachtet, wie der Agent nach Informationen sucht – „🔍 Durchsuche Bestelldatenbank…” – und sieht Ergebnisse in Echtzeit eintreffen. Er liest die Antwort, während sie Satz für Satz geschrieben wird. Dieselben 20 Sekunden. Eine völlig andere Erfahrung.
Streaming ist kein nettes Zusatzfeature für nutzerorientierte Agenten – es ist eine UX-Anforderung. Die Zeit bis zum ersten Token ist die wichtigste Latenz-Kennzahl in Agent-Interfaces. Nutzer, die Fortschritt sehen, sind geduldig. Nutzer, die nichts sehen, brechen ab. Studie um Studie zur Web-Performance zeigt, dass die wahrgenommene Latenz mehr zählt als die tatsächliche – und Streaming ist das wirkungsvollste Werkzeug, um die Lücke zwischen beiden zu schließen.
In diesem Artikel lernst du, wie du Echtzeit-Streaming für mehrstufige KI-Agenten implementierst – von der Token-für-Token-Ausgabe über Tool-Call-Transparenz, Statusupdates, progressive Offenlegung, Fehlerwiederherstellung bis hin zu Transportschicht-Entscheidungen.
Abschnitt 1: Grundlagen der Claude Streaming API
Bevor du einen Agenten streamen kannst, musst du eine einzelne Claude-Antwort streamen. Fangen wir mit den Grundlagen an.
Stream vs. Non-Stream
Ein nicht-streamender API-Aufruf blockiert, bis die gesamte Antwort generiert ist, und gibt sie dann auf einmal zurück. Ein Streaming-Aufruf gibt eine Abfolge von Events zurück, während die Antwort produziert wird, beginnend mit dem ersten Token.
Der Unterschied im Code ist minimal. Der Unterschied in der Nutzererfahrung ist enorm.
Event-Typen
Die Claude Streaming API sendet eine strukturierte Abfolge von Server-Sent Events:
message_start— Enthält das initialeMessage-Objekt mit Metadaten (Modell, Rolle, Nutzung).content_block_start— Signalisiert den Beginn eines Inhaltsblocks (Text oder tool_use).content_block_delta— Enthält inkrementellen Inhalt: Textfragmente oder partielles Tool-Input-JSON.content_block_stop— Signalisiert das Ende eines Inhaltsblocks.message_delta— Abschließende Aktualisierungen der Nachricht (Stop-Grund, finale Nutzung).message_stop— Der Stream ist abgeschlossen.
Bei Textantworten erhältst du viele content_block_delta-Events, die jeweils ein kleines Textstück enthalten (oft einige wenige Tokens).
Grundlegende Streaming-Implementierung
Hier ist ein synchrones Streaming-Beispiel mit dem Anthropic Python SDK:
import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str): """Stream a basic Claude response token by token.""" with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: for text in stream.text_stream: print(text, end="", flush=True) print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")Und die asynchrone Version, die du in produktiven Web-Servern verwenden wirst:
import anthropicimport asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str): """Async streaming with the Anthropic SDK.""" async with async_client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: async for text in stream.text_stream: print(text, end="", flush=True) print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))Umgang mit partiellen Tokens und UTF-8-Grenzen
Das SDK übernimmt die UTF-8-Dekodierung für dich, aber wenn du mit dem rohen HTTP-Stream arbeitest, beachte, dass Multi-Byte-Zeichen über Chunks hinweg aufgeteilt werden können. Puffere immer rohe Bytes und dekodiere erst, wenn du vollständige UTF-8-Sequenzen hast. Der text_stream-Iterator des anthropic-SDKs erledigt das automatisch – ein weiterer Grund, ihn zu nutzen, anstatt den rohen SSE-Stream selbst zu parsen.
Abschnitt 2: Streaming von Tool-Calls
Einfaches Text-Streaming ist das Mindeste. Die eigentliche Herausforderung – und der eigentliche Mehrwert – entsteht, wenn dein Agent Tools verwendet. Ein mehrstufiger Agent, der drei Tools nacheinander aufruft, kann sich während der Tool-Ausführung tot anfühlen, wenn du nicht zeigst, was passiert.
Wie Tool-Calls im Stream erscheinen
Wenn Claude entscheidet, ein Tool zu verwenden, sendet der Stream ein content_block_start-Event mit type: "tool_use", gefolgt von content_block_delta-Events, die Fragmente des Tool-Input-JSON enthalten. Sobald der vollständige Tool-Call zusammengesetzt ist, führst du das Tool aus, fügst das Ergebnis ein und setzt das Gespräch fort.
Die entscheidende Erkenntnis: Du kennst den Tool-Namen, sobald content_block_start eintrifft, noch bevor das Input-JSON vollständig ist. Das bedeutet, du kannst dem Nutzer sofort etwas wie „🔍 Durchsuche Bestelldatenbank…” anzeigen, ohne auf den vollständigen Tool-Call zu warten.
Die vollständige Streaming-Agent-Schleife
Hier ist eine vollständige Streaming-Agent-Schleife, die Tool-Aufrufe in Echtzeit anzeigt:
import anthropicimport json
client = anthropic.Anthropic()
# Define toolstools = [ { "name": "search_orders", "description": "Search customer orders by order ID or customer email.", "input_schema": { "type": "object", "properties": { "query": {"type": "string", "description": "Order ID or email"}, }, "required": ["query"], }, }, { "name": "get_shipping_status", "description": "Get real-time shipping status for an order.", "input_schema": { "type": "object", "properties": { "order_id": {"type": "string", "description": "The order ID"}, }, "required": ["order_id"], }, },]
def execute_tool(tool_name: str, tool_input: dict) -> str: """Execute a tool and return the result as a string.""" if tool_name == "search_orders": # Simulated database lookup return json.dumps({ "order_id": "ORD-12345", "customer": "jane@example.com", "items": ["Blue Widget x2", "Red Gadget x1"], "total": "$47.99", "status": "shipped", }) elif tool_name == "get_shipping_status": return json.dumps({ "order_id": tool_input["order_id"], "carrier": "FedEx", "tracking": "7891011", "estimated_delivery": "2026-03-10", "current_location": "Memphis, TN", }) return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str): """ Complete streaming agent loop with real-time tool call display. """ messages = [{"role": "user", "content": user_message}]
while True: # Stream the model response tool_calls = [] current_tool = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, tools=tools, messages=messages, ) as stream: response = None for event in stream: # The SDK exposes raw events via iteration pass
# Use the helper to collect text and tool use response = stream.get_final_message()
# Process the response content blocks for block in response.content: if block.type == "text": print(block.text, end="", flush=True) elif block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id
# Show the user what's happening print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool result = execute_tool(tool_name, tool_input) print(f"✅ Result received from {tool_name}")
tool_calls.append({ "tool_use_id": tool_id, "tool_name": tool_name, "tool_input": tool_input, "result": result, })
# If the model stopped because it wants to use tools, continue the loop if response.stop_reason == "tool_use": # Add assistant message with all content blocks messages.append({"role": "assistant", "content": response.content})
# Add tool results tool_results = [] for tc in tool_calls: tool_results.append({ "type": "tool_result", "tool_use_id": tc["tool_use_id"], "content": tc["result"], }) messages.append({"role": "user", "content": tool_results})
print("\n--- Continuing agent loop ---") else: # Model is done (stop_reason == "end_turn") print() break
# Run the agentstream_agent_response("Where is my order #ORD-12345? When will it arrive?")Für echtes Token-für-Token-Streaming mit Tool-Call-Erkennung kannst du über die rohen Events iterieren:
def stream_agent_with_live_tokens(user_message: str): """Stream text tokens live while also detecting tool calls.""" messages = [{"role": "user", "content": user_message}]
---
## Verwandte Artikel
- [Tool-Nutzungsmuster: Zuverlässige Agent-Tool-Schnittstellen entwickeln](/de/blog/agent-tool-use-patterns/)- [Multi-Agenten-Muster: Orchestratoren, Worker und Pipelines](/de/blog/multi-agent-patterns/)- [Agent-Fehlerbehandlung: 5 Muster für Produktionszuverlässigkeit](/de/blog/agent-error-recovery-patterns/)- [Web-Automatisierungsagenten: Browsersteuerung mit Claude und Computer Use](/de/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)