流式代理响应:多步骤工作流的实时输出
流式代理响应:多步骤工作流的实时输出
你的代理需要20秒来研究一个问题、调用三个工具并综合出答案。用户点击”提问”后,盯着加载动画等了20秒。他们不确定系统是否在运行,考虑要不要刷新页面,甚至想着是否该重新开始。
现在换一种场景:用户点击”提问”,500毫秒内就看到代理的第一批文字出现。他们看着代理搜索信息——”🔍 正在搜索订单数据库……”——并实时看到结果返回。他们逐句阅读正在生成的答案。同样是20秒,体验却截然不同。
对于面向用户的代理而言,流式传输不是锦上添花——而是用户体验的必要条件。首个token的响应时间是代理界面中最重要的延迟指标。看到进度的用户会耐心等待;什么都看不到的用户则会放弃。大量Web性能研究表明,感知延迟比实际延迟更重要,而流式传输是缩小两者差距最有力的工具。
本文将介绍如何为多步骤AI代理实现实时流式传输——从逐token输出,到工具调用透明度、状态更新、渐进式信息展示、错误恢复,再到传输层的选择。
第一节:Claude 流式 API 基础
在为代理实现流式传输之前,需要先掌握单次 Claude 响应的流式处理。我们从基础开始。
流式与非流式的区别
非流式 API 调用会阻塞,直到整个响应生成完毕后一次性返回。流式调用则在响应生成过程中逐步返回一系列事件,从第一个 token 开始。
代码层面的差异极小,用户体验的差异却是天壤之别。
事件类型
Claude 流式 API 会发出结构化的服务器推送事件序列:
message_start— 包含初始Message对象及元数据(模型、角色、用量)。content_block_start— 标志一个内容块(文本或 tool_use)的开始。content_block_delta— 包含增量内容:文本片段或部分工具输入 JSON。content_block_stop— 标志一个内容块的结束。message_delta— 消息的最终更新(停止原因、最终用量)。message_stop— 流传输完成。
对于文本响应,你会收到许多 content_block_delta 事件,每个事件携带一小段文本(通常是几个 token)。
基础流式实现
以下是使用 Anthropic Python SDK 的同步流式示例:
import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str): """Stream a basic Claude response token by token.""" with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: for text in stream.text_stream: print(text, end="", flush=True) print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")以下是异步版本,适用于生产环境的 Web 服务器:
import anthropicimport asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str): """Async streaming with the Anthropic SDK.""" async with async_client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: async for text in stream.text_stream: print(text, end="", flush=True) print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))处理部分 Token 与 UTF-8 边界
SDK 会自动处理 UTF-8 解码,但如果你直接操作原始 HTTP 流,需注意多字节字符可能被分割在不同数据块中。请始终缓冲原始字节,仅在获得完整 UTF-8 序列时才进行解码。anthropic SDK 的 text_stream 迭代器会自动处理这一问题——这也是建议使用 SDK 而非自行解析原始 SSE 流的另一个原因。
第二节:流式工具调用
基础的文本流式传输只是入门。真正的挑战——也是真正的价值——在于代理使用工具时。一个依次调用三个工具的多步骤代理,如果不展示工具执行过程,会让用户感觉系统毫无响应。
工具调用在流中的呈现方式
当 Claude 决定使用工具时,流会发出一个 content_block_start 事件,其 type 为 "tool_use",随后是包含工具输入 JSON 片段的 content_block_delta 事件。工具调用完整组装后,执行该工具,注入结果,并继续对话。
关键洞察:content_block_start 到达时你就已经知道工具名称,无需等待完整的输入 JSON。这意味着你可以立即向用户展示类似”🔍 正在搜索订单数据库……”的提示,而无需等待工具调用完全解析完毕。
完整的流式代理循环
以下是一个完整的流式代理循环,可实时展示工具调用过程:
import anthropicimport json
client = anthropic.Anthropic()
# Define toolstools = [ { "name": "search_orders", "description": "Search customer orders by order ID or customer email.", "input_schema": { "type": "object", "properties": { "query": {"type": "string", "description": "Order ID or email"}, }, "required": ["query"], }, }, { "name": "get_shipping_status", "description": "Get real-time shipping status for an order.", "input_schema": { "type": "object", "properties": { "order_id": {"type": "string", "description": "The order ID"}, }, "required": ["order_id"], }, },]
def execute_tool(tool_name: str, tool_input: dict) -> str: """Execute a tool and return the result as a string.""" if tool_name == "search_orders": # Simulated database lookup return json.dumps({ "order_id": "ORD-12345", "customer": "jane@example.com", "items": ["Blue Widget x2", "Red Gadget x1"], "total": "$47.99", "status": "shipped", }) elif tool_name == "get_shipping_status": return json.dumps({ "order_id": tool_input["order_id"], "carrier": "FedEx", "tracking": "7891011", "estimated_delivery": "2026-03-10", "current_location": "Memphis, TN", }) return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str): """ Complete streaming agent loop with real-time tool call display. """ messages = [{"role": "user", "content": user_message}]
while True: # Stream the model response tool_calls = [] current_tool = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, tools=tools, messages=messages, ) as stream: response = None for event in stream: # The SDK exposes raw events via iteration pass
# Use the helper to collect text and tool use response = stream.get_final_message()
# Process the response content blocks for block in response.content: if block.type == "text": print(block.text, end="", flush=True) elif block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id
# Show the user what's happening print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool result = execute_tool(tool_name, tool_input) print(f"✅ Result received from {tool_name}")
tool_calls.append({ "tool_use_id": tool_id, "tool_name": tool_name, "tool_input": tool_input, "result": result, })
# If the model stopped because it wants to use tools, continue the loop if response.stop_reason == "tool_use": # Add assistant message with all content blocks messages.append({"role": "assistant", "content": response.content})
# Add tool results tool_results = [] for tc in tool_calls: tool_results.append({ "type": "tool_result", "tool_use_id": tc["tool_use_id"], "content": tc["result"], }) messages.append({"role": "user", "content": tool_results})
print("\n--- Continuing agent loop ---") else: # Model is done (stop_reason == "end_turn") print() break
# Run the agentstream_agent_response("Where is my order #ORD-12345? When will it arrive?")若要实现逐 token 流式传输并同步检测工具调用,可以遍历原始事件:
def stream_agent_with_live_tokens(user_message: str): """Stream text tokens live while also detecting tool calls.""" messages = [{"role": "user", "content": user_message}]
while True: collected_content = [] current_text = "" current_tool_name = None current_tool_input_json = "" current_tool_id = None stop_reason = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, tools=tools, messages=messages, ) as stream: for event in stream: if hasattr(event, 'type'): if event.type == 'content_block_start': if event.content_block.type
---
## 相关文章
- [工具使用模式:构建可靠的智能体-工具接口](/zh/blog/agent-tool-use-patterns/)- [多代理模式:编排器、Worker 与管道](/zh/blog/multi-agent-patterns/)- [智能体错误恢复:5种生产环境可靠性模式](/zh/blog/agent-error-recovery-patterns/)- [网页自动化代理:使用 Claude 和 Computer Use 控制浏览器](/zh/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)