СИСТЕМА КОМПЬЮТЕРНОГО ДОСТУПА

Потоковая передача ответов агента: вывод в реальном времени для многошаговых процессов


Потоковая передача ответов агента: вывод в реальном времени для многошаговых процессов

Ваш агент тратит 20 секунд на исследование вопроса, вызов трёх инструментов и формирование ответа. Пользователь нажимает «Спросить» и 20 секунд смотрит на крутящийся индикатор загрузки. Он не уверен, работает ли система. Думает обновить страницу. Задаётся вопросом, не начать ли всё заново.

Теперь представьте другой сценарий: пользователь нажимает «Спросить» и уже через 500 миллисекунд видит первые слова агента. Он наблюдает, как тот ищет информацию — «🔍 Поиск в базе данных заказов…» — и видит результаты в режиме реального времени. Он читает ответ по мере его написания, предложение за предложением. Те же 20 секунд. Совершенно другой опыт.

Стриминг — не приятный бонус для агентов с пользовательским интерфейсом, а обязательное требование к UX. Время до первого токена — важнейшая метрика задержки в интерфейсах агентов. Пользователи, которые видят прогресс, готовы ждать. Пользователи, которые не видят ничего, уходят. Многочисленные исследования веб-производительности показывают, что воспринимаемая задержка важнее реальной, а стриминг — наиболее мощный инструмент для сокращения разрыва между ними.

В этой статье вы узнаете, как реализовать потоковую передачу в реальном времени для многошаговых ИИ-агентов — от базовой пословной доставки до прозрачности вызовов инструментов, обновлений статуса, прогрессивного раскрытия информации, восстановления после ошибок и выбора транспортного уровня.


Раздел 1: Основы потокового API Claude

Прежде чем организовать стриминг агента, нужно наладить потоковую передачу одного ответа Claude. Начнём с основ.

Потоковый и непотоковый режимы

Непотоковый вызов API блокируется до тех пор, пока не будет сгенерирован весь ответ, и возвращает его сразу целиком. Потоковый вызов возвращает последовательность событий по мере формирования ответа, начиная с первого токена.

Разница в коде минимальна. Разница в пользовательском опыте — огромна.

Типы событий

Потоковый API Claude генерирует структурированную последовательность событий, отправляемых с сервера:

  1. message_start — содержит исходный объект Message с метаданными (модель, роль, использование).
  2. content_block_start — сигнализирует о начале блока контента (текст или tool_use).
  3. content_block_delta — содержит инкрементальный контент: фрагменты текста или частичный JSON входных данных инструмента.
  4. content_block_stop — сигнализирует об окончании блока контента.
  5. message_delta — финальные обновления сообщения (причина остановки, итоговое использование).
  6. message_stop — поток завершён.

Для текстовых ответов вы получите множество событий content_block_delta, каждое из которых содержит небольшой фрагмент текста (обычно несколько токенов).

Базовая реализация стриминга

Вот синхронный пример стриминга с использованием Python SDK от Anthropic:

import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str):
"""Stream a basic Claude response token by token."""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")

И асинхронная версия, которую вы будете использовать на боевых веб-серверах:

import anthropic
import asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str):
"""Async streaming with the Anthropic SDK."""
async with async_client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))

Обработка частичных токенов и границ UTF-8

SDK берёт на себя декодирование UTF-8, но если вы работаете с необработанным HTTP-потоком, учитывайте, что многобайтовые символы могут быть разделены между чанками. Всегда буферизуйте необработанные байты и декодируйте их только при наличии полных UTF-8-последовательностей. Итератор text_stream SDK anthropic обрабатывает это автоматически — ещё один повод использовать его, а не парсить необработанный SSE-поток вручную.


Раздел 2: Стриминг вызовов инструментов

Базовый стриминг текста — лишь отправная точка. Настоящий вызов — и настоящая ценность — возникают, когда агент использует инструменты. Многошаговый агент, последовательно вызывающий три инструмента, может казаться «мёртвым» во время их выполнения, если не показывать, что происходит.

Как вызовы инструментов отображаются в потоке

Когда Claude решает использовать инструмент, поток генерирует событие content_block_start с типом "tool_use", за которым следуют события content_block_delta с фрагментами JSON входных данных инструмента. После сборки полного вызова инструмента вы его выполняете, добавляете результат и продолжаете диалог.

Ключевое наблюдение: имя инструмента становится известно сразу после прихода content_block_start, ещё до завершения формирования входного JSON. Это означает, что вы можете немедленно показать пользователю что-то вроде «🔍 Поиск в базе данных заказов…», не дожидаясь полного вызова инструмента.

Полный цикл потокового агента

Вот полный цикл потокового агента с отображением вызовов инструментов в реальном времени:

import anthropic
import json
client = anthropic.Anthropic()
# Define tools
tools = [
{
"name": "search_orders",
"description": "Search customer orders by order ID or customer email.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Order ID or email"},
},
"required": ["query"],
},
},
{
"name": "get_shipping_status",
"description": "Get real-time shipping status for an order.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "The order ID"},
},
"required": ["order_id"],
},
},
]
def execute_tool(tool_name: str, tool_input: dict) -> str:
"""Execute a tool and return the result as a string."""
if tool_name == "search_orders":
# Simulated database lookup
return json.dumps({
"order_id": "ORD-12345",
"customer": "jane@example.com",
"items": ["Blue Widget x2", "Red Gadget x1"],
"total": "$47.99",
"status": "shipped",
})
elif tool_name == "get_shipping_status":
return json.dumps({
"order_id": tool_input["order_id"],
"carrier": "FedEx",
"tracking": "7891011",
"estimated_delivery": "2026-03-10",
"current_location": "Memphis, TN",
})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str):
"""
Complete streaming agent loop with real-time tool call display.
"""
messages = [{"role": "user", "content": user_message}]
while True:
# Stream the model response
tool_calls = []
current_tool = None
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
tools=tools,
messages=messages,
) as stream:
response = None
for event in stream:
# The SDK exposes raw events via iteration
pass
# Use the helper to collect text and tool use
response = stream.get_final_message()
# Process the response content blocks
for block in response.content:
if block.type == "text":
print(block.text, end="", flush=True)
elif block.type == "tool_use":
tool_name = block.name
tool_input = block.input
tool_id = block.id
# Show the user what's happening
print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool
result = execute_tool(tool_name, tool_input)
print(f"✅ Result received from {tool_name}")
tool_calls.append({
"tool_use_id": tool_id,
"tool_name": tool_name,
"tool_input": tool_input,
"result": result,
})
# If the model stopped because it wants to use tools, continue the loop
if response.stop_reason == "tool_use":
# Add assistant message with all content blocks
messages.append({"role": "assistant", "content": response.content})
# Add tool results
tool_results = []
for tc in tool_calls:
tool_results.append({
"type": "tool_result",
"tool_use_id": tc["tool_use_id"],
"content": tc["result"],
---
## Связанные статьи
- [Паттерны использования инструментов: надёжные интерфейсы агент-инструмент](/ru/blog/agent-tool-use-patterns/)
- [Мультиагентные паттерны: оркестраторы, воркеры и конвейеры](/ru/blog/multi-agent-patterns/)
- [Восстановление Агентов После Ошибок: 5 Паттернов для Продакшн-Надёжности](/ru/blog/agent-error-recovery-patterns/)
- [Агенты веб-автоматизации: управление браузером с Claude и Computer Use](/ru/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)