Настроить load shedding при перегрузке в multi-tenant агентной системе

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить load shedding при перегрузке в multi-tenant агентной системе

1. Цель задачи

Реализовать механизм load shedding (отбрасывание части запросов при перегрузке) для агентной системы, обрабатывающей запросы от разных классов пользователей (premium, regular, batch-pipeline). При превышении порога загрузки система должна приоритизировать запросы premium-клиентов, частично отклонять regular-запросы и полностью сбрасывать batch-поток, возвращая HTTP-код 503 Service Unavailable с заголовком Retry-After. Механизм должен быть прозрачен для клиентов, тестируем и интегрируем в существующую архитектуру.

Ключевой результат Работающая система load shedding, которая во время стресс-теста гарантирует, что премиум-запросы получают ответ (не более 1% отказов), regular-запросы — не более 30% отказов (при равномерном смешивании), а batch-поток полностью блокируется при превышении порога.


2. Исходные данные

Что нужноОткуда взять
Архитектура текущей агентной системыОписание из проекта (или гипотетическая схема с FastAPI + asyncio)
Требования к SLA для каждого класса запросовЗадание (premium: <100ms p99; regular: <500ms p99; batch: no SLA)
Данные о типичной нагрузке (RPS, latency)Логи или прометеус метрики (если нет — симулируем)
Инструмент для нагрузочного тестированияk6, Locust или wrk2
Репозиторий с кодом агентной системыGit-репозиторий (возможно, пустой для pet-проекта)
Модель очередей (in-memory или Redis)Решение команды (в задаче используем in-memory asyncio.Queue)

Если нет реального инструмента — симулируем:

  1. Создаём standalone-сервис на FastAPI с эндпоинтом /agent (принимает запросы с телом {"priority": "premium"|"regular"|"batch", "task_id": "..."}).
  2. Встраиваем middleware LoadSheddingMiddleware, который считывает приоритет из JSON-тела.
  3. Используем in-memory семафор/счётчик для отслеживания текущей загрузки (количество активных обработок).
  4. Порог загрузки (max_concurrent = 10) и Retry-After в секундах задаём через переменные окружения.

3. Технологический стек

КомпонентИнструментыНазначение
Язык и Web-фреймворкPython 3.12, FastAPI, uvicornРеализация сервиса
Асинхронные очередиasyncio.QueueПриоритетная очередь (опционально)
Мониторинг (опционально)Prometheus client (prometheus_client)Метрики load shedding (количество отклонённых, загрузка)
Нагрузочное тестированиеk6 / LocustВалидация поведения при перегрузке
Логированиеstructlog / loggingАудит решений о сбросе
КонтейнеризацияDocker (docker-compose)Изолированное развёртывание
CIGitHub Actions (опционально)Автоматизация нагрузочного теста

4. Этапы выполнения

Этап 1: Анализ требований и проектирование (30-45 минут)

Действия

  1. Определить классы запросов и их приоритеты (premium > regular > batch).
  2. Сформулировать политику load shedding:
    • Если число активных запросов < max_concurrent → все запросы принимаются.
    • Если ≥ max_concurrent → запрос с наименьшим приоритетом отклоняется (возвращается 503).
    • Если активные запросы превышают max_concurrent + buffer → даже premium может быть отклонён (но buffer выставляем так, чтобы этого не произошло при разумной нагрузке).
  3. Определить метрики для контроля: active_requests, rejected_total, rejected_by_priority.
  4. Выбрать место вставки – в middleware (лучше всего, до handler’а) или в транспортном слое.

Ожидаемый результат этапа Документ с описанием политики, таблица приоритетов, черновая схема кода.

Этап 2: Реализация middleware и конфигурации (1-1.5 часа)

Действия

  1. Создать класс LoadShedder
    • использует asyncio.Semaphore с начальным значением CONCURRENCY_LIMIT.
    • метод try_acquire(priority) — пытается уменьшить семафор; если семафор занят, проверяет приоритет текущего запроса и сравнивает с приоритетами выполняющихся (через очередь ожидания).
    • Для простоты начнём с абсолютной политики: при достижении лимита все новые запросы отклоняются, но premium проходят с дополнительным buffer'ом.
    • Реализация через asyncio.Queue приоритетов для обработки (необязательно, можно и просто счётчик).
  2. Реализовать middleware FastAPI
    • Извлекает приоритет из тела запроса (или заголовка).
    • Проверяет, может ли запрос быть принят.
    • Если да, устанавливает контекст и передаёт обработчику.
    • Если нет, возвращает JSON-ответ с status_code=503 и заголовком Retry-After: <секунды>.
  3. Добавить переменные окружения
    • CONCURRENCY_LIMIT (int) — максимальное число одновременно обрабатываемых запросов.
    • RETRY_AFTER_SECONDS (int) — значение заголовка Retry-After.
    • PRIORITY_WEIGHTS (str) — для возможной взвешенной политики (но на первом этапе используем простую).
  4. Подготовить обработчик /agent (заглушка):
    • Имитация работы: await asyncio.sleep(random.uniform(0.1, 0.5)).

Ожидаемый результат этапа Рабочий сервис, запускаемый локально, принимающий запросы с приоритетом и применяющий load shedding.

Этап 3: Модульное и интеграционное тестирование (1 час)

Действия

  1. Написать unit-тесты для LoadShedder
    • Проверить, что при загрузке ниже лимита все запросы проходят.
    • Что при превышении лимита batch-запросы отклоняются, premium проходят.
    • Что regular-запросы обрабатываются в соответствии с политикой (например, 50% отклоняются при лимите 10 и 15 запросах).
    • Что заголовок Retry-After присутствует.
  2. Написать интеграционный тест с помощью httpx.AsyncClient:
    • Отправка 50 параллельных запросов (asyncio.gather) с разными приоритетами.
    • Проверка, что количество premium-ответов с 503 ≤ 5% (например, 0), regular ≤ 30%, batch ≥ 80%.
  3. Проверить корректность возврата 503 – использовать response.status_code == 503 и 'Retry-After' in response.headers.

Ожидаемый результат этапа Набор тестов (pytest), проходящих на CI.

Этап 4: Нагрузочное тестирование с k6/Locust (1-1.5 часа)

Действия

  1. Установить и настроить k6 (или Locust).
  2. Написать скрипт нагрузочного теста
    • Генерация запросов с разным приоритетом (например, 20% premium, 60% regular, 20% batch).
    • Постепенное увеличение RPS от 10 до 100 (если лимит concurrency=10).
    • Замерять latency и процент ошибок по классам.
  3. Запустить тест и собрать данные
    • Вывести метрики: http_req_duration для успешных, http_req_failed для ошибок.
    • Разделить по приоритету, используя таги (tags) в k6.
  4. Зафиксировать результаты
    • Таблица: при каком RPS началось отбрасывание, сколько запросов отправлено, сколько получили 503.

Ожидаемый результат этапа Отчёт о нагрузочном тестировании (графики, цифры), подтверждающий выполнение ключевого результата.

Этап 5: Мониторинг и документация (30 минут)

Действия

  1. Добавить Prometheus-метрики:
    • load_shedder_active_requests (Gauge).
    • load_shedder_rejected_total (Counter) с лейблом priority.
  2. Экспонировать метрики через /metrics (FastAPI + prometheus_client).
  3. Написать README
    • Описание политики load shedding.
    • Как конфигурировать (переменные окружения).
    • Пример запуска и нагрузочного теста.
  4. Если есть Grafana – добавить скриншот дашборда (опционально).

Ожидаемый результат этапа Документированное решение, код в Git, README.


5. Критерии приемки (Definition of Done)

  • Load shedding встроен как middleware FastAPI без изменения существующих обработчиков.
  • При превышении лимита (CONCURRENCY_LIMIT) запросы batch-потока получают HTTP 503 с заголовком Retry-After.
  • Premium-запросы имеют приоритет: при перегрузке они обрабатываются, а regular/batch отклоняются (для упрощения premium всегда обрабатывается, пока лимит не превышен более чем на buffer).
  • Код покрыт unit-тестами (минимум 5 тест-кейсов).
  • Проведён нагрузочный тест (k6/Locust), результаты задокументированы.
  • Метрики (active_requests, rejected_total) экспортируются в Prometheus.
  • README содержит инструкцию по настройке, примеры запуска, описание политики.
  • Решение упаковано в Docker-контейнер (docker-compose для запуска + тестирования).

6. Ожидаемый результат

Артефакты

  • Файлы кода load_shedder.py (класс LoadShedder), middleware.py (middleware), main.py (FastAPI приложение).
  • Тесты test_load_shedder.py, test_integration.py.
  • Скрипт нагрузочного теста load_test.js (для k6) или locustfile.py.
  • Конфигурация docker-compose.yml, Dockerfile.
  • Документация README.md с описанием и результатами тестирования.
  • Графики/логи (опционально) PNG с результатами k6 или PDF.

Содержание ключевого файла load_shedder.py (минимум):

  • класс LoadShedder с методами try_acquire(priority) -> bool и release(priority).
  • политика: при semaphore.acquire() не удаётся – проверяет, есть ли активные запросы с более низким приоритетом, и если да – отклоняет текущий запрос (или наоборот, сбрасывает один низкоприоритетный).

7. Возможные сложности и их решение

СложностьРешение
Race condition при проверке и захвате ресурсаИспользовать asyncio.Lock внутри класса LoadShedder для атомарности решения о сбросе.
Определение приоритета на уровне HTTP-запросаЕсли тело запроса – JSON, парсинг можно сделать в middleware; для производительности – вынести приоритет в HTTP-заголовок X-Priority.
Как сбрасывать уже выполняющийся низкоприоритетный запросОбычно не сбрасываем выполняющиеся, а только отклоняем новые. Если нужно – использовать механизм asyncio.Cancel с отменой корутины, но это сложнее. Для задачи достаточно отклонять на входе.
Неравномерный приоритет внутри одного класса (например, внутри premium)Использовать взвешенную очередь (например, с весами 3:2:1 для premium:regular:batch). В рамках ТЗ достаточно простой абсолютной иерархии.
Тестирование в среде без RedisIn-memory semaphore полностью подходит; для распределённой системы потребуется Redis, но это выходит за рамки задачи.
Отсутствие реального multi-tenant контекстаЭмулировать через разные task_id в логах.

8. Бюджет времени (оценка)

ЭтапВремя
Анализ и проектирование30–45 мин
Реализация middleware1–1.5 ч
Модульное и интеграционное тестирование1 ч
Нагрузочное тестирование1–1.5 ч
Мониторинг и документация30 мин
Итого4–5 часов

Примечание: Для первого прохождения (включая изучение asyncio, k6) рекомендуется заложить 6–8 часов.


9. Связанные вопросы из базы знаний

ВопросТема
12Что такое graceful degradation и как его реализовать?
45Разработка middleware для FastAPI
78Шаблон Circuit Breaker в микросервисах
112Приоритетные очереди на основе asyncio.Queue
156Нагрузочное тестирование микросервисов с k6
233Prometheus метрики для очередей и rate limiting
301Паттерн «Throttling» и «Rate Limiting»
412Мониторинг concurrency в Python asyncio
489Стратегии Retry-After в HTTP
537Асинхронные семафоры в Python: Semaphore, BoundedSemaphore

10. Чек-лист самопроверки

  • Я реализовал middleware, который не изменяет сигнатуру существующих обработчиков.
  • Я провёл unit-тесты: проверил, что при ровно CONCURRENCY_LIMIT запросах новый batch-запрос получает 503, а premium — нет.
  • В тестах я убедился, что заголовок Retry-After содержит число (секунды) и это значение соответствует конфигу.
  • Я запустил k6 с тремя разными сценариями приоритетов и получил ожидаемое соотношение отказов (+-5%).
  • У меня есть метрики в Prometheus (или хотя бы логи), которые позволяют отслеживать активные запросы и отклонения.
  • README описывает, как изменить CONCURRENCY_LIMIT и RETRY_AFTER_SECONDS, и содержит команду для запуска нагрузочного теста.
  • Я закоммитил код, тесты, docker-compose и load-test скрипт в Git.