English translation is not available yet. Showing Russian content.

Настроить peer-to-peer коммуникацию агентов

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить peer-to-peer коммуникацию агентов

1. Цель задачи

Спроектировать и реализовать механизм прямого обмена сообщениями между тремя автономными агентами с использованием общего message bus. Агенты должны координировать свои действия без центрального диспетчера — каждый агент публикует запросы и подписывается на ответы других агентов. Ключевой результат три агента выполняют согласованную последовательность операций (например, цепочку обработки данных) через peer-to-peer сообщения с гарантированной доставкой и обработкой ошибок.

2. Исходные данные

Что нужноОткуда взять
Базовый код трёх агентов (Python-классы с основной логикой)Написать самостоятельно или адаптировать шаблон из документации LangChain / AutoGen
Message bus (Redis Pub/Sub или RabbitMQ)Docker-образ redis:7-alpine или rabbitmq:3-management
Сценарий координации (например, агент A генерирует число, агент B умножает, агент C суммирует)Разработать согласно признаку успеха
Клиентская библиотека для работы с брокером сообщенийredis-py (>=5.0) или pika (>=1.3)

Если нет реального инструмента — симулируем:

  1. Установите Docker Desktop (или Podman) на локальную машину.
  2. Запустите Redis: docker run -d --name message-bus -p 6379:6379 redis:7-alpine.
  3. Если Docker недоступен, используйте встроенный asyncio.Queue и эмуляцию pub/sub через глобальный словарь — это не заменит реального брокера, но позволит отладить логику агентов.

3. Технологический стек

КомпонентИнструментыНазначение
Язык реализацииPython 3.11+Разработка агентов и инфраструктуры
Message busRedis (Pub/Sub) или RabbitMQКанал для peer-to-peer сообщений
Асинхронное выполнениеasyncio / uvloopПараллельная работа агентов
Управление зависимостямиpip + requirements.txtИзоляция окружения
Контейнеризация (опционально)Docker ComposeБыстрый запуск всей системы
Тестированиеpytest + pytest-asyncioПроверка координации агентов

4. Этапы выполнения

Этап 1: Подготовка окружения и message bus (оценка времени: 30 мин)

Действия

  1. Создайте виртуальное окружение и установите зависимости:
    python -m venv venv
    source venv/bin/activate
    pip install redis pydantic pytest pytest-asyncio
    
  2. Запустите Redis (или проверьте, что он работает):
    docker start message-bus   # если контейнер уже создан
    
  3. Напишите утилиту для подключения к Redis:
    # bus.py
    import redis.asyncio as aioredis
    
    async def get_redis():
        return await aioredis.from_url("redis://localhost:6379")
    
  4. Напишите простой тест pub/sub, чтобы убедиться, что сообщения доставляются.

Ожидаемый результат этапа ✅ Работающий Redis, библиотека redis-py импортируется, успешный тест send/receive.

Этап 2: Проектирование протокола сообщений (оценка времени: 45 мин)

Действия

  1. Определите структуру сообщения с помощью Pydantic:
    # messages.py
    from pydantic import BaseModel
    from uuid import UUID, uuid4
    
    class Message(BaseModel):
        msg_id: UUID = uuid4()
        sender: str
        receiver: str
        msg_type: str  # "request", "response", "coordinate"
        payload: dict
        timestamp: float = None
    
  2. Опишите схему топиков: каждый агент публикует в топик agent:<имя> и подписывается на топики других агентов.
  3. Определите жизненный цикл сообщения: отправка → получение → обработка → ответ (опционально с таймаутом).
  4. Документируйте протокол в файле PROTOCOL.md.

Ожидаемый результат этапа ✅ Готовая модель сообщения, спецификация топиков, описание протокола.

Этап 3: Реализация базового агента (оценка времени: 1.5 часа)

Действия

  1. Создайте абстрактный класс Agent:
    # agent.py
    import asyncio
    import json
    from bus import get_redis
    from messages import Message
    
    class Agent:
        def __init__(self, name: str):
            self.name = name
            self.redis = None
            self.pubsub = None
    
        async def start(self):
            self.redis = await get_redis()
            self.pubsub = self.redis.pubsub()
            await self.pubsub.subscribe(f"agent:{self.name}")
            asyncio.create_task(self._listen())
    
        async def _listen(self):
            async for message in self.pubsub.listen():
                if message["type"] == "message":
                    msg = Message(**json.loads(message["data"]))
                    await self.handle_message(msg)
    
        async def send_message(self, receiver: str, msg: Message):
            channel = f"agent:{receiver}"
            await self.redis.publish(channel, msg.model_dump_json())
    
        async def handle_message(self, msg: Message):
            raise NotImplementedError("Subclasses must implement handle_message")
    
  2. Реализуйте трёх конкретных агентов:
    • GeneratorAgent — генерирует задачу и отправляет её агенту B.
    • ProcessorAgent — обрабатывает задачу и отправляет результат агенту C.
    • AggregatorAgent — собирает результаты и выводит итог.
  3. Добавьте логирование с помощью logging (все приёмы/отправки).

Ожидаемый результат этапа ✅ Рабочие классы агентов с асинхронным прослушиванием и методом отправки.

Этап 4: Реализация координации (оценка времени: 1 час)

Действия

  1. Напишите сценарий координации в файле coordination.py:
    • GeneratorAgent отправляет {"task": "process", "value": 42} агенту Processor.
    • ProcessorAgent умножает значение на 2 и отправляет Aggregator.
    • AggregatorAgent выводит итоговое значение и отправляет подтверждение Generator.
  2. Используйте asyncio.create_task для запуска трёх агентов одновременно.
  3. Добавьте таймаут на ожидание ответа (5 секунд), при превышении — повторная отправка (до 3 попыток).
  4. Убедитесь, что агенты успешно обменялись минимум тремя сообщениями (логи).

Ожидаемый результат этапа ✅ Запуск python coordination.py показывает цепочку: A → B → C → A.

Этап 5: Тестирование и отладка (оценка времени: 45 мин)

Действия

  1. Напишите асинхронный pytest для проверки координации:
    # test_coordination.py
    import pytest
    from coordination import run_agents
    
    @pytest.mark.asyncio
    async def test_three_agents_coordinate():
        result = await run_agents()
        assert result["final_value"] == 84  # 42 * 2
    
  2. Проверьте обработку ошибок: если один агент не отвечает, система должна корректно завершиться (graceful shutdown).
  3. Запустите тесты c pytest -v --asyncio-mode=auto.

Ожидаемый результат этапа ✅ Все тесты проходят, система завершается без утечек ресурсов (закрыты соединения Redis).

5. Критерии приемки (Definition of Done)

  • Все три агента запускаются параллельно и обмениваются сообщениями через Redis Pub/Sub.
  • Каждое сообщение содержит уникальный ID и подтверждается приёмом (логирование).
  • Реализована цепочка координации: Generator → Processor → Aggregator (минимум 3 пересылки).
  • Система корректно обрабатывает таймауты (повторная отправка до 3 раз).
  • Тесты (pytest) покрывают нормальный сценарий и сценарий с ошибкой (отказ одного агента).
  • Код задокументирован: Readme с инструкцией по запуску, PROTOCOL.md с описанием протокола.
  • После остановки программы все ресурсы (Redis-соединения) освобождены.
  • Время выполнения цепочки не превышает 2 секунды.

6. Ожидаемый результат

  • Основной артефакт Директория peer_agents/ со следующей структурой:
    peer_agents/
    ├── bus.py              # работа с Redis
    ├── messages.py         # модель сообщения
    ├── agent.py            # базовый класс агента
    ├── agents_impl.py      # три конкретных агента
    ├── coordination.py     # запуск координации
    ├── test_coordination.py # тесты
    ├── PROTOCOL.md         # описание протокола
    └── README.md           # инструкция по запуску
    
  • Содержание Работающий код, который при запуске python coordination.py выводит логи обмена сообщениями и итоговый результат.
  • Дополнительные результаты docker-compose.yml (если используется), файл requirements.txt.

7. Возможные сложности и их решение

СложностьРешение
Потеря сообщений из-за отключения RedisВключить режим Redis Persistence (RDB/AOF) или использовать RabbitMQ с подтверждениями.
Deadlock при синхронном ожиданииИспользовать asyncio.wait_for с таймаутом и отдельные очереди для ответов.
Race condition при подпискеПодписываться до публикации первого сообщения (в start()).
Сложность отладки асинхронного кодаВключить логирование с временными метками (logging.INFO).
Несовместимость версий библиотекЗафиксировать версии в requirements.txt (redis>=5.0, pydantic>=2.0).

8. Бюджет времени (оценка)

ЭтапВремя
1. Подготовка окружения и message bus30 мин
2. Проектирование протокола сообщений45 мин
3. Реализация базового агента1.5 часа
4. Реализация координации1 час
5. Тестирование и отладка45 мин
Итого4.5 часа

При первом выполнении задачи (незнаком с asyncio и Redis) заложите дополнительно 1–2 часа на изучение.

9. Связанные вопросы из базы знаний

ВопросТема
124Протоколы межпроцессного взаимодействия
145Асинхронное программирование в Python (asyncio)
203Паттерн Pub/Sub в распределённых системах
278Основы проектирования мультиагентных систем
312Обработка ошибок и повторные попытки
419Гарантированная доставка сообщений (Delivery Guarantees)
501Контейнеризация приложений с Docker
604Тестирование асинхронного кода (pytest-asyncio)
722Принципы построения отказоустойчивых агентов
889ZeroMQ как альтернатива Redis для сообщений

10. Чек-лист самопроверки

  • Я развернул Redis локально (или симулировал) и убедился, что он отвечает.
  • Класс Agent корректно подписывается на свой топик и обрабатывает сообщения.
  • Все три агента запускаются в одном процессе с помощью asyncio.gather.
  • Протокол сообщений включает idempotency key (msg_id) для защиты от дубликатов.
  • Таймауты и повторные попытки реализованы и протестированы.
  • Тесты покрывают не только успешный сценарий, но и случай выпадения одного агента (искусственный таймаут).
  • В README.md описана команда запуска: docker run -d -p 6379:6379 redis:7-alpine && python coordination.py.
  • После завершения тестов Redis-соединения закрыты (нет warnings в логах).