Настроить load shedding при перегрузке в multi-tenant агентной системе
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить load shedding при перегрузке в multi-tenant агентной системе
1. Цель задачи
Реализовать механизм load shedding (отбрасывание части запросов при перегрузке) для агентной системы, обрабатывающей запросы от разных классов пользователей (premium, regular, batch-pipeline). При превышении порога загрузки система должна приоритизировать запросы premium-клиентов, частично отклонять regular-запросы и полностью сбрасывать batch-поток, возвращая HTTP-код 503 Service Unavailable с заголовком Retry-After. Механизм должен быть прозрачен для клиентов, тестируем и интегрируем в существующую архитектуру.
Ключевой результат Работающая система load shedding, которая во время стресс-теста гарантирует, что премиум-запросы получают ответ (не более 1% отказов), regular-запросы — не более 30% отказов (при равномерном смешивании), а batch-поток полностью блокируется при превышении порога.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Архитектура текущей агентной системы | Описание из проекта (или гипотетическая схема с FastAPI + asyncio) |
| Требования к SLA для каждого класса запросов | Задание (premium: <100ms p99; regular: <500ms p99; batch: no SLA) |
| Данные о типичной нагрузке (RPS, latency) | Логи или прометеус метрики (если нет — симулируем) |
| Инструмент для нагрузочного тестирования | k6, Locust или wrk2 |
| Репозиторий с кодом агентной системы | Git-репозиторий (возможно, пустой для pet-проекта) |
| Модель очередей (in-memory или Redis) | Решение команды (в задаче используем in-memory asyncio.Queue) |
Если нет реального инструмента — симулируем:
- Создаём standalone-сервис на FastAPI с эндпоинтом
/agent(принимает запросы с телом {"priority": "premium"|"regular"|"batch", "task_id": "..."}). - Встраиваем middleware
LoadSheddingMiddleware, который считывает приоритет из JSON-тела. - Используем in-memory семафор/счётчик для отслеживания текущей загрузки (количество активных обработок).
- Порог загрузки (max_concurrent = 10) и Retry-After в секундах задаём через переменные окружения.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык и Web-фреймворк | Python 3.12, FastAPI, uvicorn | Реализация сервиса |
| Асинхронные очереди | asyncio.Queue | Приоритетная очередь (опционально) |
| Мониторинг (опционально) | Prometheus client (prometheus_client) | Метрики load shedding (количество отклонённых, загрузка) |
| Нагрузочное тестирование | k6 / Locust | Валидация поведения при перегрузке |
| Логирование | structlog / logging | Аудит решений о сбросе |
| Контейнеризация | Docker (docker-compose) | Изолированное развёртывание |
| CI | GitHub Actions (опционально) | Автоматизация нагрузочного теста |
4. Этапы выполнения
Этап 1: Анализ требований и проектирование (30-45 минут)
Действия
- Определить классы запросов и их приоритеты (premium > regular > batch).
- Сформулировать политику load shedding:
- Если число активных запросов < max_concurrent → все запросы принимаются.
- Если ≥ max_concurrent → запрос с наименьшим приоритетом отклоняется (возвращается 503).
- Если активные запросы превышают max_concurrent + buffer → даже premium может быть отклонён (но buffer выставляем так, чтобы этого не произошло при разумной нагрузке).
- Определить метрики для контроля: active_requests, rejected_total, rejected_by_priority.
- Выбрать место вставки – в middleware (лучше всего, до handler’а) или в транспортном слое.
Ожидаемый результат этапа Документ с описанием политики, таблица приоритетов, черновая схема кода.
Этап 2: Реализация middleware и конфигурации (1-1.5 часа)
Действия
- Создать класс
LoadShedder- использует
asyncio.Semaphoreс начальным значениемCONCURRENCY_LIMIT. - метод
try_acquire(priority)— пытается уменьшить семафор; если семафор занят, проверяет приоритет текущего запроса и сравнивает с приоритетами выполняющихся (через очередь ожидания). - Для простоты начнём с абсолютной политики: при достижении лимита все новые запросы отклоняются, но premium проходят с дополнительным buffer'ом.
- Реализация через
asyncio.Queueприоритетов для обработки (необязательно, можно и просто счётчик).
- использует
- Реализовать middleware FastAPI
- Извлекает приоритет из тела запроса (или заголовка).
- Проверяет, может ли запрос быть принят.
- Если да, устанавливает контекст и передаёт обработчику.
- Если нет, возвращает JSON-ответ с
status_code=503и заголовкомRetry-After: <секунды>.
- Добавить переменные окружения
CONCURRENCY_LIMIT(int) — максимальное число одновременно обрабатываемых запросов.RETRY_AFTER_SECONDS(int) — значение заголовка Retry-After.PRIORITY_WEIGHTS(str) — для возможной взвешенной политики (но на первом этапе используем простую).
- Подготовить обработчик
/agent(заглушка):- Имитация работы:
await asyncio.sleep(random.uniform(0.1, 0.5)).
- Имитация работы:
Ожидаемый результат этапа Рабочий сервис, запускаемый локально, принимающий запросы с приоритетом и применяющий load shedding.
Этап 3: Модульное и интеграционное тестирование (1 час)
Действия
- Написать unit-тесты для LoadShedder
- Проверить, что при загрузке ниже лимита все запросы проходят.
- Что при превышении лимита batch-запросы отклоняются, premium проходят.
- Что regular-запросы обрабатываются в соответствии с политикой (например, 50% отклоняются при лимите 10 и 15 запросах).
- Что заголовок Retry-After присутствует.
- Написать интеграционный тест с помощью
httpx.AsyncClient:- Отправка 50 параллельных запросов (asyncio.gather) с разными приоритетами.
- Проверка, что количество premium-ответов с 503 ≤ 5% (например, 0), regular ≤ 30%, batch ≥ 80%.
- Проверить корректность возврата 503 – использовать
response.status_code == 503и'Retry-After' in response.headers.
Ожидаемый результат этапа Набор тестов (pytest), проходящих на CI.
Этап 4: Нагрузочное тестирование с k6/Locust (1-1.5 часа)
Действия
- Установить и настроить k6 (или Locust).
- Написать скрипт нагрузочного теста
- Генерация запросов с разным приоритетом (например, 20% premium, 60% regular, 20% batch).
- Постепенное увеличение RPS от 10 до 100 (если лимит concurrency=10).
- Замерять latency и процент ошибок по классам.
- Запустить тест и собрать данные
- Вывести метрики:
http_req_durationдля успешных,http_req_failedдля ошибок. - Разделить по приоритету, используя таги (tags) в k6.
- Вывести метрики:
- Зафиксировать результаты
- Таблица: при каком RPS началось отбрасывание, сколько запросов отправлено, сколько получили 503.
Ожидаемый результат этапа Отчёт о нагрузочном тестировании (графики, цифры), подтверждающий выполнение ключевого результата.
Этап 5: Мониторинг и документация (30 минут)
Действия
- Добавить Prometheus-метрики:
- Экспонировать метрики через
/metrics(FastAPI + prometheus_client). - Написать README
- Описание политики load shedding.
- Как конфигурировать (переменные окружения).
- Пример запуска и нагрузочного теста.
- Если есть Grafana – добавить скриншот дашборда (опционально).
Ожидаемый результат этапа Документированное решение, код в Git, README.
5. Критерии приемки (Definition of Done)
- Load shedding встроен как middleware FastAPI без изменения существующих обработчиков.
- При превышении лимита (CONCURRENCY_LIMIT) запросы batch-потока получают HTTP 503 с заголовком Retry-After.
- Premium-запросы имеют приоритет: при перегрузке они обрабатываются, а regular/batch отклоняются (для упрощения premium всегда обрабатывается, пока лимит не превышен более чем на buffer).
- Код покрыт unit-тестами (минимум 5 тест-кейсов).
- Проведён нагрузочный тест (k6/Locust), результаты задокументированы.
- Метрики (active_requests, rejected_total) экспортируются в Prometheus.
- README содержит инструкцию по настройке, примеры запуска, описание политики.
- Решение упаковано в Docker-контейнер (docker-compose для запуска + тестирования).
6. Ожидаемый результат
Артефакты
- Файлы кода
load_shedder.py(класс LoadShedder),middleware.py(middleware),main.py(FastAPI приложение). - Тесты
test_load_shedder.py,test_integration.py. - Скрипт нагрузочного теста
load_test.js(для k6) илиlocustfile.py. - Конфигурация
docker-compose.yml,Dockerfile. - Документация
README.mdс описанием и результатами тестирования. - Графики/логи (опционально) PNG с результатами k6 или PDF.
Содержание ключевого файла load_shedder.py (минимум):
- класс
LoadShedderс методамиtry_acquire(priority) -> boolиrelease(priority). - политика: при
semaphore.acquire()не удаётся – проверяет, есть ли активные запросы с более низким приоритетом, и если да – отклоняет текущий запрос (или наоборот, сбрасывает один низкоприоритетный).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Race condition при проверке и захвате ресурса | Использовать asyncio.Lock внутри класса LoadShedder для атомарности решения о сбросе. |
| Определение приоритета на уровне HTTP-запроса | Если тело запроса – JSON, парсинг можно сделать в middleware; для производительности – вынести приоритет в HTTP-заголовок X-Priority. |
| Как сбрасывать уже выполняющийся низкоприоритетный запрос | Обычно не сбрасываем выполняющиеся, а только отклоняем новые. Если нужно – использовать механизм asyncio.Cancel с отменой корутины, но это сложнее. Для задачи достаточно отклонять на входе. |
| Неравномерный приоритет внутри одного класса (например, внутри premium) | Использовать взвешенную очередь (например, с весами 3:2:1 для premium:regular:batch). В рамках ТЗ достаточно простой абсолютной иерархии. |
| Тестирование в среде без Redis | In-memory semaphore полностью подходит; для распределённой системы потребуется Redis, но это выходит за рамки задачи. |
| Отсутствие реального multi-tenant контекста | Эмулировать через разные task_id в логах. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Анализ и проектирование | 30–45 мин |
| Реализация middleware | 1–1.5 ч |
| Модульное и интеграционное тестирование | 1 ч |
| Нагрузочное тестирование | 1–1.5 ч |
| Мониторинг и документация | 30 мин |
| Итого | 4–5 часов |
Примечание: Для первого прохождения (включая изучение asyncio, k6) рекомендуется заложить 6–8 часов.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Что такое graceful degradation и как его реализовать? |
| 45 | Разработка middleware для FastAPI |
| 78 | Шаблон Circuit Breaker в микросервисах |
| 112 | Приоритетные очереди на основе asyncio.Queue |
| 156 | Нагрузочное тестирование микросервисов с k6 |
| 233 | Prometheus метрики для очередей и rate limiting |
| 301 | Паттерн «Throttling» и «Rate Limiting» |
| 412 | Мониторинг concurrency в Python asyncio |
| 489 | Стратегии Retry-After в HTTP |
| 537 | Асинхронные семафоры в Python: Semaphore, BoundedSemaphore |
10. Чек-лист самопроверки
- Я реализовал middleware, который не изменяет сигнатуру существующих обработчиков.
- Я провёл unit-тесты: проверил, что при ровно
CONCURRENCY_LIMITзапросах новый batch-запрос получает 503, а premium — нет. - В тестах я убедился, что заголовок
Retry-Afterсодержит число (секунды) и это значение соответствует конфигу. - Я запустил k6 с тремя разными сценариями приоритетов и получил ожидаемое соотношение отказов (+-5%).
- У меня есть метрики в Prometheus (или хотя бы логи), которые позволяют отслеживать активные запросы и отклонения.
- README описывает, как изменить
CONCURRENCY_LIMITиRETRY_AFTER_SECONDS, и содержит команду для запуска нагрузочного теста. - Я закоммитил код, тесты, docker-compose и load-test скрипт в Git.