English translation is not available yet. Showing Russian content.
Настроить rate limiting на сообщения между агентами
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить rate limiting на сообщения между агентами
1. Цель задачи
Научиться реализовывать и тестировать механизм ограничения скорости отправки сообщений (rate limiting) в системе агентов. Необходимо настроить лимит не более 100 сообщений в секунду на одного агента и убедиться, что при превышении этого порога система не падает — сообщения либо задерживаются, либо отклоняются с корректной обработкой ошибки.
Ключевой результат Работающий rate limiter, который защищает систему от перегрузки, сохраняя стабильность при пиковых нагрузках до 500 msg/sec на агента.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Система агентов (минимум 2 агента) | Собрать пет-проект на Python asyncio или использовать готовую (например, на базе FastAPI + Uvicorn) |
| Библиотека для квот / счётчиков | redis + aioredis или pyrate-limiter (если Redis нет под рукой) |
| Инструмент для генерации нагрузки | locust, wrk, или простой Python-скрипт с asyncio.sleep |
| Логирование | logging + structlog или loguru |
| Среда выполнения | Локальная машина (Python 3.10+, Docker для Redis) |
Если нет реального инструмента — симулируем:
- Если нет Redis — используем in-memory defaultdict с time.time() и алгоритмом скользящего окна (sliding window).
- Если нет готовой системы агентов — пишем двух агентов, общающихся через очередь asyncio.Queue.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.10+ | Основная реализация |
| Асинхронный фреймворк | asyncio / FastAPI (Uvicorn) | Запуск агентов |
| Rate limiter | pyrate-limiter + in-memory или Redis | Реализация ограничения скорости |
| Контейнеризация (опционально) | Docker + Redis | Внешнее хранилище счетчиков |
| Генерация нагрузки | locust или собственный asyncio скрипт | Имитация перегрузки |
| Логирование и метрики | logging, prometheus_client (опционально) | Мониторинг превышений лимитов |
| Тестирование | pytest, pytest-asyncio | Проверка корректности работы |
4. Этапы выполнения
Этап 1: Проектирование архитектуры (20 минут)
Действия
- Определить тип rate limiter — на основе алгоритма скользящего окна (sliding window) или token bucket. Для per-agent лимита подойдёт sliding window с памятью на 100 сообщений за 1 секунду.
- Спроектировать интерфейс:
- Агент имеет метод send_message(receiver_agent_id, payload).
- Перед отправкой проверяется лимит: если текущая частота >= 100 msg/sec, сообщение либо отклоняется (возвращается ошибка), либо ставится в ожидание (backpressure).
- Выбрать стратегию: reject (бросать исключение) — проще для тестирования.
- Определить формат обмена — JSON через asyncio.Queue (локальные агенты) или HTTP (распределённые). Для задачи достаточно локальной очереди.
- Схематично нарисовать поток (текстом):
Agent A (sender) -> rate_limiter.check_and_wait() -> asyncio.Queue -> Agent B (receiver)
Ожидаемый результат этапа Документированное описание архитектуры с выбором алгоритма и стратегии обработки перегрузки.
Этап 2: Реализация rate limiter (45 минут)
Действия
-
Создать класс
SlidingWindowRateLimiter:- Хранит словарь {agent_id: deque}, где deque содержит временные метки последних до 100 сообщений.
- Метод
is_allowed(agent_id) -> bool: добавляет текущую метку, отсекает старые (>1 сек), возвращаетTrue, если длина deque ≤ 100 после добавления. - Потокобезопасность: использовать asyncio.Lock.
import time from collections import deque import asyncio class SlidingWindowRateLimiter: def __init__(self, max_per_second: int = 100): self.lock = asyncio.Lock() self.windows: dict[str, deque] = {} self.max_per_second = max_per_second async def is_allowed(self, agent_id: str) -> bool: async with self.lock: now = time.monotonic() if agent_id not in self.windows: self.windows[agent_id] = deque() dq = self.windows[agent_id] # отсекаем записи старше 1 секунды while dq and dq[0] < now - 1.0: dq.popleft() dq.append(now) return len(dq) <= self.max_per_second -
Написать unit-тесты на rate limiter:
- Отправить 100 сообщений за 0.9 сек — все разрешены.
- 101-е сообщение — запрещено.
- После паузы в 1 секунду — снова разрешено.
-
Интегрировать limiter в агента-отправителя – перед отправкой вызывать
await limiter.is_allowed(self.agent_id)и еслиFalse, бросать RateLimitExceeded или возвращать статус.
Ожидаемый результат этапа Работающий класс rate limiter с тестами, покрывающими граничные случаи.
Этап 3: Интеграция с системой агентов (30 минут)
Действия
- Создать двух агентов (AgentA и AgentB), соединённых через asyncio.Queue.
- Реализовать метод
send_message:async def send_message(self, receiver: 'Agent', payload: dict): if not await self.limiter.is_allowed(self.agent_id): raise RateLimitExceeded(f"Agent {self.agent_id} exceeded 100 msg/sec") await receiver.receive(payload) - Обработка исключения в вызывающем коде: логировать, считать метрику
rate_limit_hits. - Добавить экспозицию метрик (опционально) через Prometheus counter.
- Проверить базовый сценарий: AgentA отправляет 120 сообщений с интервалом 0.005 сек (200 msg/sec) – ровно 100 должны пройти, остальные 20 – упасть с ошибкой.
Ожидаемый результат этапа Интегрированная система, где агенты корректно ограничивают отправку.
Этап 4: Нагрузочное тестирование (40 минут)
Действия
- Написать скрипт нагрузки (или использовать locust) для AgentA:
- Отправлять сообщения с постоянной частотой 150 msg/sec в течение 5 секунд.
- Отправлять пачками: 1000 сообщений за 0.1 сек (10 000 msg/sec) – имитация перегрузки.
- Проверить поведение:
- Ожидается, что все сообщения сверх лимита будут отклонены, но агент не упадет (не произойдет блокировки event loop, не вылетит unhandled exception).
- Измерить скорость обработки – сколько сообщений действительно отправлено (через счетчик успешных доставок).
- Проверить влияние на другие агенты – во время перегрузки AgentB не должен терять уже полученные сообщения.
Ожидаемый результат этапа Отчёт с графиками (или таблицей): сгенерированная нагрузка, количество принятых/отклонённых сообщений, стабильность системы.
Этап 5: Оптимизация и альтернативная стратегия (20 минут)
Действия
- Реализовать стратегию "blocking + queue – вместо отбрасывания сообщений ставить их в очередь ожидания (backpressure) с максимальным размером буфера 1000.
async def send_message(self, receiver, payload): while not await self.limiter.is_allowed(self.agent_id): if self.buffer_size >= MAX_BUFFER: raise BufferFullError() await asyncio.sleep(0.01) # ждать освобождения ... - Сравнить две стратегии по критериям: потеря сообщений, задержка, устойчивость.
- Сделать вывод — какую стратегию лучше использовать в production для inter-agent communication.
Ожидаемый результат этапа Два режима rate limiting (reject и queue), сравнительная таблица их характеристик.
5. Критерии приемки (Definition of Done)
- Rate limiter корректно пропускает ≤100 сообщений в секунду на одного агента.
- При превышении лимита (101-е сообщение или более) агент не падает, а возвращает ошибку или ожидает.
- Нагрузочный тест с 500 msg/sec на одного агента не приводит к краху процесса или зависанию.
- Unit-тесты покрывают: нормальный режим, точный предельный случай, сброс после паузы.
- Код агента логирует каждый случай rate limiting (уровень WARNING).
- Реализованы две стратегии (reject и queue) – опционально, хотя бы одна (reject) строго обязательна.
- Нагрузочное тестирование задокументировано (количество отправленных/принятых сообщений).
- Решение работает в асинхронном окружении (asyncio).
- Документация (README) описывает, как запустить и воспроизвести тесты.
6. Ожидаемый результат
Основной артефакт
Папка проекта со следующей структурой:
rate_limiting_lab/
├── src/
│ ├── agents.py # Классы AgentA, AgentB
│ ├── rate_limiter.py # SlidingWindowRateLimiter
│ └── exceptions.py # RateLimitExceeded, BufferFullError
├── tests/
│ ├── test_rate_limiter.py # Unit-тесты
│ └── test_integration.py # Интеграционные тесты с асинхронной очередью
├── load/
│ ├── load_test.py # Скрипт нагрузки
│ └── results.txt # Результаты тестирования
├── requirements.txt
├── Dockerfile (опционально)
└── README.md
Содержание README.md
- Краткое описание задачи.
- Архитектура (опционально диаграмма текстом).
- Инструкция по запуску тестов и нагрузочного сценария.
- Объяснение выбранной стратегии и результатов сравнения.
Дополнительные результаты
- График (сгенерированный через
matplotlib) зависимости пропускной способности от частоты отправки. - Метрики (если использовали Prometheus) – можно приложить скриншоты дашборда.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Неверный расчёт скользящего окна (пропускает слишком много или мало) | В unit-тестах использовать фиктивное время (time.monotonic подменить через unittest.mock.patch), проверить граничные случаи |
| Агент «зависает» при блокирующей очереди (backpressure) | Установить таймаут на ожидание и максимальный размер буфера; добавить asyncio.wait_for |
| Асинхронная гонка при доступе к словарю | Использовать asyncio.Lock (как в примере) |
| Тестирование точного времени (100 msg за 1 сек) сложно воспроизвести | В тестах заменить time.monotonic на управляемый MockTime |
| При очень высокой нагрузке (10k msg/sec) деградация производительности Python | Рассмотреть реализацию на Cython или вынос счётчиков в Redis (однако для задачи достаточно in-memory) |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Проектирование | 20 мин |
| Этап 2: Реализация rate limiter + юнит-тесты | 45 мин |
| Этап 3: Интеграция с агентами | 30 мин |
| Этап 4: Нагрузочное тестирование | 40 мин |
| Этап 5: Оптимизация (опционально) | 20 мин |
| Итого (без опционального этапа) | ~2.5 часа |
| Итого (полный) | ~3 часа |
Примечание Для первого выполнения задачи рекомендуется ограничиться стратегией reject и пропустить этап 5 — в этом случае бюджет составит 2 часа 15 минут.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Алгоритмы rate limiting (token bucket, sliding window) |
| 47 | Очереди сообщений и backpressure |
| 113 | Потокобезопасность в асинхронном Python |
| 218 | Нагрузочное тестирование asyncio-сервисов |
| 302 | Использование Redis как распределённого счётчика |
| 456 | Обработка исключений в распределённых системах |
| 561 | Мониторинг и логирование отказов (rate limit exceeded) |
| 678 | Стратегии отклонения vs буферизации сообщений |
| 789 | Многопоточная обработка агентов vs asyncio |
| 890 | Тестирование асинхронного кода (pytest-asyncio, mock) |
10. Чек-лист самопроверки
- Я реализовал sliding window rate limiter с защитой от гонок (asyncio.Lock).
- Написал unit-тесты, которые проверяют точное соблюдение лимита (100 msg/sec).
- Нагрузочный тест показал, что система не падает при 500 msg/sec – максимум теряются лишние сообщения.
- Я задокументировал выбранную стратегию (reject) и (опционально) сравнил с queue.
- README содержит инструкцию по запуску, а результаты тестов сохранены в текстовый файл.