Реализовать distributed task queue для агентов
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать distributed task queue для агентов
1. Цель задачи
Спроектировать и развернуть распределённую очередь задач для multi-agent системы с использованием Celery и Redis. Реализовать поддержку приоритетов задач и обеспечить надёжную обработку 100 конкурентных заданий. Ключевой результат 100 задач с различными приоритетами успешно распределяются между воркерами и завершаются с корректным учётом порядка выполнения.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Redis (локальный или Docker-образ) | Официальный образ redis:7-alpine или пакет redis-server |
| Celery (Python) | Установить через pip: celery[redis] |
| Тестовые JSON-задания (100 шт.) | Сгенерировать скриптом (см. Этап 4) |
| Виртуальное окружение Python | python -m venv venv |
| Docker (опционально) | Docker Desktop / docker.io |
Если нет реального инструмента — симулируем:
- Установить Redis через
apt install redis-server(Linux) или скачать предварительно скомпилированный бинарник для Windows. - Если Docker недоступен, использовать sudo systemctl start redis (Linux) или запустить
redis-serverвручную. - Если Celery не ставится — использовать
pip install celery[Вики/Kafka|redisв изолированном Вики/Conda venv|venv.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Брокер сообщений | Redis 7+ | Хранение очередей, обмен сообщениями, хранение результатов |
| Фреймворк задач | Celery 5+ | Координация воркеров, управление очередями, приоритезация |
| Воркеры | Celery worker (Python) | Исполнение задач, эмуляция агентов |
| Мониторинг | Celery Flower | Визуализация состояния очередей и воркеров |
| Среда выполнения | Python 3.10+, Docker (опц.) | Контейнеризация и воспроизводимость |
4. Этапы выполнения
Этап 1: Настройка окружения и запуск Redis (30 мин)
Действия
- Создать и активировать виртуальное окружение:
python -m venv venv source venv/bin/activate # или venv\Scripts\activate для Windows - Установить зависимости:
pip install celery[redis] flower - Запустить Redis локально:
- Проверить доступность Redis:
redis-cli ping # Ответ: PONG
Ожидаемый результат этапа Redis работает и принимает соединения.
Этап 2: Создание приложения Celery и конфигурация приоритетов (1 ч)
Действия
- Создать файл
tasks.pyс настройками Celery:from celery import Celery app = Celery('agents', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') # Настройка очередей с приоритетами app.conf.task_queues = { 'high': {'exchange': 'high', 'routing_key': 'high', 'queue_arguments': {'x-max-priority': 10}}, 'medium': {'exchange': 'medium', 'routing_key': 'medium', 'queue_arguments': {'x-max-priority': 5}}, 'low': {'exchange': 'low', 'routing_key': 'low', 'queue_arguments': {'x-max-priority': 1}}, } app.conf.task_default_queue = 'medium' app.conf.task_default_priority = 5 app.conf.worker_prefetch_multiplier = 1 # честное распределение - Определить задачу-заглушку для агента:
@app.task(bind=True, name='agent_task') def agent_task(self, agent_id, payload): import time, random sleep_time = random.uniform(0.5, 2.0) # имитация работы агента time.sleep(sleep_time) return {'agent': agent_id, 'status': 'done', 'payload': payload} - Запустить Celery worker:
celery -A tasks worker --loglevel=info --concurrency=4 -Q high,medium,low
Ожидаемый результат этапа Воркер запущен и слушает очереди high, medium, low. Задачи в tasks.py видны.
Этап 3: Генерация 100 тестовых задач с разными приоритетами (30 мин)
Действия
- Написать скрипт
generate_tasks.py:from tasks import agent_task import random priorities = {'high': 9, 'medium': 5, 'low': 1} queues = {'high': 'high', 'medium': 'medium', 'low': 'low'} for i in range(100): level = random.choice(['high', 'medium', 'low']) prio = priorities[level] queue = queues[level] agent_task.apply_async( args=(i, f'data_{i}'), queue=queue, priority=prio ) print(f'Sent task {i} to {queue} with priority {prio}') - Запустить скрипт:
python generate_tasks.py - Наблюдать в консоли воркера получение задач.
Ожидаемый результат этапа 100 задач отправлены в соответствующие очереди. Воркеры начали их обрабатывать.
Этап 4: Мониторинг и верификация приоритетов (1 ч)
Действия
- Запустить Flower для визуального мониторинга:
celery -A tasks flower --port=5555 - Открыть http://localhost:5555 — проверить, что воркеры живы, очереди содержат задачи.
- Написать скрипт
check_results.py, который дожидается завершения всех задач и проверяет порядок выполнения:from tasks import app from celery.result import AsyncResult # Получить все ID задач из брокера (упрощённо) # В реальности нужно сохранять ID при отправке # Предположим, что мы сохранили их в файл results = [] with open('task_ids.txt') as f: for line in f: tid = line.strip() res = AsyncResult(tid, app=app) results.append((tid, res.get(timeout=5))) # Проверить, что ни одна задача не упала assert all(r[1]['status'] == 'done' for r in results) print(f'All {len(results)} tasks completed successfully') - Убедиться, что задачи из очереди
highзавершились раньше, чемlowпри прочих равных.
Ожидаемый результат этапа Flower отображает все 100 задач, их статусы. Скрипт верификации показывает 100 успешных выполнений.
Этап 5: Масштабирование и стресс-тест (30 мин)
Действия
- Увеличить количество воркеров до 8:
celery -A tasks worker --loglevel=info --concurrency=8 -Q high,medium,low - Повторно запустить генерацию 100 задач с ещё большим разбросом приоритетов.
- Замерять время выполнения всех задач с помощью Flower или логов.
- Выгрузить статистику из Flower (вкладка "Metrics").
Ожидаемый результат этапа Время обработки 100 задач сократилось пропорционально количеству воркеров. Приоритетные задачи выполняются в первую очередь.
5. Критерии приемки (Definition of Done)
- Установлен и работает Redis; Celery worker успешно стартует и подключается к Redis.
- Созданы три очереди (high, medium, low) с различными максимальными приоритетами.
- 100 задач отправлены с корректными приоритетами в соответствии с очередью.
- Все 100 задач завершены без ошибок (статус
done). - Воркеры обработали задачи из очереди high раньше, чем из low (порядок соблюдён).
- Flower отображает состояние всех воркеров и очередей, метрики доступны.
- При увеличении числа воркеров общее время обработки уменьшается (масштабирование).
- Код задачи agent_task корректно принимает и возвращает данные.
6. Ожидаемый результат
- Основной артефакт каталог с файлами:
tasks.py— конфигурация Celery и определение задачи.generate_tasks.py— скрипт отправки 100 задач.check_results.py— скрипт верификации.- requirements.txt — список зависимостей.
- README.md — инструкция по запуску.
- Содержимое описанная выше инфраструктура, способная обработать 100 задач с приоритетами.
- Дополнительные результаты скриншоты Flower, логи выполнения, файл с замером времени.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Redis недоступен после запуска | Проверить порт 6379, отключить firewall, использовать redis-cli ping |
| Celery worker не видит задач | Убедиться, что воркер подписан на правильные очереди (-Q high,medium,low) |
| Приоритеты не соблюдаются | Включить worker_prefetch_multiplier=1; убедиться, что очередь объявлена с x-max-priority |
| Проблемы с правами на запись логов | Запускать воркер из каталога с правами на запись |
| Flower не запускается | Установить отдельно pip install flower; проверить версии Celery и Flower (совместимость) |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Настройка окружения и Redis | 30 мин |
| Этап 2: Создание приложения Celery | 1 ч |
| Этап 3: Генерация 100 задач | 30 мин |
| Этап 4: Мониторинг и верификация | 1 ч |
| Этап 5: Масштабирование и стресс-тест | 30 мин |
| Итого | 3,5 ч |
Примечание: для первого раза заложите 5–6 часов с учётом отладки.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 45 | Архитектура очередей сообщений |
| 78 | Настройка приоритетов в Celery |
| 101 | Мониторинг задач через Flower |
| 132 | Масштабирование воркеров Celery |
| 204 | Обработка ошибок в Celery задачах |
| 277 | Интеграция Redis как брокера |
| 315 | Стратегии повторной отправки задач |
| 450 | Конкурентное выполнение многоагентных систем |
| 522 | Профилирование производительности очередей |
| 688 | Безопасность распределённых задач |
10. Чек-лист самопроверки
- Я корректно настроил Redis и проверил его работу через redis-cli.
- Я создал файл
tasks.pyс конфигурацией очередей и задачей-заглушкой. - Я отправил ровно 100 задач, записав их ID в файл для верификации.
- Я запустил Flower и увидел, что все 100 задач завершены.
- Я убедился, что задачи из очереди high завершились раньше, чем из low, при одинаковой загрузке.