Реализовать streaming с SSE в FastAPI для меж-агентной коммуникации
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать streaming с SSE в FastAPI для меж-агентной коммуникации
1. Цель задачи
Научиться проектировать и реализовывать server‑sent events (SSE) эндпоинт на FastAPI для потоковой передачи данных между агентами (или между агентом и клиентом). Обеспечить, чтобы первый токен ответа приходил не позднее 200 мс (TTFT <200 ms) при стриминге текста или JSON‑чоков. Настроить измерение TTFT и корректную обработку отключений клиента.
Ключевой результат Работающий FastAPI‑эндпоинт /stream с SSE, который отправляет поток из N сообщений (токенов/блоков) и соблюдает целевой TTFT.
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Python 3.10+ и pip | Свежая установка или conda-окружение |
| FastAPI + uvicorn | pip install fastapi uvicorn |
| Клиент для тестирования SSE (curl, aiohttp, или Postman) | Любой HTTP‑клиент, поддерживающий потоковые ответы |
| Инструмент измерения TTFT | Встроенная замерка времени в клиенте или time в curl, asyncio таймеры |
| Базовая модель агента (можно симулировать) | Простая асинхронная функция, генерирующая строки с задержкой |
Если нет реального инструмента — симулируем:
- Написать простую async def generate_tokens(n) с await asyncio.sleep(0.01) между токенами (симуляция медленного LLM).
- Использовать curl -N http://localhost:8000/stream для приёма SSE.
- Измерить TTFT через time в bash: time curl -N http://localhost:8000/stream и зафиксировать время до первого байта.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Backend‑фреймворк | FastAPI, Uvicorn | Асинхронный HTTP‑сервер с поддержкой StreamingResponse |
| Streaming‑протокол | Server‑Sent Events (SSE) | Однонаправленный поток событий от сервера к клиенту |
| Асинхронность | asyncio, async generators | Эффективная обработка многих соединений и генерация потоков |
| Мониторинг TTFT | Встроенные таймеры asyncio | Замер времени от запроса до первого токена |
| Тестирование | curl, pytest + httpx | Проверка потокового вывода и производительности |
| Контейнеризация (опц.) | Docker + docker-compose | Изолированное окружение (опционально) |
4. Этапы выполнения
Этап 1: Создание FastAPI‑приложения с базовым SSE (30 минут)
Действия
- Создать папку
sse_project/, внутриapp.py. - Установить зависимости: fastapi, uvicorn, sse-starlette (для удобной работы с SSE).
pip install fastapi uvicorn sse-starlette - Написать простой async‑генератор, который выдаёт 5 сообщений с задержкой 50 мс между ними:
from fastapi import FastAPI from fastapi.responses import StreamingResponse import asyncio app = FastAPI() async def token_generator(): for i in range(5): await asyncio.sleep(0.05) yield f"data: token_{i}\n\n" @app.get("/stream") async def stream(): return StreamingResponse(token_generator(), media_type="text/event-stream") - Запустить сервер:
uvicorn app:app --host 0.0.0.0 --port 8000 - Проверить в терминале:
Должны появляться строкиcurl -N http://localhost:8000/streamdata: token_0… каждые 50 мс.
Ожидаемый результат этапа Работающий SSE‑эндпоинт, возвращающий поток простых сообщений.
Этап 2: Реализация потоковой передачи структурированных данных (1 час)
Действия
- Определить модель сообщения агента (например, JSON с полями
agent_id,content, timestamp). - Модифицировать генератор так, чтобы он выдавал события в формате SSE с полями event,
data,id(опционально). - Использовать sse_starlette.sse.EventSourceResponse или кастомный формат.
from sse_starlette.sse import EventSourceResponse async def structured_events(): for chunk in ["chunk1", "chunk2", "chunk3"]: await asyncio.sleep(0.02) yield { "event": "message", "data": json.dumps({"agent": "alpha", "text": chunk}) } @app.get("/agent-stream") async def agent_stream(): return EventSourceResponse(structured_events()) - Проверить, что клиент получает события как event: message /
data: {...}.
Ожидаемый результат этапа SSE‑поток с JSON‑данными, пригодными для меж‑агентной коммуникации.
Этап 3: Измерение и оптимизация TTFT < 200 ms (1 час)
Действия
- Добавить в endpoint замер времени первого токена:
import time @app.get("/stream-with-metrics") async def stream_with_metrics(): start_time = time.monotonic() async def measured_gen(): first = True async for token in token_generator(): if first: first = False ttft = time.monotonic() - start_time print(f"TTFT: {ttft:.3f}s") yield token return StreamingResponse(measured_gen(), media_type="text/event-stream") - Написать тестовый скрипт на Python с httpx для замера TTFT:
import httpx import asyncio async def test_ttft(): start = asyncio.get_event_loop().time() async with httpx.AsyncClient() as client: async with client.stream("GET", "http://localhost:8000/stream") as response: async for chunk in response.aiter_bytes(): ttft = asyncio.get_event_loop().time() - start print(f"TTFT: {ttft*1000:.1f} ms") break - Запустить тест и убедиться, что TTFT < 200 ms. Если больше – оптимизировать:
- Уменьшить задержки в генераторе (не должна влиять на TTFT, если first‑chunk не задерживается).
- Убедиться, что нет лишнего импорта/инициализации перед первым
yield. - Использовать uvicorn с несколькими workers (не влияет на TTFT, но улучшает throughput).
Ожидаемый результат этапа Зафиксирован TTFT < 200 ms, метрика выводится в лог/консоль.
Этап 4: Обработка отключения клиента и очистка ресурсов (30 минут)
Действия
- Модифицировать endpoint так, чтобы при разрыве соединения генератор завершался корректно:
async def safe_generator(): try: for i in range(1000): await asyncio.sleep(0.01) yield f"data: {i}\n\n" except asyncio.CancelledError: # клиент отключился print("Client disconnected") - В FastAPI можно использовать
request.is_disconnected():@app.get("/stream-safe") async def stream_safe(request: Request): async def gen(): for i in range(100): if await request.is_disconnected(): break await asyncio.sleep(0.01) yield f"data: {i}\n\n" return StreamingResponse(gen(), media_type="text/event-stream") - Проверить отключение с помощью curl с
--max-timeили принудительного прерывания (Ctrl+C).
Ожидаемый результат этапа Нет утечки корутин, логгирование обрыва соединения.
Этап 5: Комплексное тестирование и документирование (30 минут)
Действия
- Написать pytest‑тесты для всех endpoint’ов:
- Проверка стриминга (ожидаемые события, количество).
- Проверка TTFT (замер в тесте).
- Проверка обработки отключения (симуляция разрыва).
- Создать README.md с описанием архитектуры, командой запуска, примером curl.
- Упаковать в Docker (опционально).
Ожидаемый результат этапа Набор автоматических тестов, документация, рабочий Docker‑образ.
5. Критерии приемки (Definition of Done)
- FastAPI‑приложение запускается и отвечает на
/streamс Content-Type: text/event-stream. - Клиент получает последовательность SSE‑событий без разрывов.
- Измеренный TTFT (первый байт ответа) не превышает 200 мс при нагрузке до 10 параллельных соединений.
- При отключении клиента сервер корректно останавливает генерацию (нет ошибок в логе).
- Написаны минимум 3 теста (поток, TTFT, отключение).
- Код размещён в репозитории с инструкцией запуска.
- В репозитории присутствует файл
README.mdс описанием и примером. - Использован
sse-starletteили кастомная реализация SSE с корректным форматом.
6. Ожидаемый результат
Основной артефакт Папка проекта sse-agent-stream/, содержащая:
app.py– FastAPI‑приложение с endpoint’ами/stream,/agent-stream,/stream-safe.tests/test_stream.py– pytest‑тесты.requirements.txt– зависимости (fastapi, uvicorn, sse-starlette, httpx).README.md– документация.
Содержимое app.py 3‑4 endpoint’а, каждый реализует SSE с разными сценариями (простые данные, JSON, обработка отключения). Встроенный замер TTFT в одном из endpoint’ов.
Опционально Dockerfile и docker-compose.yml.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| TTFT больше 200 мс из‑за начальной задержки в генераторе | Убедиться, что первый yield выполняется немедленно, без asyncio.sleep или других await’ов. |
| Клиент не видит события (пустой ответ) | Проверить media_type="text/event-stream", формат data: ...\n\n, отсутствие буферизации nginx. |
| Утечка памяти при долгом стриме и отключении | Использовать try/except CancelledError или request.is_disconnected() для выхода из генератора. |
| SSE не работает через curl (выводится всё сразу) | Флаг -N (no‑buffering) или --no-buffer. В Windows – аналогичный флаг. |
| Высокая нагрузка на сервер при многих параллельных стримах | Использовать uvicorn --workers 4, ограничить число одновременных соединений через middleware. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Базовый SSE | 30 мин |
| Этап 2: Структурированные данные | 1 час |
| Этап 3: Измерение и оптимизация TTFT | 1 час |
| Этап 4: Обработка отключений | 30 мин |
| Этап 5: Тестирование и документирование | 30 мин |
| Итого | 3.5 ч |
Примечание Для первого раза с параллельным изучением SSE может потребоваться до 5 часов.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 17 | Потоковая обработка данных в Python |
| 84 | Асинхронные генераторы в FastAPI |
| 112 | Server‑Sent Events против WebSockets |
| 189 | Измерение времени первого токена (TTFT) |
| 231 | Оптимизация latency в микросервисах |
| 305 | Обработка разрыва соединения в asyncio |
| 378 | FastAPI StreamingResponse с SSE |
| 442 | Нагрузочное тестирование стриминговых endpoint’ов |
| 519 | Очереди и буферизация в асинхронных системах |
| 677 | Мониторинг метрик стриминга (TTFT, TBT) |
10. Чек-лист самопроверки
- Я создал FastAPI‑приложение и запустил его с uvicorn.
- Я проверил, что
curl -Nполучает события по одному. - Я измерил TTFT и убедился, что он < 200 мс при локальном запуске.
- Я написал хотя бы один тест на отключение клиента.
- Я задокументировал все endpoint’ы в README.
- Я убедился, что нет утечки корутин при разрыве соединения.
- Код оформлен по PEP 8, использует type hints.