コンピュータアクセスシステム

エージェントレスポンスのストリーミング:マルチステップワークフローのリアルタイム出力


エージェントレスポンスのストリーミング:マルチステップワークフローのリアルタイム出力

エージェントが質問をリサーチし、3つのツールを呼び出し、回答を合成するまでに20秒かかるとします。ユーザーが「送信」をクリックすると、20秒間スピナーを見つめ続けます。本当に動いているのか不安になり、ページをリロードしようか考え始め、最初からやり直すべきか迷います。

では、こんな場面を想像してみてください。ユーザーが「送信」をクリックすると、500ミリ秒以内にエージェントの最初の言葉が表示されます。「🔍 注文データベースを検索中…」という形で情報を検索する様子を見守り、結果がリアルタイムで届くのを目にします。回答が一文一文書かれていくのをリアルタイムで読み進めます。かかる時間は同じ20秒。でも、体験はまったく異なります。

ストリーミングは、ユーザー向けエージェントにとって「あれば便利」な機能ではありません。UXの必須要件です。最初のトークンが届くまでの時間(Time-to-first-token)は、エージェントインターフェースにおける最も重要なレイテンシ指標です。進行状況が見えるユーザーは辛抱強く待ちます。何も見えないユーザーは離脱します。ウェブパフォーマンスに関する数多くの研究が示すように、体感レイテンシは実際のレイテンシよりも重要であり、両者のギャップを埋める最も強力なツールがストリーミングです。

この記事では、マルチステップAIエージェントにリアルタイムストリーミングを実装する方法を解説します。トークン単位の基本的な配信から、ツール呼び出しの透明性、ステータス更新、段階的な情報開示、エラー回復、トランスポート層の選択まで幅広くカバーします。


セクション1:Claude ストリーミングAPIの基礎

エージェントをストリーミングする前に、まずClaudeの単一レスポンスをストリーミングする方法を理解する必要があります。基礎から始めましょう。

ストリームあり vs. ストリームなし

ストリーミングなしのAPIコールは、レスポンス全体が生成されるまでブロックし、すべてを一度に返します。ストリーミングコールは、レスポンスが生成されるにつれて、最初のトークンから順にイベントのシーケンスを返します。

コード上の違いはわずかです。ユーザー体験の違いは絶大です。

イベントの種類

Claude ストリーミングAPIは、構造化されたserver-sent eventsのシーケンスを発行します。

  1. message_start — メタデータ(モデル、ロール、使用状況)を含む初期Messageオブジェクトを含みます。
  2. content_block_start — コンテンツブロック(テキストまたはtool_use)の開始を知らせます。
  3. content_block_delta — インクリメンタルなコンテンツ(テキストの断片または部分的なツール入力JSON)を含みます。
  4. content_block_stop — コンテンツブロックの終了を知らせます。
  5. message_delta — メッセージへの最終更新(停止理由、最終使用状況)を含みます。
  6. message_stop — ストリームが完了したことを示します。

テキストレスポンスの場合、各イベントがわずかなテキストチャンク(通常数トークン)を運ぶcontent_block_deltaイベントを多数受け取ります。

基本的なストリーミングの実装

Anthropic Python SDKを使った同期ストリーミングの例を示します。

import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str):
"""Stream a basic Claude response token by token."""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")

本番のウェブサーバーで使用する非同期バージョンは以下の通りです。

import anthropic
import asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str):
"""Async streaming with the Anthropic SDK."""
async with async_client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))

部分的なトークンとUTF-8境界の処理

SDKはUTF-8デコードを自動的に処理しますが、生のHTTPストリームを直接扱う場合は、マルチバイト文字がチャンクをまたいで分割される可能性があることに注意してください。生のバイト列は常にバッファリングし、完全なUTF-8シーケンスが揃ってからデコードするようにしてください。anthropic SDKのtext_streamイテレータはこれを自動的に処理します。生のSSEストリームを自分でパースするよりも、SDKを使う方が望ましい理由の一つです。


セクション2:ツール呼び出しのストリーミング

基本的なテキストストリーミングは最低限の機能に過ぎません。真の課題と真の価値は、エージェントがツールを使用するときに生まれます。3つのツールを順番に呼び出すマルチステップエージェントは、ツール実行中に何も表示しないと動いていないように見えてしまいます。

ストリームにおけるツール呼び出しの見え方

Claudeがツールを使用しようとすると、ストリームはtype: "tool_use"を持つcontent_block_startイベントを発行し、続いてツールの入力JSONの断片を含むcontent_block_deltaイベントが続きます。ツール呼び出し全体が組み立てられたら、ツールを実行し、結果を注入して会話を続けます。

重要なポイント:ツール名はcontent_block_startが届いた時点でわかります。入力JSONが完成する前でも同様です。つまり、ツール呼び出し全体を待たずに、「🔍 注文データベースを検索中…」のような情報をすぐにユーザーに表示できます。

完全なストリーミングエージェントループ

ツール呼び出しをリアルタイムで表示する完全なストリーミングエージェントループを示します。

import anthropic
import json
client = anthropic.Anthropic()
# Define tools
tools = [
{
"name": "search_orders",
"description": "Search customer orders by order ID or customer email.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Order ID or email"},
},
"required": ["query"],
},
},
{
"name": "get_shipping_status",
"description": "Get real-time shipping status for an order.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "The order ID"},
},
"required": ["order_id"],
},
},
]
def execute_tool(tool_name: str, tool_input: dict) -> str:
"""Execute a tool and return the result as a string."""
if tool_name == "search_orders":
# Simulated database lookup
return json.dumps({
"order_id": "ORD-12345",
"customer": "jane@example.com",
"items": ["Blue Widget x2", "Red Gadget x1"],
"total": "$47.99",
"status": "shipped",
})
elif tool_name == "get_shipping_status":
return json.dumps({
"order_id": tool_input["order_id"],
"carrier": "FedEx",
"tracking": "7891011",
"estimated_delivery": "2026-03-10",
"current_location": "Memphis, TN",
})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str):
"""
Complete streaming agent loop with real-time tool call display.
"""
messages = [{"role": "user", "content": user_message}]
while True:
# Stream the model response
tool_calls = []
current_tool = None
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
tools=tools,
messages=messages,
) as stream:
response = None
for event in stream:
# The SDK exposes raw events via iteration
pass
# Use the helper to collect text and tool use
response = stream.get_final_message()
# Process the response content blocks
for block in response.content:
if block.type == "text":
print(block.text, end="", flush=True)
elif block.type == "tool_use":
tool_name = block.name
tool_input = block.input
tool_id = block.id
# Show the user what's happening
print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool
result = execute_tool(tool_name, tool_input)
print(f"✅ Result received from {tool_name}")
tool_calls.append({
"tool_use_id": tool_id,
"tool
---
## 関連記事
- [ツール使用パターン:信頼性の高いエージェント-ツールインターフェースの構築](/ja/blog/agent-tool-use-patterns/)
- [マルチエージェントパターン:オーケストレーター、ワーカー、パイプライン](/ja/blog/multi-agent-patterns/)
- [エージェントのエラー回復:本番環境の信頼性のための5つのパターン](/ja/blog/agent-error-recovery-patterns/)
- [Web自動化エージェント:ClaudeとComputer Useによるブラウザ制御](/ja/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)