中文翻译暂不可用,显示俄语原文。
Настроить backpressure в ingestion
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить backpressure в ingestion
1. Цель задачи
Научиться проектировать и внедрять механизм backpressure в сервисе приёма данных (ingestion) для защиты нижележащих систем от перегрузки. Конкретная реализация — bounded очередь фиксированного размера с возвратом HTTP 429 при её переполнении. Задача позволяет отработать паттерн «ограниченный буфер + управление нагрузкой» — обязательный элемент production-систем.
Ключевой результат Сервис ingestion, который:
- использует bounded очередь (конфигурируемый размер),
- при переполнении очереди возвращает HTTP 429 (Too Many Requests) вместо падения или бесконечного ожидания,
- поддерживает базовую observability (логирование/метрики).
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Базовый HTTP-сервис на Python (FastAPI / aiohttp) | Написать с нуля или взять существующий pet-проект |
| Код потребителя (consumer) — функция, которая обрабатывает элементы очереди | Написать заглушку (например, имитация дорогой обработки через asyncio.sleep(0.1)) |
| Инструмент для нагрузочного тестирования | locust, wrk, ab или httpx с параллельными вызовами |
| Python 3.10+ + менеджер зависимостей | pip / poetry |
| Git | Для фиксации изменений |
Если нет реального потребителя — симулируем:
- Создать асинхронную функцию async def process_item(item: dict) -> None:.
- Внутри — await asyncio.sleep(random.uniform(0.05, 0.2)) (имитация разной загрузки).
- Функция будет вызываться фоновой корутиной, читающей очередь.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| HTTP-фреймворк | FastAPI (или aiohttp) | Приём POST-запросов с данными |
| Очередь bounded | asyncio.Queue(maxsize=N) | Буфер фиксированного размера (in-memory) |
| Потребитель | asyncio задача (coroutine) | Асинхронное чтение и обработка элементов |
| Управление HTTP-кодами | FastAPI / Starlette | Возврат 429 при queue.full() или put_nowait() |
| Тестирование | pytest + pytest-asyncio | Юнит-тесты + тест переполнения |
| Нагрузочное тестирование | locust (или httpx + asyncio.gather) | Проверка поведения под нагрузкой |
| Логирование | structlog / logging | Фиксация отказов и пропускной способности |
| Метрики (опционально) | prometheus_client | Счётчики 200/429, размер очереди |
4. Этапы выполнения
Этап 1: Проектирование и скелет сервиса (30 минут)
Действия
- Создать новую директорию и инициализировать проект:
mkdir backpressure-ingestion && cd backpressure-ingestion python -m venv venv source venv/bin/activate pip install fastapi uvicorn pydantic httpx - Написать минимальный FastAPI-сервис с одним эндпоинтом
POST /ingest. - Определить Pydantic-модель для входящих данных, например:
class IngestRequest(BaseModel): id: str payload: dict - Создать bounded очередь как глобальную переменную: queue = asyncio.Queue(maxsize=100).
- Написать consumer-функцию (заглушку обработки).
Ожидаемый результат этапа Проект с пустой очередью, сервис стартует, но данные пока кладутся напрямую (без backpressure). Локальный запуск: uvicorn main:app --reload.
Этап 2: Реализация backpressure через bounded очередь (1 час)
Действия
- В эндпоинте
POST /ingestзаменить прямую обработку на попытку положить элемент в очередь: - Запустить фоновую задачу при старте приложения:
@app.on_event("startup") async def startup(): asyncio.create_task(consumer_loop()) - В
consumer_loopв бесконечном цикле читать queue.get() и вызыватьprocess_item(...). - Добавить обработку исключений в consumer: если
process_itemупал — логировать, но не прерывать цикл. - Настроить логирование: при успешном добавлении в очередь (лог debug), при возврате 429 (лог warning).
Ожидаемый результат этапа
- Сервис возвращает 200 при свободной очереди.
- Сервис возвращает 429, когда очередь заполнена (можно проверить ручным вызовом).
- Consumer обрабатывает элементы в фоне.
Этап 3: Тестирование backpressure (1 час)
Действия
- Написать юнит-тест (используя pytest-asyncio) для проверки:
- что после заполнения очереди возвращается 429,
- что после обработки элементов снова можно добавлять.
- Написать нагрузочный тест:
- Дополнительно: проверить, что потребитель в среднем обрабатывает элементы — размер очереди стабилизируется.
Пример юнит-теста
@pytest.mark.asyncio
async def test_queue_full_returns_429():
# Заполняем очередь
for i in range(QUEUE_MAXSIZE):
await app.queue.put({"id": str(i)})
# Следующий запрос должен вернуть 429
async with AsyncClient(app=app, base_url="http://test") as ac:
resp = await ac.post("/ingest", json={"id": "overflow", "payload": {}})
assert resp.status_code == 429
Ожидаемый результат этапа Все тесты проходят, под нагрузкой сервис стабилен, метрики 200/429 корректны.
Этап 4: Observability и конфигурируемость (45 минут)
Действия
- Вынести
maxsizeочереди в переменную окруженияINGESTION_QUEUE_SIZE(со значением по умолчанию, например 100). - Добавить endpoint
GET /metrics(если используете Prometheus) или просто выводить в логи статистику (queue.qsize()) каждые 10 секунд. - Добавить структурированное логирование через structlog или logging с полями:
- event: "item_enqueued" / "queue_full" / "item_processed"
queue_size: текущий размер- request_id: если есть
- Настроить обработку исключений на уровне middleware: все необработанные ошибки возвращать 500 и логировать.
Ожидаемый результат этапа Сервис полностью готов к production-запуску: конфигурируется через env, логирует ключевые события, имеет метрики для мониторинга.
Этап 5: Документация и демонстрация (30 минут)
Действия
- Написать README:
- описание задачи,
- инструкция по запуску,
- как менять размер очереди,
- примеры запросов (curl).
- Записать короткий сценарий демонстрации (или скринкаст):
- запуск сервиса,
- отправка нагрузки (например,
for i in {1..200}; do curl -X POST ... & done), - показать, что часть запросов получает 429, сервис жив.
- Закоммитить код в git, создать метку (tag)
v1.0-backpressure.
Ожидаемый результат этапа Репозиторий с кодом, README и демонстрацией работы механизма backpressure.
5. Критерии приемки (Definition of Done)
- Сервис запускается и принимает POST-запросы.
- Очередь имеет ограниченный размер, задаваемый через переменную окружения.
- При переполнении очереди эндпоинт возвращает HTTP 429 (с JSON-телом).
- Фоновый consumer обрабатывает элементы без потерь (ни один элемент не теряется).
- При падении consumer в обработке элемента он не блокирует очередь (логирует ошибку и продолжает).
- Под нагрузкой (в 2-3x превышающей пропускную способность consumer) сервис не падает и не перестаёт отвечать (всегда возвращает 200 или 429, но не 500).
- Написаны юнит-тесты на сценарий переполнения и нормальной работы.
- В README описан запуск и примеры вызовов.
6. Ожидаемый результат
Основной артефакт репозиторий с кодом Python-сервиса (FastAPI) с реализованным backpressure.
Содержание репозитория
main.py— сам сервис (эндпоинт, очередь, consumer).requirements.txt— зависимости.tests/test_backpressure.py— тесты.README.md— документация.- (Опционально)
docker-compose.ymlдля полной демонстрации.
Дополнительные результаты
- Понимание асинхронной очереди Python (
asyncio.Queue). - Навык тестирования асинхронных сервисов под нагрузкой.
- Умение настраивать логирование и конфигурацию через env.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
Consumer работает медленнее, чем поступают запросы, очередь растёт, но 429 не возвращается т.к. put_nowait не бросил исключение (размер не достигнут). | Проверить, что очередь действительно конечна: уменьшить maxsize до 5 для тестов. |
При отключении consumer (например, исключение внутри process_item) очередь забивается и сервис начинает возвращать 429, хотя мог бы обрабатывать. | Добавить try/except в consumer и перезапускать задачу с задержкой. |
Тест с нагрузкой через asyncio.gather может сам упасть из-за большого количества параллельных корутин. | Использовать asyncio.Semaphore для ограничения параллелизма клиента. |
| FastAPI при старте не ждёт создания consumer, и первый запрос может прийти до того, как consumer запущен. | Убедиться, что consumer создаётся в startup до первого запроса (FastAPI гарантирует это). |
Переменную окружения INGESTION_QUEUE_SIZE нужно парсить как int с fallback. | Использовать int(os.getenv("INGESTION_QUEUE_SIZE", "100")). |
8. Бюджет времени (оценка)
| Этап | Время (часы) |
|---|---|
| 1. Проектирование и скелет | 0.5 |
| 2. Реализация backpressure | 1.0 |
| 3. Тестирование | 1.0 |
| 4. Observability и конфигурация | 0.75 |
| 5. Документация и демонстрация | 0.5 |
| Итого | 3.75 часов |
Примечание: для первого раза может потребоваться до 5 часов с учётом отладки и изучения документации.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 127 | Как работают асинхронные очереди (Python) |
| 233 | Что такое backpressure и зачем он нужен |
| 498 | HTTP 429 — Too Many Requests (RFC 6585) |
| 612 | Rate limiting vs backpressure |
| 789 | Graceful degradation в микросервисах |
| 341 | Конфигурирование через переменные окружения |
| 567 | Нагрузочное тестирование асинхронных сервисов |
| 820 | Логирование структурированными сообщениями |
10. Чек-лист самопроверки
- Я создал отдельный репозиторий / ветку для этой задачи.
- Я протестировал, что при отправке большего числа запросов, чем может обработать consumer, возвращается код 429, а не ошибка сервера.
- Я убедился, что очередь не теряет элементы — после снижения нагрузки все элементы обрабатываются.
- Я вынес размер очереди в переменную окружения и обновил README.
- Я добавил юнит-тест, который проверяет сценарий переполнения очереди.