Потоковая передача ответов агента: вывод в реальном времени для многошаговых процессов
Потоковая передача ответов агента: вывод в реальном времени для многошаговых процессов
Ваш агент тратит 20 секунд на исследование вопроса, вызов трёх инструментов и формирование ответа. Пользователь нажимает «Спросить» и 20 секунд смотрит на крутящийся индикатор загрузки. Он не уверен, работает ли система. Думает обновить страницу. Задаётся вопросом, не начать ли всё заново.
Теперь представьте другой сценарий: пользователь нажимает «Спросить» и уже через 500 миллисекунд видит первые слова агента. Он наблюдает, как тот ищет информацию — «🔍 Поиск в базе данных заказов…» — и видит результаты в режиме реального времени. Он читает ответ по мере его написания, предложение за предложением. Те же 20 секунд. Совершенно другой опыт.
Стриминг — не приятный бонус для агентов с пользовательским интерфейсом, а обязательное требование к UX. Время до первого токена — важнейшая метрика задержки в интерфейсах агентов. Пользователи, которые видят прогресс, готовы ждать. Пользователи, которые не видят ничего, уходят. Многочисленные исследования веб-производительности показывают, что воспринимаемая задержка важнее реальной, а стриминг — наиболее мощный инструмент для сокращения разрыва между ними.
В этой статье вы узнаете, как реализовать потоковую передачу в реальном времени для многошаговых ИИ-агентов — от базовой пословной доставки до прозрачности вызовов инструментов, обновлений статуса, прогрессивного раскрытия информации, восстановления после ошибок и выбора транспортного уровня.
Раздел 1: Основы потокового API Claude
Прежде чем организовать стриминг агента, нужно наладить потоковую передачу одного ответа Claude. Начнём с основ.
Потоковый и непотоковый режимы
Непотоковый вызов API блокируется до тех пор, пока не будет сгенерирован весь ответ, и возвращает его сразу целиком. Потоковый вызов возвращает последовательность событий по мере формирования ответа, начиная с первого токена.
Разница в коде минимальна. Разница в пользовательском опыте — огромна.
Типы событий
Потоковый API Claude генерирует структурированную последовательность событий, отправляемых с сервера:
message_start— содержит исходный объектMessageс метаданными (модель, роль, использование).content_block_start— сигнализирует о начале блока контента (текст или tool_use).content_block_delta— содержит инкрементальный контент: фрагменты текста или частичный JSON входных данных инструмента.content_block_stop— сигнализирует об окончании блока контента.message_delta— финальные обновления сообщения (причина остановки, итоговое использование).message_stop— поток завершён.
Для текстовых ответов вы получите множество событий content_block_delta, каждое из которых содержит небольшой фрагмент текста (обычно несколько токенов).
Базовая реализация стриминга
Вот синхронный пример стриминга с использованием Python SDK от Anthropic:
import anthropic
client = anthropic.Anthropic()
def stream_basic_response(user_message: str): """Stream a basic Claude response token by token.""" with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: for text in stream.text_stream: print(text, end="", flush=True) print() # Newline after stream completes
stream_basic_response("Explain quantum entanglement in simple terms.")И асинхронная версия, которую вы будете использовать на боевых веб-серверах:
import anthropicimport asyncio
async_client = anthropic.AsyncAnthropic()
async def stream_basic_response_async(user_message: str): """Async streaming with the Anthropic SDK.""" async with async_client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": user_message}], ) as stream: async for text in stream.text_stream: print(text, end="", flush=True) print()
asyncio.run(stream_basic_response_async("Explain quantum entanglement in simple terms."))Обработка частичных токенов и границ UTF-8
SDK берёт на себя декодирование UTF-8, но если вы работаете с необработанным HTTP-потоком, учитывайте, что многобайтовые символы могут быть разделены между чанками. Всегда буферизуйте необработанные байты и декодируйте их только при наличии полных UTF-8-последовательностей. Итератор text_stream SDK anthropic обрабатывает это автоматически — ещё один повод использовать его, а не парсить необработанный SSE-поток вручную.
Раздел 2: Стриминг вызовов инструментов
Базовый стриминг текста — лишь отправная точка. Настоящий вызов — и настоящая ценность — возникают, когда агент использует инструменты. Многошаговый агент, последовательно вызывающий три инструмента, может казаться «мёртвым» во время их выполнения, если не показывать, что происходит.
Как вызовы инструментов отображаются в потоке
Когда Claude решает использовать инструмент, поток генерирует событие content_block_start с типом "tool_use", за которым следуют события content_block_delta с фрагментами JSON входных данных инструмента. После сборки полного вызова инструмента вы его выполняете, добавляете результат и продолжаете диалог.
Ключевое наблюдение: имя инструмента становится известно сразу после прихода content_block_start, ещё до завершения формирования входного JSON. Это означает, что вы можете немедленно показать пользователю что-то вроде «🔍 Поиск в базе данных заказов…», не дожидаясь полного вызова инструмента.
Полный цикл потокового агента
Вот полный цикл потокового агента с отображением вызовов инструментов в реальном времени:
import anthropicimport json
client = anthropic.Anthropic()
# Define toolstools = [ { "name": "search_orders", "description": "Search customer orders by order ID or customer email.", "input_schema": { "type": "object", "properties": { "query": {"type": "string", "description": "Order ID or email"}, }, "required": ["query"], }, }, { "name": "get_shipping_status", "description": "Get real-time shipping status for an order.", "input_schema": { "type": "object", "properties": { "order_id": {"type": "string", "description": "The order ID"}, }, "required": ["order_id"], }, },]
def execute_tool(tool_name: str, tool_input: dict) -> str: """Execute a tool and return the result as a string.""" if tool_name == "search_orders": # Simulated database lookup return json.dumps({ "order_id": "ORD-12345", "customer": "jane@example.com", "items": ["Blue Widget x2", "Red Gadget x1"], "total": "$47.99", "status": "shipped", }) elif tool_name == "get_shipping_status": return json.dumps({ "order_id": tool_input["order_id"], "carrier": "FedEx", "tracking": "7891011", "estimated_delivery": "2026-03-10", "current_location": "Memphis, TN", }) return json.dumps({"error": f"Unknown tool: {tool_name}"})
def stream_agent_response(user_message: str): """ Complete streaming agent loop with real-time tool call display. """ messages = [{"role": "user", "content": user_message}]
while True: # Stream the model response tool_calls = [] current_tool = None
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, tools=tools, messages=messages, ) as stream: response = None for event in stream: # The SDK exposes raw events via iteration pass
# Use the helper to collect text and tool use response = stream.get_final_message()
# Process the response content blocks for block in response.content: if block.type == "text": print(block.text, end="", flush=True) elif block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id
# Show the user what's happening print(f"\n⚙️ Calling tool: {tool_name}({json.dumps(tool_input)})")
# Execute the tool result = execute_tool(tool_name, tool_input) print(f"✅ Result received from {tool_name}")
tool_calls.append({ "tool_use_id": tool_id, "tool_name": tool_name, "tool_input": tool_input, "result": result, })
# If the model stopped because it wants to use tools, continue the loop if response.stop_reason == "tool_use": # Add assistant message with all content blocks messages.append({"role": "assistant", "content": response.content})
# Add tool results tool_results = [] for tc in tool_calls: tool_results.append({ "type": "tool_result", "tool_use_id": tc["tool_use_id"], "content": tc["result"],
---
## Связанные статьи
- [Паттерны использования инструментов: надёжные интерфейсы агент-инструмент](/ru/blog/agent-tool-use-patterns/)- [Мультиагентные паттерны: оркестраторы, воркеры и конвейеры](/ru/blog/multi-agent-patterns/)- [Восстановление Агентов После Ошибок: 5 Паттернов для Продакшн-Надёжности](/ru/blog/agent-error-recovery-patterns/)- [Агенты веб-автоматизации: управление браузером с Claude и Computer Use](/ru/blog/web-automation-agents-browser-control-with-claude-and-computer-use/)