Реализация паттернов request-response и fire-and-forget для меж-агентской коммуникации

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализация паттернов request-response и fire-and-forget для меж-агентской коммуникации

1. Цель задачи

Освоить два фундаментальных паттерна асинхронного обмена сообщениями между ИИ-агентами: синхронный request-response (ожидание ответа) и асинхронный fire-and-forget (отправил и забыл). Научиться обоснованно выбирать паттерн в зависимости от типа задачи (критичный запрос vs фоновое логирование). Закрепить навык на практике, реализовав fire-and-forget для отправки логов в центральный сервис.

Ключевой результат Работающий прототип системы из двух агентов, где один отправляет другому как синхронные запросы, так и асинхронные уведомления (логи) через единый брокер сообщений, с выбором паттерна в коде.


2. Исходные данные

Перед началом необходимо подготовить:

Что нужноОткуда взять
Рабочее окружение Python 3.10+Локальная установка или Docker
Брокер сообщений (RabbitMQ или Redis Pub/Sub)Docker-образ (rabbitmq:3-management или redis:7)
Базовая структура двух агентов (классы или модули)Создать в процессе выполнения
Примеры задач для выбора паттерна: «получить прогноз погоды» (request-response) и «залогировать событие» (fire-and-forget)Придумать самостоятельно, можно взять из демо

Если нет реального брокера — симулируем:

  1. Устанавливаем Docker Desktop (или Colima на macOS).
  2. Запускаем контейнер RabbitMQ: docker run -d --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
  3. Проверяем доступность через curl -u guest:guest http://localhost:15672/api/overview
  4. Альтернатива — использовать Redis с asyncio и redis-py без докера, если Redis установлен локально.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Реализация агентов и коммуникации
Асинхронное выполнениеasyncio, aio-pika (для RabbitMQ)Неблокирующий обмен сообщениями
Брокер сообщенийRabbitMQ (или Redis)Маршрутизация сообщений между агентами
Логированиеstructlog + файл/stdoutДемонстрация fire-and-forget логов
Тестированиеpytest + pytest-asyncioПроверка корректности паттернов
ДокументацияMarkdown, READMEОписание архитектуры и выбора паттерна

4. Этапы выполнения

Этап 1: Проектирование архитектуры и подготовка окружения (1 час)

Действия

  1. Нарисовать схему коммуникации

    • Агент A (клиент) → Брокер → Агент B (сервер).
    • Для request-response: Агент A посылает запрос в очередь rpc_queue, Агент B обрабатывает и отправляет ответ в динамическую очередь reply_to.
    • Для fire-and-forget: Агент A отправляет сообщение в очередь logs, Агент B (логер) просто потребляет и не возвращает ответ.
  2. Создать структуру проекта

    agent_comm/
    ├── agents/
    │   ├── agent_a.py         # клиент
    │   └── agent_b.py         # сервер/логер
    ├── comm/
    │   ├── __init__.py
    │   ├── broker.py          # подключение к RabbitMQ
    │   ├── request_response.py
    │   └── fire_and_forget.py
    ├── logs/                  # директория для логов
    ├── tests/
    │   └── test_comm.py
    ├── docker-compose.yml     # для RabbitMQ
    ├── requirements.txt
    └── README.md
    
  3. Установить зависимости

    pip install aio-pika structlog pytest pytest-asyncio
    
  4. Запустить брокер (если не запущен):

    docker-compose up -d rabbitmq
    

Ожидаемый результат этапа Готовая структура проекта, работающий RabbitMQ, установленные зависимости.


Этап 2: Реализация паттерна request-response (1,5 часа)

Действия

  1. Написать модуль request_response.py

    • Функция send_request(loop, queue_name, message):
      • Создаёт временную очередь reply_to.
      • Публикует сообщение с correlation_id и reply_to.
      • Ожидает ответ по reply_to (таймаут 5 секунд).
    • Функция start_rpc_server(queue_name, callback):
      • Потребляет из queue_name.
      • Для каждого сообщения вызывает callback и отправляет результат в reply_to.
  2. Реализовать агента B (agent_b.py) с RPC-обработчиком:

    • Пример: handle_weather_request(city) → возвращает симулированный прогноз.
  3. Реализовать агента A (agent_a.py) с вызовом:

    • Запрос прогноза через send_request.
    • Вывод ответа.
  4. Протестировать end-to-end

    # agents/agent_a.py (фрагмент)
    async def main():
        response = await send_request('weather_queue', 'London')
        print(f'Weather: {response}')
    

Ожидаемый результат этапа Работающий обмен по схеме request-response: клиент получает ответ от сервера.


Этап 3: Реализация паттерна fire-and-forget (1 час)

Действия

  1. Написать модуль fire_and_forget.py

    • Функция publish_event(queue_name, message):
      • Публикует сообщение в указанную очередь.
      • Не ожидает ответа, не ждёт подтверждения.
    • Функция start_log_consumer(queue_name):
      • Потребляет сообщения и записывает их в файловый лог (structlog) в директорию logs/.
  2. Модифицировать агента B (agent_b.py):

    • Добавить возможность работы в режиме логера (потребление из logs_queue).
    • В логер сохранять timestamp и тело сообщения.
  3. Расширить агента A (agent_a.py):

    • После RPC-запроса отправлять fire-and-forget лог-событие «Выполнен RPC запрос к weather_queue».
  4. Проверить, что лог пишется без блокировки основного потока:

    • Запустить агента A, убедиться, что вывод RPC появляется мгновенно, а лог пишется асинхронно.

Ожидаемый результат этапа Логи отправляются fire-and-forget и записываются в файл без ожидания подтверждения.


Этап 4: Интеграция выбора паттерна по типу задачи (1 час)

Действия

  1. Определить тип задач создать словарь или enum TaskType:

    class TaskType(Enum):
        SYNCHRONOUS = 'request_response'
        ASYNCHRONOUS = 'fire_and_forget'
    
  2. Написать диспетчер comm/selector.py

    • Функция send_message(task_type, queue, payload):
      • Если SYNCHRONOUS → вызывает send_request.
      • Если ASYNCHRONOUS → вызывает publish_event.
  3. Добавить в агента A метод process_task(task_type, ... ):

  4. Покрыть оба сценария в __main__

    await send_message(TaskType.SYNCHRONOUS, 'weather', 'Moscow')
    await send_message(TaskType.ASYNCHRONOUS, 'logs', {'event': 'weather_request'})
    

Ожидаемый результат этапа Единая точка входа, которая по типу задачи выбирает корректный паттерн.


Этап 5: Тестирование и документирование (1 час)

Действия

  1. Написать тесты в tests/test_comm.py

    • Test request-response: отправляем запрос, проверяем что ответ пришёл.
    • Test fire-and-forget: отправляем лог, проверяем что он появился в файле (с задержкой).
    • Test выбор паттерна: мокаем брокер, проверяем вызов нужной функции.
    • Test обработка таймаута в request-response.
    • Test параллельный запуск RPC + логов.
  2. Написать README.md

    • Описание архитектуры.
    • Таблица выбора паттерна (синхронные запросы → request-response, логи → fire-and-forget).
    • Инструкция по запуску.
    • Пример использования.
  3. Задокументировать решения

    • Почему для логов выбран fire-and-forget: гарантии доставки не критичны, производительность важнее.

Ожидаемый результат этапа Покрытие тестами > 80%, понятная документация.


5. Критерии приемки (Definition of Done)

  • Реализованы оба паттерна: request-response и fire-and-forget.
  • Request-response корректно ожидает ответ и обрабатывает таймаут.
  • Fire-and-forget не блокирует отправителя и не требует ответа.
  • Выбор паттерна происходит автоматически по типу задачи (синхронная / асинхронная).
  • Логи успешно записываются в файловую систему через fire-and-forget.
  • Проект запускается в Docker-окружении с одной командой docker-compose up.
  • Написаны минимум 3 теста (один на request-response, один на fire-and-forget, один на выбор).
  • README содержит диаграмму коммуникации и примеры запуска.
  • Код соответствует стандарту PEP 8, типы аннотированы.
  • Все тесты проходят (pytest) без ошибок.

6. Ожидаемый результат

Готовый проект agent_comm в виде папки со следующими артефактами:

  • Исходный код агентов (agent_a.py, agent_b.py).
  • Модули паттернов (request_response.py, fire_and_forget.py, selector.py).
  • Тесты (tests/test_comm.py).
  • docker-compose.yml для RabbitMQ.
  • requirements.txt.
  • README.md с описанием.

Содержание README (минимальное):

  • Краткое описание.
  • Архитектурная схема (ASCII или ссылка на diagram).
  • Инструкция по запуску: docker-compose up + python agent_a.py.
  • Пример вывода.
  • Таблица выбора паттерна.

Дополнительные результаты (опционально):

  • GUI или CLI для демонстрации.
  • Интеграция с реальным лог-агрегатором (например, Loki).
  • Graceful shutdown.

7. Возможные сложности и их решение

СложностьРешение
RabbitMQ не запускается в DockerПроверить порты (5672, 15672), использовать docker logs rabbitmq. Альтернатива: Redis Pub/Sub с redis-py
Таймаут в request-response при высокой нагрузкеУвеличить таймаут, добавить повторные попытки с exponential backoff
Потеря логов при отключении брокераДобавить локальный буфер (list) и повторную отправку при восстановлении соединения
Блокировка event loop из-за долгой обработки RPCРазделить агента на два процесса или использовать пул потоков для callback
Выбор паттерна в коде становится запутаннымВвести строгую типизацию через TaskType и один диспетчер

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Проектирование и окружение1 час
Этап 2: Request-response1,5 часа
Этап 3: Fire-and-forget1 час
Этап 4: Выбор паттерна1 час
Этап 5: Тестирование и документация1 час
Итого5,5 часов

Примечание для первого раза Если работаете с асинхронным кодом и RabbitMQ впервые, заложите дополнительно 1-2 часа на отладку подключения и понимание aio-pika.


9. Связанные вопросы из базы знаний

ВопросТема
15Что такое event-driven архитектура?
22Синхронная и асинхронная коммуникация в микросервисах
34Message broker: RabbitMQ vs Kafka vs Redis
41Паттерны RPC и очереди сообщений
58Fire-and-forget vs request-response: когда применять
73Обработка ошибок в асинхронных системах
89Асинхронное логирование в production
104Гарантии доставки (at most once, at least once, exactly once)
128Тестирование асинхронного кода на Python
155Structlog: продвинутое логирование для AI-агентов

10. Чек-лист самопроверки

  • Я запустил RabbitMQ в Docker и проверил консоль управления.
  • Я написал request-response и убедился, что ответ приходит с правильным correlation_id.
  • Я реализовал fire-and-forget и вижу, что лог-файл появляется, но основной поток не блокируется.
  • Я добавил выбор паттерна по типу задачи и протестировал оба сценария.
  • Я написал хотя бы один тест на каждый паттерн и запустил pytest — все зелёные.