Настроить backpressure в ingestion

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить backpressure в ingestion

1. Цель задачи

Научиться проектировать и внедрять механизм backpressure в сервисе приёма данных (ingestion) для защиты нижележащих систем от перегрузки. Конкретная реализация — bounded очередь фиксированного размера с возвратом HTTP 429 при её переполнении. Задача позволяет отработать паттерн «ограниченный буфер + управление нагрузкой» — обязательный элемент production-систем.

Ключевой результат Сервис ingestion, который:

  • использует bounded очередь (конфигурируемый размер),
  • при переполнении очереди возвращает HTTP 429 (Too Many Requests) вместо падения или бесконечного ожидания,
  • поддерживает базовую observability (логирование/метрики).

2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Базовый HTTP-сервис на Python (FastAPI / aiohttp)Написать с нуля или взять существующий pet-проект
Код потребителя (consumer) — функция, которая обрабатывает элементы очередиНаписать заглушку (например, имитация дорогой обработки через asyncio.sleep(0.1))
Инструмент для нагрузочного тестированияlocust, wrk, ab или httpx с параллельными вызовами
Python 3.10+ + менеджер зависимостейpip / poetry
GitДля фиксации изменений

Если нет реального потребителя — симулируем:

  1. Создать асинхронную функцию async def process_item(item: dict) -> None:.
  2. Внутри — await asyncio.sleep(random.uniform(0.05, 0.2)) (имитация разной загрузки).
  3. Функция будет вызываться фоновой корутиной, читающей очередь.

3. Технологический стек

КомпонентИнструментыНазначение
HTTP-фреймворкFastAPI (или aiohttp)Приём POST-запросов с данными
Очередь boundedasyncio.Queue(maxsize=N)Буфер фиксированного размера (in-memory)
Потребительasyncio задача (coroutine)Асинхронное чтение и обработка элементов
Управление HTTP-кодамиFastAPI / StarletteВозврат 429 при queue.full() или put_nowait()
Тестированиеpytest + pytest-asyncioЮнит-тесты + тест переполнения
Нагрузочное тестированиеlocust (или httpx + asyncio.gather)Проверка поведения под нагрузкой
Логированиеstructlog / loggingФиксация отказов и пропускной способности
Метрики (опционально)prometheus_clientСчётчики 200/429, размер очереди

4. Этапы выполнения

Этап 1: Проектирование и скелет сервиса (30 минут)

Действия

  1. Создать новую директорию и инициализировать проект:
    mkdir backpressure-ingestion && cd backpressure-ingestion
    python -m venv venv
    source venv/bin/activate
    pip install fastapi uvicorn pydantic httpx
    
  2. Написать минимальный FastAPI-сервис с одним эндпоинтом POST /ingest.
  3. Определить Pydantic-модель для входящих данных, например:
    class IngestRequest(BaseModel):
        id: str
        payload: dict
    
  4. Создать bounded очередь как глобальную переменную: queue = asyncio.Queue(maxsize=100).
  5. Написать consumer-функцию (заглушку обработки).

Ожидаемый результат этапа Проект с пустой очередью, сервис стартует, но данные пока кладутся напрямую (без backpressure). Локальный запуск: uvicorn main:app --reload.


Этап 2: Реализация backpressure через bounded очередь (1 час)

Действия

  1. В эндпоинте POST /ingest заменить прямую обработку на попытку положить элемент в очередь:
    • Использовать queue.put_nowait(item) — неблокирующий вариант.
    • При asyncio.QueueFull возвращать JSONResponse(status_code=429, content={"error": "Too Many Requests"}).
  2. Запустить фоновую задачу при старте приложения:
    @app.on_event("startup")
    async def startup():
        asyncio.create_task(consumer_loop())
    
  3. В consumer_loop в бесконечном цикле читать queue.get() и вызывать process_item(...).
  4. Добавить обработку исключений в consumer: если process_item упал — логировать, но не прерывать цикл.
  5. Настроить логирование: при успешном добавлении в очередь (лог debug), при возврате 429 (лог warning).

Ожидаемый результат этапа

  • Сервис возвращает 200 при свободной очереди.
  • Сервис возвращает 429, когда очередь заполнена (можно проверить ручным вызовом).
  • Consumer обрабатывает элементы в фоне.

Этап 3: Тестирование backpressure (1 час)

Действия

  1. Написать юнит-тест (используя pytest-asyncio) для проверки:
    • что после заполнения очереди возвращается 429,
    • что после обработки элементов снова можно добавлять.
  2. Написать нагрузочный тест:
    • Используя httpx.AsyncClient (или locust), послать 500 запросов без ожидания ответа.
    • Подсчитать количество 200 vs 429.
    • Убедиться, что сервис не упал (нет 500 ошибок).
  3. Дополнительно: проверить, что потребитель в среднем обрабатывает элементы — размер очереди стабилизируется.

Пример юнит-теста

@pytest.mark.asyncio
async def test_queue_full_returns_429():
    # Заполняем очередь
    for i in range(QUEUE_MAXSIZE):
        await app.queue.put({"id": str(i)})
    # Следующий запрос должен вернуть 429
    async with AsyncClient(app=app, base_url="http://test") as ac:
        resp = await ac.post("/ingest", json={"id": "overflow", "payload": {}})
    assert resp.status_code == 429

Ожидаемый результат этапа Все тесты проходят, под нагрузкой сервис стабилен, метрики 200/429 корректны.


Этап 4: Observability и конфигурируемость (45 минут)

Действия

  1. Вынести maxsize очереди в переменную окружения INGESTION_QUEUE_SIZE (со значением по умолчанию, например 100).
  2. Добавить endpoint GET /metrics (если используете Prometheus) или просто выводить в логи статистику (queue.qsize()) каждые 10 секунд.
  3. Добавить структурированное логирование через structlog или logging с полями:
    • event: "item_enqueued" / "queue_full" / "item_processed"
    • queue_size: текущий размер
    • request_id: если есть
  4. Настроить обработку исключений на уровне middleware: все необработанные ошибки возвращать 500 и логировать.

Ожидаемый результат этапа Сервис полностью готов к production-запуску: конфигурируется через env, логирует ключевые события, имеет метрики для мониторинга.


Этап 5: Документация и демонстрация (30 минут)

Действия

  1. Написать README:
    • описание задачи,
    • инструкция по запуску,
    • как менять размер очереди,
    • примеры запросов (curl).
  2. Записать короткий сценарий демонстрации (или скринкаст):
    • запуск сервиса,
    • отправка нагрузки (например, for i in {1..200}; do curl -X POST ... & done),
    • показать, что часть запросов получает 429, сервис жив.
  3. Закоммитить код в git, создать метку (tag) v1.0-backpressure.

Ожидаемый результат этапа Репозиторий с кодом, README и демонстрацией работы механизма backpressure.


5. Критерии приемки (Definition of Done)

  • Сервис запускается и принимает POST-запросы.
  • Очередь имеет ограниченный размер, задаваемый через переменную окружения.
  • При переполнении очереди эндпоинт возвращает HTTP 429 (с JSON-телом).
  • Фоновый consumer обрабатывает элементы без потерь (ни один элемент не теряется).
  • При падении consumer в обработке элемента он не блокирует очередь (логирует ошибку и продолжает).
  • Под нагрузкой (в 2-3x превышающей пропускную способность consumer) сервис не падает и не перестаёт отвечать (всегда возвращает 200 или 429, но не 500).
  • Написаны юнит-тесты на сценарий переполнения и нормальной работы.
  • В README описан запуск и примеры вызовов.

6. Ожидаемый результат

Основной артефакт репозиторий с кодом Python-сервиса (FastAPI) с реализованным backpressure.

Содержание репозитория

  • main.py — сам сервис (эндпоинт, очередь, consumer).
  • requirements.txt — зависимости.
  • tests/test_backpressure.py — тесты.
  • README.md — документация.
  • (Опционально) docker-compose.yml для полной демонстрации.

Дополнительные результаты

  • Понимание асинхронной очереди Python (asyncio.Queue).
  • Навык тестирования асинхронных сервисов под нагрузкой.
  • Умение настраивать логирование и конфигурацию через env.

7. Возможные сложности и их решение

СложностьРешение
Consumer работает медленнее, чем поступают запросы, очередь растёт, но 429 не возвращается т.к. put_nowait не бросил исключение (размер не достигнут).Проверить, что очередь действительно конечна: уменьшить maxsize до 5 для тестов.
При отключении consumer (например, исключение внутри process_item) очередь забивается и сервис начинает возвращать 429, хотя мог бы обрабатывать.Добавить try/except в consumer и перезапускать задачу с задержкой.
Тест с нагрузкой через asyncio.gather может сам упасть из-за большого количества параллельных корутин.Использовать asyncio.Semaphore для ограничения параллелизма клиента.
FastAPI при старте не ждёт создания consumer, и первый запрос может прийти до того, как consumer запущен.Убедиться, что consumer создаётся в startup до первого запроса (FastAPI гарантирует это).
Переменную окружения INGESTION_QUEUE_SIZE нужно парсить как int с fallback.Использовать int(os.getenv("INGESTION_QUEUE_SIZE", "100")).

8. Бюджет времени (оценка)

ЭтапВремя (часы)
1. Проектирование и скелет0.5
2. Реализация backpressure1.0
3. Тестирование1.0
4. Observability и конфигурация0.75
5. Документация и демонстрация0.5
Итого3.75 часов

Примечание: для первого раза может потребоваться до 5 часов с учётом отладки и изучения документации.


9. Связанные вопросы из базы знаний

ВопросТема
127Как работают асинхронные очереди (Python)
233Что такое backpressure и зачем он нужен
498HTTP 429 — Too Many Requests (RFC 6585)
612Rate limiting vs backpressure
789Graceful degradation в микросервисах
341Конфигурирование через переменные окружения
567Нагрузочное тестирование асинхронных сервисов
820Логирование структурированными сообщениями

10. Чек-лист самопроверки

  • Я создал отдельный репозиторий / ветку для этой задачи.
  • Я протестировал, что при отправке большего числа запросов, чем может обработать consumer, возвращается код 429, а не ошибка сервера.
  • Я убедился, что очередь не теряет элементы — после снижения нагрузки все элементы обрабатываются.
  • Я вынес размер очереди в переменную окружения и обновил README.
  • Я добавил юнит-тест, который проверяет сценарий переполнения очереди.