计算机访问系统

流式代理响应:多步骤工作流的实时输出

你的代理需要20秒来研究一个问题、调用三个工具并综合出答案。用户点击”提问”后,盯着加载动画等了20秒。他们不确定系统是否在运行,考虑要不要刷新页面,甚至想着是否该重新开始。

现在换一种场景:用户点击”提问”,500毫秒内就看到代理的第一批文字出现。他们看着代理搜索信息——”🔍 正在搜索订单数据库……”——并实时看到结果返回。他们逐句阅读正在生成的答案。同样是20秒,体验却截然不同。

对于面向用户的代理而言,流式传输不是锦上添花——而是用户体验的必要条件。首个token的响应时间是代理界面中最重要的延迟指标。看到进度的用户会耐心等待;什么都看不到的用户则会放弃。大量Web性能研究表明,感知延迟比实际延迟更重要,而流式传输是缩小两者差距最有力的工具。

本文将介绍如何为多步骤AI代理实现实时流式传输——从逐token输出,到工具调用透明度、状态更新、渐进式信息展示、错误恢复,再到传输层的选择。


第一节:Claude 流式 API 基础

在为代理实现流式传输之前,需要先掌握单次 Claude 响应的流式处理。我们从基础开始。

流式与非流式的区别

非流式 API 调用会阻塞,直到整个响应生成完毕后一次性返回。流式调用则在响应生成过程中逐步返回一系列事件,从第一个 token 开始。

代码层面的差异极小,用户体验的差异却是天壤之别。

事件类型

Claude 流式 API 会发出结构化的服务器推送事件序列:

  1. message_start — 包含初始 Message 对象及元数据(模型、角色、用量)。
  2. content_block_start — 标志一个内容块(文本或 tool_use)的开始。
  3. content_block_delta — 包含增量内容:文本片段或部分工具输入 JSON。
  4. content_block_stop — 标志一个内容块的结束。
  5. message_delta — 消息的最终更新(停止原因、最终用量)。
  6. message_stop — 流传输完成。

对于文本响应,你会收到许多 content_block_delta 事件,每个事件携带一小段文本(通常是几个 token)。

基础流式实现

以下是使用 Anthropic Python SDK 的同步流式示例:

import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str):
"""Stream a basic Claude response token by token."""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")

以下是异步版本,适用于生产环境的 Web 服务器:

import anthropic
import asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str):
"""Async streaming with the Anthropic SDK."""
async with async_client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))

处理部分 Token 与 UTF-8 边界

SDK 会自动处理 UTF-8 解码,但如果你直接操作原始 HTTP 流,需注意多字节字符可能被分割在不同数据块中。请始终缓冲原始字节,仅在获得完整 UTF-8 序列时才进行解码。anthropic SDK 的 text_stream 迭代器会自动处理这一问题——这也是建议使用 SDK 而非自行解析原始 SSE 流的另一个原因。


第二节:流式工具调用

基础的文本流式传输只是入门。真正的挑战——也是真正的价值——在于代理使用工具时。一个依次调用三个工具的多步骤代理,如果不展示工具执行过程,会让用户感觉系统毫无响应。

工具调用在流中的呈现方式

当 Claude 决定使用工具时,流会发出一个 content_block_start 事件,其 type"tool_use",随后是包含工具输入 JSON 片段的 content_block_delta 事件。工具调用完整组装后,执行该工具,注入结果,并继续对话。

关键洞察:content_block_start 到达时你就已经知道工具名称,无需等待完整的输入 JSON。这意味着你可以立即向用户展示类似”🔍 正在搜索订单数据库……”的提示,而无需等待工具调用完全解析完毕。

完整的流式代理循环

以下是一个完整的流式代理循环,可实时展示工具调用过程:

import anthropic
import json
client = anthropic.Anthropic()
# Define tools
tools = [
{
"name": "search_orders",
"description": "Search customer orders by order ID or customer email.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Order ID or email"},
},
"required": ["query"],
},
},
{
"name": "get_shipping_status",
"description": "Get real-time shipping status for an order.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "The order ID"},
},
"required": ["order_id"],
},
},
]
def execute_tool(tool_name: str, tool_input: dict) -> str:
"""Execute a tool and return the result as a string."""
if tool_name == "search_orders":
# Simulated database lookup
return json.dumps({
"order_id": "ORD-12345",
"customer": "jane@example.com",
"items": ["Blue Widget x2", "Red Gadget x1"],
"total": "$47.99",
"status": "shipped",
})
elif tool_name == "get_shipping_status":
return json.dumps({
"order_id": tool_input["order_id"],
"carrier": "FedEx",
"tracking": "7891011",
"estimated_delivery": "2026-03-10",
"current_location": "Memphis, TN",
})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str):
"""
Complete streaming agent loop with real-time tool call display.
"""
messages = [{"role": "user", "content": user_message}]
while True:
# Stream the model response
tool_calls = []
current_tool = None
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
tools=tools,
messages=messages,
) as stream:
response = None
for event in stream:
# The SDK exposes raw events via iteration
pass
# Use the helper to collect text and tool use
response = stream.get_final_message()
# Process the response content blocks
for block in response.content:
if block.type == "text":
print(block.text, end="", flush=True)
elif block.type == "tool_use":
tool_name = block.name
tool_input = block.input
tool_id = block.id
# Show the user what's happening
print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool
result = execute_tool(tool_name, tool_input)
print(f"✅ Result received from {tool_name}")
tool_calls.append({
"tool_use_id": tool_id,
"tool_name": tool_name,
"tool_input": tool_input,
"result": result,
})
# If the model stopped because it wants to use tools, continue the loop
if response.stop_reason == "tool_use":
# Add assistant message with all content blocks
messages.append({"role": "assistant", "content": response.content})
# Add tool results
tool_results = []
for tc in tool_calls:
tool_results.append({
"type": "tool_result",
"tool_use_id": tc["tool_use_id"],
"content": tc["result"],
})
messages.append({"role": "user", "content": tool_results})
print("\n--- Continuing agent loop ---")
else:
# Model is done (stop_reason == "end_turn")
print()
break
# Run the agent
stream_agent_response("Where is my order #ORD-12345? When will it arrive?")

若要实现逐 token 流式传输并同步检测工具调用,可以遍历原始事件:

def stream_agent_with_live_tokens(user_message: str):
"""Stream text tokens live while also detecting tool calls."""
messages = [{"role": "user", "content": user_message}]
while True:
collected_content = []
current_text = ""
current_tool_name = None
current_tool_input_json = ""
current_tool_id = None
stop_reason = None
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
tools=tools,
messages=messages,
) as stream:
for event in stream:
if hasattr(event, 'type'):
if event.type == 'content_block_start':
if event.content_block.type
---
## 相关文章
- [工具使用模式:构建可靠的智能体-工具接口](/zh/blog/agent-tool-use-patterns/)
- [多代理模式:编排器、Worker 与管道](/zh/blog/multi-agent-patterns/)
- [智能体错误恢复:5种生产环境可靠性模式](/zh/blog/agent-error-recovery-patterns/)
- [网页自动化代理:使用 Claude 和 Computer Use 控制浏览器](/zh/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)