Реализовать distributed task queue для агентов

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать distributed task queue для агентов

1. Цель задачи

Спроектировать и развернуть распределённую очередь задач для multi-agent системы с использованием Celery и Redis. Реализовать поддержку приоритетов задач и обеспечить надёжную обработку 100 конкурентных заданий. Ключевой результат 100 задач с различными приоритетами успешно распределяются между воркерами и завершаются с корректным учётом порядка выполнения.

2. Исходные данные

Что нужноОткуда взять
Redis (локальный или Docker-образ)Официальный образ redis:7-alpine или пакет redis-server
Celery (Python)Установить через pip: celery[redis]
Тестовые JSON-задания (100 шт.)Сгенерировать скриптом (см. Этап 4)
Виртуальное окружение Pythonpython -m venv venv
Docker (опционально)Docker Desktop / docker.io

Если нет реального инструмента — симулируем:

  1. Установить Redis через apt install redis-server (Linux) или скачать предварительно скомпилированный бинарник для Windows.
  2. Если Docker недоступен, использовать sudo systemctl start redis (Linux) или запустить redis-server вручную.
  3. Если Celery не ставится — использовать pip install celery[Вики/Kafka|redis в изолированном Вики/Conda venv|venv.

3. Технологический стек

КомпонентИнструментыНазначение
Брокер сообщенийRedis 7+Хранение очередей, обмен сообщениями, хранение результатов
Фреймворк задачCelery 5+Координация воркеров, управление очередями, приоритезация
ВоркерыCelery worker (Python)Исполнение задач, эмуляция агентов
МониторингCelery FlowerВизуализация состояния очередей и воркеров
Среда выполненияPython 3.10+, Docker (опц.)Контейнеризация и воспроизводимость

4. Этапы выполнения

Этап 1: Настройка окружения и запуск Redis (30 мин)

Действия

  1. Создать и активировать виртуальное окружение:
    python -m venv venv
    source venv/bin/activate  # или venv\Scripts\activate для Windows
    
  2. Установить зависимости:
    pip install celery[redis] flower
    
  3. Запустить Redis локально:
  4. Проверить доступность Redis:
    redis-cli ping
    # Ответ: PONG
    

Ожидаемый результат этапа Redis работает и принимает соединения.

Этап 2: Создание приложения Celery и конфигурация приоритетов (1 ч)

Действия

  1. Создать файл tasks.py с настройками Celery:
    from celery import Celery
    
    app = Celery('agents', broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/0')
    
    # Настройка очередей с приоритетами
    app.conf.task_queues = {
        'high': {'exchange': 'high', 'routing_key': 'high', 'queue_arguments': {'x-max-priority': 10}},
        'medium': {'exchange': 'medium', 'routing_key': 'medium', 'queue_arguments': {'x-max-priority': 5}},
        'low': {'exchange': 'low', 'routing_key': 'low', 'queue_arguments': {'x-max-priority': 1}},
    }
    
    app.conf.task_default_queue = 'medium'
    app.conf.task_default_priority = 5
    app.conf.worker_prefetch_multiplier = 1  # честное распределение
    
  2. Определить задачу-заглушку для агента:
    @app.task(bind=True, name='agent_task')
    def agent_task(self, agent_id, payload):
        import time, random
        sleep_time = random.uniform(0.5, 2.0)  # имитация работы агента
        time.sleep(sleep_time)
        return {'agent': agent_id, 'status': 'done', 'payload': payload}
    
  3. Запустить Celery worker:
    celery -A tasks worker --loglevel=info --concurrency=4 -Q high,medium,low
    

Ожидаемый результат этапа Воркер запущен и слушает очереди high, medium, low. Задачи в tasks.py видны.

Этап 3: Генерация 100 тестовых задач с разными приоритетами (30 мин)

Действия

  1. Написать скрипт generate_tasks.py:
    from tasks import agent_task
    import random
    
    priorities = {'high': 9, 'medium': 5, 'low': 1}
    queues = {'high': 'high', 'medium': 'medium', 'low': 'low'}
    
    for i in range(100):
        level = random.choice(['high', 'medium', 'low'])
        prio = priorities[level]
        queue = queues[level]
        agent_task.apply_async(
            args=(i, f'data_{i}'),
            queue=queue,
            priority=prio
        )
        print(f'Sent task {i} to {queue} with priority {prio}')
    
  2. Запустить скрипт:
    python generate_tasks.py
    
  3. Наблюдать в консоли воркера получение задач.

Ожидаемый результат этапа 100 задач отправлены в соответствующие очереди. Воркеры начали их обрабатывать.

Этап 4: Мониторинг и верификация приоритетов (1 ч)

Действия

  1. Запустить Flower для визуального мониторинга:
    celery -A tasks flower --port=5555
    
  2. Открыть http://localhost:5555 — проверить, что воркеры живы, очереди содержат задачи.
  3. Написать скрипт check_results.py, который дожидается завершения всех задач и проверяет порядок выполнения:
    from tasks import app
    from celery.result import AsyncResult
    
    # Получить все ID задач из брокера (упрощённо)
    # В реальности нужно сохранять ID при отправке
    # Предположим, что мы сохранили их в файл
    results = []
    with open('task_ids.txt') as f:
        for line in f:
            tid = line.strip()
            res = AsyncResult(tid, app=app)
            results.append((tid, res.get(timeout=5)))
    
    # Проверить, что ни одна задача не упала
    assert all(r[1]['status'] == 'done' for r in results)
    print(f'All {len(results)} tasks completed successfully')
    
  4. Убедиться, что задачи из очереди high завершились раньше, чем low при прочих равных.

Ожидаемый результат этапа Flower отображает все 100 задач, их статусы. Скрипт верификации показывает 100 успешных выполнений.

Этап 5: Масштабирование и стресс-тест (30 мин)

Действия

  1. Увеличить количество воркеров до 8:
    celery -A tasks worker --loglevel=info --concurrency=8 -Q high,medium,low
    
  2. Повторно запустить генерацию 100 задач с ещё большим разбросом приоритетов.
  3. Замерять время выполнения всех задач с помощью Flower или логов.
  4. Выгрузить статистику из Flower (вкладка "Metrics").

Ожидаемый результат этапа Время обработки 100 задач сократилось пропорционально количеству воркеров. Приоритетные задачи выполняются в первую очередь.

5. Критерии приемки (Definition of Done)

  • Установлен и работает Redis; Celery worker успешно стартует и подключается к Redis.
  • Созданы три очереди (high, medium, low) с различными максимальными приоритетами.
  • 100 задач отправлены с корректными приоритетами в соответствии с очередью.
  • Все 100 задач завершены без ошибок (статус done).
  • Воркеры обработали задачи из очереди high раньше, чем из low (порядок соблюдён).
  • Flower отображает состояние всех воркеров и очередей, метрики доступны.
  • При увеличении числа воркеров общее время обработки уменьшается (масштабирование).
  • Код задачи agent_task корректно принимает и возвращает данные.

6. Ожидаемый результат

  • Основной артефакт каталог с файлами:
    • tasks.py — конфигурация Celery и определение задачи.
    • generate_tasks.py — скрипт отправки 100 задач.
    • check_results.py — скрипт верификации.
    • requirements.txt — список зависимостей.
    • README.md — инструкция по запуску.
  • Содержимое описанная выше инфраструктура, способная обработать 100 задач с приоритетами.
  • Дополнительные результаты скриншоты Flower, логи выполнения, файл с замером времени.

7. Возможные сложности и их решение

СложностьРешение
Redis недоступен после запускаПроверить порт 6379, отключить firewall, использовать redis-cli ping
Celery worker не видит задачУбедиться, что воркер подписан на правильные очереди (-Q high,medium,low)
Приоритеты не соблюдаютсяВключить worker_prefetch_multiplier=1; убедиться, что очередь объявлена с x-max-priority
Проблемы с правами на запись логовЗапускать воркер из каталога с правами на запись
Flower не запускаетсяУстановить отдельно pip install flower; проверить версии Celery и Flower (совместимость)

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Настройка окружения и Redis30 мин
Этап 2: Создание приложения Celery1 ч
Этап 3: Генерация 100 задач30 мин
Этап 4: Мониторинг и верификация1 ч
Этап 5: Масштабирование и стресс-тест30 мин
Итого3,5 ч

Примечание: для первого раза заложите 5–6 часов с учётом отладки.

9. Связанные вопросы из базы знаний

ВопросТема
45Архитектура очередей сообщений
78Настройка приоритетов в Celery
101Мониторинг задач через Flower
132Масштабирование воркеров Celery
204Обработка ошибок в Celery задачах
277Интеграция Redis как брокера
315Стратегии повторной отправки задач
450Конкурентное выполнение многоагентных систем
522Профилирование производительности очередей
688Безопасность распределённых задач

10. Чек-лист самопроверки

  • Я корректно настроил Redis и проверил его работу через redis-cli.
  • Я создал файл tasks.py с конфигурацией очередей и задачей-заглушкой.
  • Я отправил ровно 100 задач, записав их ID в файл для верификации.
  • Я запустил Flower и увидел, что все 100 задач завершены.
  • Я убедился, что задачи из очереди high завершились раньше, чем из low, при одинаковой загрузке.