Как вы реализуете streaming в production с учетом network limitations?
Краткий тезис
Streaming в production для RAG|Agentic RAG — это организация пошаговой передачи данных от сервера к клиенту (например, токенов ответа LLM) в реальном времени. Основные протоколы — Server-Sent Events (SSE) для однонаправленного потока и WebSocket для двустороннего интерактива. Ключевые аспекты с учётом сетевых ограничений: настройка chunk size (размера фрагмента), управление буфером (buffer management), реализация backpressure (обратного давления) на уровне proxy|reverse proxy и приложения, а также обработка потери пакетов, задержек и ограничений пропускной способности.
1. Термины и основы
Streaming — технология, при которой сервер отправляет данные частями (чанками) по мере их готовности, не дожидаясь полного ответа. В контексте RAG это позволяет пользователю видеть первые токены ответа]] LLM через миллисекунды после запроса, а не ждать десятки секунд.
Server-Sent Events (SSE) — протокол на основе HTTP, при котором сервер отправляет клиенту поток событий в виде текстовых сообщений (data: ...). SSE прост в реализации, работает через стандартные HTTP-прокси, но поддерживает только однонаправленную связь (сервер → клиент).
WebSocket — протокол, устанавливающий постоянное двустороннее соединение поверх TCP. Позволяет клиенту и серверу обмениваться сообщениями в реальном времени. Требует отдельного handshake (upgrade с HTTP), может блокироваться корпоративными файрволами.
Backpressure — механизм, при котором получатель сигнализирует отправителю о своей неготовности обрабатывать новые данные. В streaming это предотвращает переполнение буфера клиента или сети при медленном соединении.
Chunk size — размер одного фрагмента данных, отправляемого за один раз. Слишком маленький chunk увеличивает накладные расходы на заголовки и пакеты, слишком большой — задерживает первый токен и может вызвать тайм-ауты.
Buffer management — стратегия временного хранения данных на стороне сервера, прокси или клиента для сглаживания пиков скорости и компенсации сетевых задержек.
2. Почему streaming критичен в Agentic RAG
В Agentic RAG агент может выполнять несколько шагов: поиск, вызов инструментов, генерацию. Без streaming пользователь ждёт весь ответ целиком (latency = сумма всех шагов). Со streaming:
- Перцептивная задержка (perceived latency) снижается: пользователь видит первые токены через 200–500 мс.
- Интерактивность: агент может показывать промежуточные результаты (найденные документы, статус вызова API).
- Отмена запроса: клиент может прервать поток, если ответ уже не нужен, экономя ресурсы.
Сетевые ограничения (низкая пропускная способность, высокий RTT (Round-Trip Time), потери пакетов) напрямую влияют на качество streaming: могут возникать задержки, разрывы соединения, дублирование данных.
3. Протоколы: SSE vs WebSocket
| Характеристика | SSE | WebSocket |
|---|---|---|
| Направление | Однонаправленное (сервер → клиент) | Двунаправленное |
| Протокол | HTTP/1.1 или HTTP/2 | Собственный (ws://, wss://) |
| Поддержка прокси | Отличная (через HTTP) | Может блокироваться |
| Автоматическое переподключение | Встроено (EventSource API) | Требует реализации |
| Потоковая передача бинарных данных | Только текст (base64) | Да (бинарные фреймы) |
| Количество одновременных соединений | Ограничено браузером (6–8 на домен) | Практически не ограничено |
| Простота реализации | Очень простая | Средняя |
Выбор: Для типичного Agentic RAG, где нужно только отправлять токены LLM клиенту, SSE — оптимальный выбор. WebSocket оправдан, если клиент должен отправлять команды во время генерации (например, «стоп», «измени параметр»).
4. Chunk size и buffer management
4.1 Выбор chunk size
Chunk size — количество символов или токенов, отправляемых в одном сообщении. Влияет на:
- Latency до первого токена (TTFB — Time To First Byte): маленький chunk (1–5 токенов) даёт быстрый старт.
- Overhead: каждый chunk требует HTTP-заголовков (для SSE) или фреймов (WebSocket). Слишком мелкие чанки увеличивают накладные расходы.
- Пропускная способность: крупные чанки (100–500 токенов) эффективнее при высокой скорости генерации, но увеличивают задержку между обновлениями.
Практическое правило: начинать с 1–2 токенов для первого сообщения (чтобы TTFB был минимальным), затем увеличить до 10–20 токенов для остальных. Для медленных сетей (мобильный интернет) лучше уменьшить размер, чтобы избежать потери пакетов при фрагментации.
4.2 Buffer management
Буферизация на стороне сервера:
- Наивный подход: отправлять каждый токен сразу → много мелких пакетов, высокий overhead.
- Адаптивная буферизация: накапливать токены до достижения порога по времени (например, 50 мс) или по размеру (например, 100 байт). Это снижает количество пакетов, но увеличивает задержку.
Пример реализации на Python (FastAPI + SSE):
import asyncio
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
async def token_generator(query: str):
# Имитация генерации токенов LLM
for token in ["Привет", ", ", "как", " дела", "?"]:
yield {"data": token}
await asyncio.sleep(0.1) # имитация задержки
@app.get("/stream")
async def stream_endpoint(request: Request, query: str):
async def event_stream():
buffer = []
buffer_size = 0
max_buffer_size = 100 # байт
max_buffer_time = 0.05 # 50 мс
last_flush = asyncio.get_event_loop().time()
async for token in token_generator(query):
buffer.append(token["data"])
buffer_size += len(token["data"].encode("utf-8"))
now = asyncio.get_event_loop().time()
if buffer_size >= max_buffer_size or (now - last_flush) >= max_buffer_time:
yield {"data": "".join(buffer)}
buffer.clear()
buffer_size = 0
last_flush = now
# Остаток
if buffer:
yield {"data": "".join(buffer)}
return EventSourceResponse(event_stream())
Buffer management на стороне клиента: браузерный EventSource автоматически буферизирует входящие данные. Для WebSocket нужно реализовать свою очередь.
5. Backpressure
Backpressure — механизм, предотвращающий перегрузку клиента или сети. В streaming это особенно важно при медленных соединениях.
5.1 Backpressure на уровне reverse proxy
Reverse proxy (Nginx, Envoy, HAProxy) может буферизировать ответ от сервера и отдавать клиенту с ограниченной скоростью. Настройка в Nginx:
location /stream {
proxy_pass http://backend;
proxy_buffering off; # отключаем буферизацию, чтобы данные шли сразу
proxy_cache off;
proxy_set_header Connection '';
chunked_transfer_encoding on;
# Ограничение скорости отдачи клиенту
limit_rate 100k; # 100 КБ/с
limit_rate_after 1m; # после первого мегабайта
}
proxy_buffering off — критично для SSE, иначе Nginx будет накапливать ответ и отдавать его кусками, нарушая real-time.
5.2 Backpressure на уровне приложения
Сервер должен знать, готов ли клиент принимать данные. При использовании WebSocket можно проверять socket.bufferedAmount (в браузере) или transport.get_write_buffer_size() (в aiohttp). Если буфер переполнен — приостановить генерацию.
Для SSE такой возможности нет (протокол не предусматривает обратную связь). Решение:
- Использовать WebSocket вместо SSE, если нужен контроль потока.
- На стороне клиента отправлять ACK (подтверждение) после обработки каждого чанка. Сервер ждёт ACK перед отправкой следующего.
- Применять адаптивный chunk size: если клиент медленный (определяем по времени между отправками), увеличиваем размер чанка, чтобы снизить частоту сообщений.
6. Network limitations и их влияние
6.1 Высокая задержка (latency)
- Проблема: каждый chunk проходит через сеть с RTT 100–300 мс (мобильные сети, межконтинентальные соединения). Если отправлять каждый токен отдельно, TTFB будет хорошим, но последующие токены будут приходить с задержкой RTT.
- Решение: увеличить chunk size, чтобы за одно RTT передавать больше данных. Использовать HTTP/2 multiplexing (SSE через HTTP/2 позволяет отправлять несколько потоков одновременно без блокировки).
6.2 Потеря пакетов (packet loss)
- Проблема: TCP retransmission задерживает доставку, возможны дубликаты.
- Решение: на уровне приложения добавить sequence numbers (номера чанков). Клиент может обнаружить пропуск и запросить повторную отправку. Для SSE это сложно, проще полагаться на TCP. Для критичных данных — использовать WebSocket с собственным протоколом подтверждения.
6.3 Ограниченная пропускная способность (bandwidth)
- Проблема: если скорость генерации LLM (например, 1000 токенов/с) превышает пропускную способность канала (например, 50 КБ/с), буфер на сервере или прокси растёт, вызывая задержки.
- Решение: ограничить скорость отправки на сервере (rate limiting) или динамически уменьшать chunk size. Можно также сжимать данные (gzip) — SSE поддерживает сжатие на уровне HTTP.
6.4 NAT и файрволы
- Проблема: WebSocket может быть заблокирован корпоративными файрволами. SSE работает через стандартный HTTP (порт 80/443) и почти всегда проходит.
- Решение: использовать SSE как основной протокол, а WebSocket — как fallback для интерактивных сценариев.
7. Production best practices
7.1 Надёжность соединения
- Retry: клиент (EventSource) автоматически переподключается при разрыве. Сервер должен поддерживать идемпотентность — повторная отправка тех же данных не должна ломать состояние.
- Reconnection strategy: экспоненциальная задержка (1 с, 2 с, 4 с...) с джиттером.
- Keep-alive: отправка пустых сообщений (heartbeat) каждые 15–30 секунд, чтобы прокси не закрывали соединение по тайм-ауту.
7.2 Мониторинг
- Метрики: количество активных streaming-соединений, средний размер чанка, задержка между отправками, количество разрывов.
- Логирование: логировать начало и конец каждого стрима, ошибки сети.
- Alerting: если доля разорванных соединений > 5% — проверить сетевую инфраструктуру.
7.3 Load balancing
- Sticky sessions: для SSE не обязательны (протокол stateless), но если сервер хранит состояние генерации, нужно привязать клиента к одному экземпляру (через cookie или IP hash).
- Graceful shutdown: при остановке сервера дождаться завершения активных стримов или отправить клиенту сообщение о завершении.
7.4 Безопасность
- Аутентификация: передавать токен в query-параметре или заголовке (для SSE через EventSource нельзя добавить кастомные заголовки — использовать cookie или поддомен).
- Rate limiting: ограничить количество одновременных стримов от одного пользователя.
8. Пример production-реализации на FastAPI + SSE
import asyncio
import time
from fastapi import FastAPI, Request, HTTPException
from sse_starlette.sse import EventSourceResponse
from typing import AsyncGenerator
app = FastAPI()
# Имитация LLM с генерацией токенов
async def generate_tokens(query: str) -> AsyncGenerator[str, None]:
# Здесь реальный вызов LLM (например, через OpenAI API с stream=True)
for token in ["Токен1", " Токен2", " Токен3"]:
yield token
await asyncio.sleep(0.05)
@app.get("/v1/chat/stream")
async def chat_stream(request: Request, query: str):
# Проверка аутентификации (упрощённо)
auth = request.headers.get("Authorization")
if not auth or not auth.startswith("Bearer "):
raise HTTPException(status_code=401)
async def event_generator():
buffer = []
last_flush = time.monotonic()
max_interval = 0.05 # 50 ms
max_size = 200 # байт
async for token in generate_tokens(query):
buffer.append(token)
now = time.monotonic()
if (now - last_flush) >= max_interval or len("".join(buffer).encode()) >= max_size:
yield {"data": "".join(buffer)}
buffer.clear()
last_flush = now
if buffer:
yield {"data": "".join(buffer)}
# Сигнал о завершении
yield {"event": "done", "data": ""}
return EventSourceResponse(event_generator())
Запуск: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Настройка Nginx (фрагмент):
upstream backend {
server 127.0.0.1:8000;
}
server {
listen 443 ssl;
location /v1/chat/stream {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding on;
proxy_read_timeout 300s;
limit_rate 500k;
}
}
Пет-проект для закрепления
Задача: Реализовать streaming-сервис для чат-бота с Agentic RAG, который отправляет токены ответа через SSE, с адаптивным chunk size и backpressure через WebSocket.
Инструменты: Python, FastAPI, sse-starlette, websockets, Nginx (локально), locust для нагрузочного тестирования.
Шаги:
- Создайте FastAPI-приложение с двумя эндпоинтами:
/sse(SSE) и/ws(WebSocket). - Реализуйте генератор токенов, который эмулирует LLM с переменной скоростью (0.01–0.1 с на токен).
- Для SSE: добавьте адаптивную буферизацию (изменяйте chunk size в зависимости от времени между отправками).
- Для WebSocket: реализуйте backpressure — сервер ждёт от клиента сообщение
ackперед отправкой следующего чанка. - Запустите Nginx с
limit_rateиproxy_buffering off. - Напишите клиент на JavaScript (HTML-страница), который подключается к
/sseи отображает токены в реальном времени. - Проведите нагрузочное тестирование с
locust(100 одновременных пользователей) и замерьте TTFB, количество разрывов, использование памяти.
Ожидаемый результат: Работающий сервис, где при медленном соединении (симулируйте через tc или clumsy) chunk size автоматически увеличивается, а при WebSocket — поток приостанавливается, если клиент не успевает обрабатывать.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 210 | Архитектура Agentic RAG: общая схема |
| 211 | Управление состоянием агента |
| 212 | Интеграция инструментов (function calling) |
| 213 | Обработка ошибок и retry в агентах |
| 215 | Кэширование ответов LLM |
| 216 | Мониторинг и observability Agentic RAG |
Навигация
- Предыдущий: 213
- Следующий: 215
- Индекс: 00. Индекс разборов