English translation is not available yet. Showing Russian content.
Настроить peer-to-peer коммуникацию агентов
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить peer-to-peer коммуникацию агентов
1. Цель задачи
Спроектировать и реализовать механизм прямого обмена сообщениями между тремя автономными агентами с использованием общего message bus. Агенты должны координировать свои действия без центрального диспетчера — каждый агент публикует запросы и подписывается на ответы других агентов. Ключевой результат три агента выполняют согласованную последовательность операций (например, цепочку обработки данных) через peer-to-peer сообщения с гарантированной доставкой и обработкой ошибок.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Базовый код трёх агентов (Python-классы с основной логикой) | Написать самостоятельно или адаптировать шаблон из документации LangChain / AutoGen |
| Message bus (Redis Pub/Sub или RabbitMQ) | Docker-образ redis:7-alpine или rabbitmq:3-management |
| Сценарий координации (например, агент A генерирует число, агент B умножает, агент C суммирует) | Разработать согласно признаку успеха |
| Клиентская библиотека для работы с брокером сообщений | redis-py (>=5.0) или pika (>=1.3) |
Если нет реального инструмента — симулируем:
- Установите Docker Desktop (или Podman) на локальную машину.
- Запустите Redis:
docker run -d --name message-bus -p 6379:6379 redis:7-alpine. - Если Docker недоступен, используйте встроенный
asyncio.Queueи эмуляцию pub/sub через глобальный словарь — это не заменит реального брокера, но позволит отладить логику агентов.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык реализации | Python 3.11+ | Разработка агентов и инфраструктуры |
| Message bus | Redis (Pub/Sub) или RabbitMQ | Канал для peer-to-peer сообщений |
| Асинхронное выполнение | asyncio / uvloop | Параллельная работа агентов |
| Управление зависимостями | pip + requirements.txt | Изоляция окружения |
| Контейнеризация (опционально) | Docker Compose | Быстрый запуск всей системы |
| Тестирование | pytest + pytest-asyncio | Проверка координации агентов |
4. Этапы выполнения
Этап 1: Подготовка окружения и message bus (оценка времени: 30 мин)
Действия
- Создайте виртуальное окружение и установите зависимости:
python -m venv venv source venv/bin/activate pip install redis pydantic pytest pytest-asyncio - Запустите Redis (или проверьте, что он работает):
docker start message-bus # если контейнер уже создан - Напишите утилиту для подключения к Redis:
# bus.py import redis.asyncio as aioredis async def get_redis(): return await aioredis.from_url("redis://localhost:6379") - Напишите простой тест pub/sub, чтобы убедиться, что сообщения доставляются.
Ожидаемый результат этапа ✅ Работающий Redis, библиотека redis-py импортируется, успешный тест send/receive.
Этап 2: Проектирование протокола сообщений (оценка времени: 45 мин)
Действия
- Определите структуру сообщения с помощью Pydantic:
# messages.py from pydantic import BaseModel from uuid import UUID, uuid4 class Message(BaseModel): msg_id: UUID = uuid4() sender: str receiver: str msg_type: str # "request", "response", "coordinate" payload: dict timestamp: float = None - Опишите схему топиков: каждый агент публикует в топик
agent:<имя>и подписывается на топики других агентов. - Определите жизненный цикл сообщения: отправка → получение → обработка → ответ (опционально с таймаутом).
- Документируйте протокол в файле
PROTOCOL.md.
Ожидаемый результат этапа ✅ Готовая модель сообщения, спецификация топиков, описание протокола.
Этап 3: Реализация базового агента (оценка времени: 1.5 часа)
Действия
- Создайте абстрактный класс
Agent:# agent.py import asyncio import json from bus import get_redis from messages import Message class Agent: def __init__(self, name: str): self.name = name self.redis = None self.pubsub = None async def start(self): self.redis = await get_redis() self.pubsub = self.redis.pubsub() await self.pubsub.subscribe(f"agent:{self.name}") asyncio.create_task(self._listen()) async def _listen(self): async for message in self.pubsub.listen(): if message["type"] == "message": msg = Message(**json.loads(message["data"])) await self.handle_message(msg) async def send_message(self, receiver: str, msg: Message): channel = f"agent:{receiver}" await self.redis.publish(channel, msg.model_dump_json()) async def handle_message(self, msg: Message): raise NotImplementedError("Subclasses must implement handle_message") - Реализуйте трёх конкретных агентов:
GeneratorAgent— генерирует задачу и отправляет её агенту B.ProcessorAgent— обрабатывает задачу и отправляет результат агенту C.AggregatorAgent— собирает результаты и выводит итог.
- Добавьте логирование с помощью
logging(все приёмы/отправки).
Ожидаемый результат этапа ✅ Рабочие классы агентов с асинхронным прослушиванием и методом отправки.
Этап 4: Реализация координации (оценка времени: 1 час)
Действия
- Напишите сценарий координации в файле
coordination.py:- GeneratorAgent отправляет
{"task": "process", "value": 42}агенту Processor. - ProcessorAgent умножает значение на 2 и отправляет Aggregator.
- AggregatorAgent выводит итоговое значение и отправляет подтверждение Generator.
- GeneratorAgent отправляет
- Используйте
asyncio.create_taskдля запуска трёх агентов одновременно. - Добавьте таймаут на ожидание ответа (5 секунд), при превышении — повторная отправка (до 3 попыток).
- Убедитесь, что агенты успешно обменялись минимум тремя сообщениями (логи).
Ожидаемый результат этапа ✅ Запуск python coordination.py показывает цепочку: A → B → C → A.
Этап 5: Тестирование и отладка (оценка времени: 45 мин)
Действия
- Напишите асинхронный pytest для проверки координации:
# test_coordination.py import pytest from coordination import run_agents @pytest.mark.asyncio async def test_three_agents_coordinate(): result = await run_agents() assert result["final_value"] == 84 # 42 * 2 - Проверьте обработку ошибок: если один агент не отвечает, система должна корректно завершиться (graceful shutdown).
- Запустите тесты c
pytest -v --asyncio-mode=auto.
Ожидаемый результат этапа ✅ Все тесты проходят, система завершается без утечек ресурсов (закрыты соединения Redis).
5. Критерии приемки (Definition of Done)
- Все три агента запускаются параллельно и обмениваются сообщениями через Redis Pub/Sub.
- Каждое сообщение содержит уникальный ID и подтверждается приёмом (логирование).
- Реализована цепочка координации: Generator → Processor → Aggregator (минимум 3 пересылки).
- Система корректно обрабатывает таймауты (повторная отправка до 3 раз).
- Тесты (
pytest) покрывают нормальный сценарий и сценарий с ошибкой (отказ одного агента). - Код задокументирован: Readme с инструкцией по запуску, PROTOCOL.md с описанием протокола.
- После остановки программы все ресурсы (Redis-соединения) освобождены.
- Время выполнения цепочки не превышает 2 секунды.
6. Ожидаемый результат
- Основной артефакт Директория
peer_agents/со следующей структурой:peer_agents/ ├── bus.py # работа с Redis ├── messages.py # модель сообщения ├── agent.py # базовый класс агента ├── agents_impl.py # три конкретных агента ├── coordination.py # запуск координации ├── test_coordination.py # тесты ├── PROTOCOL.md # описание протокола └── README.md # инструкция по запуску - Содержание Работающий код, который при запуске
python coordination.pyвыводит логи обмена сообщениями и итоговый результат. - Дополнительные результаты
docker-compose.yml(если используется), файлrequirements.txt.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Потеря сообщений из-за отключения Redis | Включить режим Redis Persistence (RDB/AOF) или использовать RabbitMQ с подтверждениями. |
| Deadlock при синхронном ожидании | Использовать asyncio.wait_for с таймаутом и отдельные очереди для ответов. |
| Race condition при подписке | Подписываться до публикации первого сообщения (в start()). |
| Сложность отладки асинхронного кода | Включить логирование с временными метками (logging.INFO). |
| Несовместимость версий библиотек | Зафиксировать версии в requirements.txt (redis>=5.0, pydantic>=2.0). |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| 1. Подготовка окружения и message bus | 30 мин |
| 2. Проектирование протокола сообщений | 45 мин |
| 3. Реализация базового агента | 1.5 часа |
| 4. Реализация координации | 1 час |
| 5. Тестирование и отладка | 45 мин |
| Итого | 4.5 часа |
При первом выполнении задачи (незнаком с asyncio и Redis) заложите дополнительно 1–2 часа на изучение.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 124 | Протоколы межпроцессного взаимодействия |
| 145 | Асинхронное программирование в Python (asyncio) |
| 203 | Паттерн Pub/Sub в распределённых системах |
| 278 | Основы проектирования мультиагентных систем |
| 312 | Обработка ошибок и повторные попытки |
| 419 | Гарантированная доставка сообщений (Delivery Guarantees) |
| 501 | Контейнеризация приложений с Docker |
| 604 | Тестирование асинхронного кода (pytest-asyncio) |
| 722 | Принципы построения отказоустойчивых агентов |
| 889 | ZeroMQ как альтернатива Redis для сообщений |
10. Чек-лист самопроверки
- Я развернул Redis локально (или симулировал) и убедился, что он отвечает.
- Класс
Agentкорректно подписывается на свой топик и обрабатывает сообщения. - Все три агента запускаются в одном процессе с помощью
asyncio.gather. - Протокол сообщений включает idempotency key (msg_id) для защиты от дубликатов.
- Таймауты и повторные попытки реализованы и протестированы.
- Тесты покрывают не только успешный сценарий, но и случай выпадения одного агента (искусственный таймаут).
- В
README.mdописана команда запуска:docker run -d -p 6379:6379 redis:7-alpine && python coordination.py. - После завершения тестов Redis-соединения закрыты (нет warnings в логах).