English translation is not available yet. Showing Russian content.

Настроить rate limiting на сообщения между агентами

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить rate limiting на сообщения между агентами

1. Цель задачи

Научиться реализовывать и тестировать механизм ограничения скорости отправки сообщений (rate limiting) в системе агентов. Необходимо настроить лимит не более 100 сообщений в секунду на одного агента и убедиться, что при превышении этого порога система не падает — сообщения либо задерживаются, либо отклоняются с корректной обработкой ошибки.

Ключевой результат Работающий rate limiter, который защищает систему от перегрузки, сохраняя стабильность при пиковых нагрузках до 500 msg/sec на агента.


2. Исходные данные

Что нужноОткуда взять
Система агентов (минимум 2 агента)Собрать пет-проект на Python asyncio или использовать готовую (например, на базе FastAPI + Uvicorn)
Библиотека для квот / счётчиковredis + aioredis или pyrate-limiter (если Redis нет под рукой)
Инструмент для генерации нагрузкиlocust, wrk, или простой Python-скрипт с asyncio.sleep
Логированиеlogging + structlog или loguru
Среда выполненияЛокальная машина (Python 3.10+, Docker для Redis)

Если нет реального инструмента — симулируем:

  • Если нет Redis — используем in-memory defaultdict с time.time() и алгоритмом скользящего окна (sliding window).
  • Если нет готовой системы агентов — пишем двух агентов, общающихся через очередь asyncio.Queue.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Основная реализация
Асинхронный фреймворкasyncio / FastAPI (Uvicorn)Запуск агентов
Rate limiterpyrate-limiter + in-memory или RedisРеализация ограничения скорости
Контейнеризация (опционально)Docker + RedisВнешнее хранилище счетчиков
Генерация нагрузкиlocust или собственный asyncio скриптИмитация перегрузки
Логирование и метрикиlogging, prometheus_client (опционально)Мониторинг превышений лимитов
Тестированиеpytest, pytest-asyncioПроверка корректности работы

4. Этапы выполнения

Этап 1: Проектирование архитектуры (20 минут)

Действия

  1. Определить тип rate limiter — на основе алгоритма скользящего окна (sliding window) или token bucket. Для per-agent лимита подойдёт sliding window с памятью на 100 сообщений за 1 секунду.
  2. Спроектировать интерфейс:
    • Агент имеет метод send_message(receiver_agent_id, payload).
    • Перед отправкой проверяется лимит: если текущая частота >= 100 msg/sec, сообщение либо отклоняется (возвращается ошибка), либо ставится в ожидание (backpressure).
    • Выбрать стратегию: reject (бросать исключение) — проще для тестирования.
  3. Определить формат обмена — JSON через asyncio.Queue (локальные агенты) или HTTP (распределённые). Для задачи достаточно локальной очереди.
  4. Схематично нарисовать поток (текстом):
    Agent A (sender) -> rate_limiter.check_and_wait() -> asyncio.Queue -> Agent B (receiver)
    

Ожидаемый результат этапа Документированное описание архитектуры с выбором алгоритма и стратегии обработки перегрузки.


Этап 2: Реализация rate limiter (45 минут)

Действия

  1. Создать класс SlidingWindowRateLimiter:

    • Хранит словарь {agent_id: deque}, где deque содержит временные метки последних до 100 сообщений.
    • Метод is_allowed(agent_id) -> bool: добавляет текущую метку, отсекает старые (>1 сек), возвращает True, если длина deque ≤ 100 после добавления.
    • Потокобезопасность: использовать asyncio.Lock.
    import time
    from collections import deque
    import asyncio
    
    class SlidingWindowRateLimiter:
        def __init__(self, max_per_second: int = 100):
            self.lock = asyncio.Lock()
            self.windows: dict[str, deque] = {}
            self.max_per_second = max_per_second
    
        async def is_allowed(self, agent_id: str) -> bool:
            async with self.lock:
                now = time.monotonic()
                if agent_id not in self.windows:
                    self.windows[agent_id] = deque()
                dq = self.windows[agent_id]
                # отсекаем записи старше 1 секунды
                while dq and dq[0] < now - 1.0:
                    dq.popleft()
                dq.append(now)
                return len(dq) <= self.max_per_second
    
  2. Написать unit-тесты на rate limiter:

    • Отправить 100 сообщений за 0.9 сек — все разрешены.
    • 101-е сообщение — запрещено.
    • После паузы в 1 секунду — снова разрешено.
  3. Интегрировать limiter в агента-отправителя – перед отправкой вызывать await limiter.is_allowed(self.agent_id) и если False, бросать RateLimitExceeded или возвращать статус.

Ожидаемый результат этапа Работающий класс rate limiter с тестами, покрывающими граничные случаи.


Этап 3: Интеграция с системой агентов (30 минут)

Действия

  1. Создать двух агентов (AgentA и AgentB), соединённых через asyncio.Queue.
  2. Реализовать метод send_message:
    async def send_message(self, receiver: 'Agent', payload: dict):
        if not await self.limiter.is_allowed(self.agent_id):
            raise RateLimitExceeded(f"Agent {self.agent_id} exceeded 100 msg/sec")
        await receiver.receive(payload)
    
  3. Обработка исключения в вызывающем коде: логировать, считать метрику rate_limit_hits.
  4. Добавить экспозицию метрик (опционально) через Prometheus counter.
  5. Проверить базовый сценарий: AgentA отправляет 120 сообщений с интервалом 0.005 сек (200 msg/sec) – ровно 100 должны пройти, остальные 20 – упасть с ошибкой.

Ожидаемый результат этапа Интегрированная система, где агенты корректно ограничивают отправку.


Этап 4: Нагрузочное тестирование (40 минут)

Действия

  1. Написать скрипт нагрузки (или использовать locust) для AgentA:
    • Отправлять сообщения с постоянной частотой 150 msg/sec в течение 5 секунд.
    • Отправлять пачками: 1000 сообщений за 0.1 сек (10 000 msg/sec) – имитация перегрузки.
  2. Проверить поведение:
    • Ожидается, что все сообщения сверх лимита будут отклонены, но агент не упадет (не произойдет блокировки event loop, не вылетит unhandled exception).
  3. Измерить скорость обработки – сколько сообщений действительно отправлено (через счетчик успешных доставок).
  4. Проверить влияние на другие агенты – во время перегрузки AgentB не должен терять уже полученные сообщения.

Ожидаемый результат этапа Отчёт с графиками (или таблицей): сгенерированная нагрузка, количество принятых/отклонённых сообщений, стабильность системы.


Этап 5: Оптимизация и альтернативная стратегия (20 минут)

Действия

  1. Реализовать стратегию "blocking + queue – вместо отбрасывания сообщений ставить их в очередь ожидания (backpressure) с максимальным размером буфера 1000.
    async def send_message(self, receiver, payload):
        while not await self.limiter.is_allowed(self.agent_id):
            if self.buffer_size >= MAX_BUFFER:
                raise BufferFullError()
            await asyncio.sleep(0.01)  # ждать освобождения
        ...
    
  2. Сравнить две стратегии по критериям: потеря сообщений, задержка, устойчивость.
  3. Сделать вывод — какую стратегию лучше использовать в production для inter-agent communication.

Ожидаемый результат этапа Два режима rate limiting (reject и queue), сравнительная таблица их характеристик.


5. Критерии приемки (Definition of Done)

  • Rate limiter корректно пропускает ≤100 сообщений в секунду на одного агента.
  • При превышении лимита (101-е сообщение или более) агент не падает, а возвращает ошибку или ожидает.
  • Нагрузочный тест с 500 msg/sec на одного агента не приводит к краху процесса или зависанию.
  • Unit-тесты покрывают: нормальный режим, точный предельный случай, сброс после паузы.
  • Код агента логирует каждый случай rate limiting (уровень WARNING).
  • Реализованы две стратегии (reject и queue) – опционально, хотя бы одна (reject) строго обязательна.
  • Нагрузочное тестирование задокументировано (количество отправленных/принятых сообщений).
  • Решение работает в асинхронном окружении (asyncio).
  • Документация (README) описывает, как запустить и воспроизвести тесты.

6. Ожидаемый результат

Основной артефакт
Папка проекта со следующей структурой:

rate_limiting_lab/
├── src/
│   ├── agents.py            # Классы AgentA, AgentB
│   ├── rate_limiter.py      # SlidingWindowRateLimiter
│   └── exceptions.py        # RateLimitExceeded, BufferFullError
├── tests/
│   ├── test_rate_limiter.py # Unit-тесты
│   └── test_integration.py  # Интеграционные тесты с асинхронной очередью
├── load/
│   ├── load_test.py         # Скрипт нагрузки
│   └── results.txt          # Результаты тестирования
├── requirements.txt
├── Dockerfile (опционально)
└── README.md

Содержание README.md

  • Краткое описание задачи.
  • Архитектура (опционально диаграмма текстом).
  • Инструкция по запуску тестов и нагрузочного сценария.
  • Объяснение выбранной стратегии и результатов сравнения.

Дополнительные результаты

  • График (сгенерированный через matplotlib) зависимости пропускной способности от частоты отправки.
  • Метрики (если использовали Prometheus) – можно приложить скриншоты дашборда.

7. Возможные сложности и их решение

СложностьРешение
Неверный расчёт скользящего окна (пропускает слишком много или мало)В unit-тестах использовать фиктивное время (time.monotonic подменить через unittest.mock.patch), проверить граничные случаи
Агент «зависает» при блокирующей очереди (backpressure)Установить таймаут на ожидание и максимальный размер буфера; добавить asyncio.wait_for
Асинхронная гонка при доступе к словарюИспользовать asyncio.Lock (как в примере)
Тестирование точного времени (100 msg за 1 сек) сложно воспроизвестиВ тестах заменить time.monotonic на управляемый MockTime
При очень высокой нагрузке (10k msg/sec) деградация производительности PythonРассмотреть реализацию на Cython или вынос счётчиков в Redis (однако для задачи достаточно in-memory)

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Проектирование20 мин
Этап 2: Реализация rate limiter + юнит-тесты45 мин
Этап 3: Интеграция с агентами30 мин
Этап 4: Нагрузочное тестирование40 мин
Этап 5: Оптимизация (опционально)20 мин
Итого (без опционального этапа)~2.5 часа
Итого (полный)~3 часа

Примечание Для первого выполнения задачи рекомендуется ограничиться стратегией reject и пропустить этап 5 — в этом случае бюджет составит 2 часа 15 минут.


9. Связанные вопросы из базы знаний

ВопросТема
12Алгоритмы rate limiting (token bucket, sliding window)
47Очереди сообщений и backpressure
113Потокобезопасность в асинхронном Python
218Нагрузочное тестирование asyncio-сервисов
302Использование Redis как распределённого счётчика
456Обработка исключений в распределённых системах
561Мониторинг и логирование отказов (rate limit exceeded)
678Стратегии отклонения vs буферизации сообщений
789Многопоточная обработка агентов vs asyncio
890Тестирование асинхронного кода (pytest-asyncio, mock)

10. Чек-лист самопроверки

  • Я реализовал sliding window rate limiter с защитой от гонок (asyncio.Lock).
  • Написал unit-тесты, которые проверяют точное соблюдение лимита (100 msg/sec).
  • Нагрузочный тест показал, что система не падает при 500 msg/sec – максимум теряются лишние сообщения.
  • Я задокументировал выбранную стратегию (reject) и (опционально) сравнил с queue.
  • README содержит инструкцию по запуску, а результаты тестов сохранены в текстовый файл.