Реализовать streaming с SSE в FastAPI для меж-агентной коммуникации

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать streaming с SSE в FastAPI для меж-агентной коммуникации

1. Цель задачи

Научиться проектировать и реализовывать server‑sent events (SSE) эндпоинт на FastAPI для потоковой передачи данных между агентами (или между агентом и клиентом). Обеспечить, чтобы первый токен ответа приходил не позднее 200 мс (TTFT <200 ms) при стриминге текста или JSON‑чоков. Настроить измерение TTFT и корректную обработку отключений клиента.

Ключевой результат Работающий FastAPIэндпоинт /stream с SSE, который отправляет поток из N сообщений (токенов/блоков) и соблюдает целевой TTFT.


2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Python 3.10+ и pipСвежая установка или conda-окружение
FastAPI + uvicornpip install fastapi uvicorn
Клиент для тестирования SSE (curl, aiohttp, или Postman)Любой HTTP‑клиент, поддерживающий потоковые ответы
Инструмент измерения TTFTВстроенная замерка времени в клиенте или time в curl, asyncio таймеры
Базовая модель агента (можно симулировать)Простая асинхронная функция, генерирующая строки с задержкой

Если нет реального инструмента — симулируем:

  1. Написать простую async def generate_tokens(n) с await asyncio.sleep(0.01) между токенами (симуляция медленного LLM).
  2. Использовать curl -N http://localhost:8000/stream для приёма SSE.
  3. Измерить TTFT через time в bash: time curl -N http://localhost:8000/stream и зафиксировать время до первого байта.

3. Технологический стек

КомпонентИнструментыНазначение
Backend‑фреймворкFastAPI, UvicornАсинхронный HTTP‑сервер с поддержкой StreamingResponse
Streaming‑протоколServer‑Sent Events (SSE)Однонаправленный поток событий от сервера к клиенту
Асинхронностьasyncio, async generatorsЭффективная обработка многих соединений и генерация потоков
Мониторинг TTFTВстроенные таймеры asyncioЗамер времени от запроса до первого токена
Тестированиеcurl, pytest + httpxПроверка потокового вывода и производительности
Контейнеризация (опц.)Docker + docker-composeИзолированное окружение (опционально)

4. Этапы выполнения

Этап 1: Создание FastAPI‑приложения с базовым SSE (30 минут)

Действия

  1. Создать папку sse_project/, внутри app.py.
  2. Установить зависимости: fastapi, uvicorn, sse-starlette (для удобной работы с SSE).
    pip install fastapi uvicorn sse-starlette
    
  3. Написать простой async‑генератор, который выдаёт 5 сообщений с задержкой 50 мс между ними:
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio
    
    app = FastAPI()
    
    async def token_generator():
        for i in range(5):
            await asyncio.sleep(0.05)
            yield f"data: token_{i}\n\n"
    
    @app.get("/stream")
    async def stream():
        return StreamingResponse(token_generator(), media_type="text/event-stream")
    
  4. Запустить сервер:
    uvicorn app:app --host 0.0.0.0 --port 8000
    
  5. Проверить в терминале:
    curl -N http://localhost:8000/stream
    
    Должны появляться строки data: token_0 … каждые 50 мс.

Ожидаемый результат этапа Работающий SSE‑эндпоинт, возвращающий поток простых сообщений.


Этап 2: Реализация потоковой передачи структурированных данных (1 час)

Действия

  1. Определить модель сообщения агента (например, JSON с полями agent_id, content, timestamp).
  2. Модифицировать генератор так, чтобы он выдавал события в формате SSE с полями event, data, id (опционально).
  3. Использовать sse_starlette.sse.EventSourceResponse или кастомный формат.
    from sse_starlette.sse import EventSourceResponse
    
    async def structured_events():
        for chunk in ["chunk1", "chunk2", "chunk3"]:
            await asyncio.sleep(0.02)
            yield {
                "event": "message",
                "data": json.dumps({"agent": "alpha", "text": chunk})
            }
    
    @app.get("/agent-stream")
    async def agent_stream():
        return EventSourceResponse(structured_events())
    
  4. Проверить, что клиент получает события как event: message / data: {...}.

Ожидаемый результат этапа SSE‑поток с JSON‑данными, пригодными для меж‑агентной коммуникации.


Этап 3: Измерение и оптимизация TTFT < 200 ms (1 час)

Действия

  1. Добавить в endpoint замер времени первого токена:
    import time
    
    @app.get("/stream-with-metrics")
    async def stream_with_metrics():
        start_time = time.monotonic()
        async def measured_gen():
            first = True
            async for token in token_generator():
                if first:
                    first = False
                    ttft = time.monotonic() - start_time
                    print(f"TTFT: {ttft:.3f}s")
                yield token
        return StreamingResponse(measured_gen(), media_type="text/event-stream")
    
  2. Написать тестовый скрипт на Python с httpx для замера TTFT:
    import httpx
    import asyncio
    
    async def test_ttft():
        start = asyncio.get_event_loop().time()
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", "http://localhost:8000/stream") as response:
                async for chunk in response.aiter_bytes():
                    ttft = asyncio.get_event_loop().time() - start
                    print(f"TTFT: {ttft*1000:.1f} ms")
                    break
    
  3. Запустить тест и убедиться, что TTFT < 200 ms. Если больше – оптимизировать:
    • Уменьшить задержки в генераторе (не должна влиять на TTFT, если first‑chunk не задерживается).
    • Убедиться, что нет лишнего импорта/инициализации перед первым yield.
    • Использовать uvicorn с несколькими workers (не влияет на TTFT, но улучшает throughput).

Ожидаемый результат этапа Зафиксирован TTFT < 200 ms, метрика выводится в лог/консоль.


Этап 4: Обработка отключения клиента и очистка ресурсов (30 минут)

Действия

  1. Модифицировать endpoint так, чтобы при разрыве соединения генератор завершался корректно:
    async def safe_generator():
        try:
            for i in range(1000):
                await asyncio.sleep(0.01)
                yield f"data: {i}\n\n"
        except asyncio.CancelledError:
            # клиент отключился
            print("Client disconnected")
    
  2. В FastAPI можно использовать request.is_disconnected():
    @app.get("/stream-safe")
    async def stream_safe(request: Request):
        async def gen():
            for i in range(100):
                if await request.is_disconnected():
                    break
                await asyncio.sleep(0.01)
                yield f"data: {i}\n\n"
        return StreamingResponse(gen(), media_type="text/event-stream")
    
  3. Проверить отключение с помощью curl с --max-time или принудительного прерывания (Ctrl+C).

Ожидаемый результат этапа Нет утечки корутин, логгирование обрыва соединения.


Этап 5: Комплексное тестирование и документирование (30 минут)

Действия

  1. Написать pytest‑тесты для всех endpoint’ов:
    • Проверка стриминга (ожидаемые события, количество).
    • Проверка TTFT (замер в тесте).
    • Проверка обработки отключения (симуляция разрыва).
  2. Создать README.md с описанием архитектуры, командой запуска, примером curl.
  3. Упаковать в Docker (опционально).

Ожидаемый результат этапа Набор автоматических тестов, документация, рабочий Docker‑образ.


5. Критерии приемки (Definition of Done)

  • FastAPI‑приложение запускается и отвечает на /stream с Content-Type: text/event-stream.
  • Клиент получает последовательность SSE‑событий без разрывов.
  • Измеренный TTFT (первый байт ответа) не превышает 200 мс при нагрузке до 10 параллельных соединений.
  • При отключении клиента сервер корректно останавливает генерацию (нет ошибок в логе).
  • Написаны минимум 3 теста (поток, TTFT, отключение).
  • Код размещён в репозитории с инструкцией запуска.
  • В репозитории присутствует файл README.md с описанием и примером.
  • Использован sse-starlette или кастомная реализация SSE с корректным форматом.

6. Ожидаемый результат

Основной артефакт Папка проекта sse-agent-stream/, содержащая:

  • app.pyFastAPI‑приложение с endpoint’ами /stream, /agent-stream, /stream-safe.
  • tests/test_stream.pypytest‑тесты.
  • requirements.txt – зависимости (fastapi, uvicorn, sse-starlette, httpx).
  • README.md – документация.

Содержимое app.py 3‑4 endpoint’а, каждый реализует SSE с разными сценариями (простые данные, JSON, обработка отключения). Встроенный замер TTFT в одном из endpoint’ов.

Опционально Dockerfile и docker-compose.yml.


7. Возможные сложности и их решение

СложностьРешение
TTFT больше 200 мс из‑за начальной задержки в генератореУбедиться, что первый yield выполняется немедленно, без asyncio.sleep или других await’ов.
Клиент не видит события (пустой ответ)Проверить media_type="text/event-stream", формат data: ...\n\n, отсутствие буферизации nginx.
Утечка памяти при долгом стриме и отключенииИспользовать try/except CancelledError или request.is_disconnected() для выхода из генератора.
SSE не работает через curl (выводится всё сразу)Флаг -N (no‑buffering) или --no-buffer. В Windows – аналогичный флаг.
Высокая нагрузка на сервер при многих параллельных стримахИспользовать uvicorn --workers 4, ограничить число одновременных соединений через middleware.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Базовый SSE30 мин
Этап 2: Структурированные данные1 час
Этап 3: Измерение и оптимизация TTFT1 час
Этап 4: Обработка отключений30 мин
Этап 5: Тестирование и документирование30 мин
Итого3.5 ч

Примечание Для первого раза с параллельным изучением SSE может потребоваться до 5 часов.


9. Связанные вопросы из базы знаний

ВопросТема
17Потоковая обработка данных в Python
84Асинхронные генераторы в FastAPI
112Server‑Sent Events против WebSockets
189Измерение времени первого токена (TTFT)
231Оптимизация latency в микросервисах
305Обработка разрыва соединения в asyncio
378FastAPI StreamingResponse с SSE
442Нагрузочное тестирование стриминговых endpoint’ов
519Очереди и буферизация в асинхронных системах
677Мониторинг метрик стриминга (TTFT, TBT)

10. Чек-лист самопроверки

  • Я создал FastAPI‑приложение и запустил его с uvicorn.
  • Я проверил, что curl -N получает события по одному.
  • Я измерил TTFT и убедился, что он < 200 мс при локальном запуске.
  • Я написал хотя бы один тест на отключение клиента.
  • Я задокументировал все endpoint’ы в README.
  • Я убедился, что нет утечки корутин при разрыве соединения.
  • Код оформлен по PEP 8, использует type hints.