Реализация паттернов request-response и fire-and-forget для меж-агентской коммуникации
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализация паттернов request-response и fire-and-forget для меж-агентской коммуникации
1. Цель задачи
Освоить два фундаментальных паттерна асинхронного обмена сообщениями между ИИ-агентами: синхронный request-response (ожидание ответа) и асинхронный fire-and-forget (отправил и забыл). Научиться обоснованно выбирать паттерн в зависимости от типа задачи (критичный запрос vs фоновое логирование). Закрепить навык на практике, реализовав fire-and-forget для отправки логов в центральный сервис.
Ключевой результат Работающий прототип системы из двух агентов, где один отправляет другому как синхронные запросы, так и асинхронные уведомления (логи) через единый брокер сообщений, с выбором паттерна в коде.
2. Исходные данные
Перед началом необходимо подготовить:
| Что нужно | Откуда взять |
|---|---|
| Рабочее окружение Python 3.10+ | Локальная установка или Docker |
| Брокер сообщений (RabbitMQ или Redis Pub/Sub) | Docker-образ (rabbitmq:3-management или redis:7) |
| Базовая структура двух агентов (классы или модули) | Создать в процессе выполнения |
| Примеры задач для выбора паттерна: «получить прогноз погоды» (request-response) и «залогировать событие» (fire-and-forget) | Придумать самостоятельно, можно взять из демо |
Если нет реального брокера — симулируем:
- Устанавливаем Docker Desktop (или Colima на macOS).
- Запускаем контейнер RabbitMQ: docker run -d --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- Проверяем доступность через curl -u guest:guest http://localhost:15672/api/overview
- Альтернатива — использовать Redis с asyncio и redis-py без докера, если Redis установлен локально.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.10+ | Реализация агентов и коммуникации |
| Асинхронное выполнение | asyncio, aio-pika (для RabbitMQ) | Неблокирующий обмен сообщениями |
| Брокер сообщений | RabbitMQ (или Redis) | Маршрутизация сообщений между агентами |
| Логирование | structlog + файл/stdout | Демонстрация fire-and-forget логов |
| Тестирование | pytest + pytest-asyncio | Проверка корректности паттернов |
| Документация | Markdown, README | Описание архитектуры и выбора паттерна |
4. Этапы выполнения
Этап 1: Проектирование архитектуры и подготовка окружения (1 час)
Действия
-
Нарисовать схему коммуникации
- Агент A (клиент) → Брокер → Агент B (сервер).
- Для request-response: Агент A посылает запрос в очередь rpc_queue, Агент B обрабатывает и отправляет ответ в динамическую очередь reply_to.
- Для fire-and-forget: Агент A отправляет сообщение в очередь logs, Агент B (логер) просто потребляет и не возвращает ответ.
-
Создать структуру проекта
agent_comm/ ├── agents/ │ ├── agent_a.py # клиент │ └── agent_b.py # сервер/логер ├── comm/ │ ├── __init__.py │ ├── broker.py # подключение к RabbitMQ │ ├── request_response.py │ └── fire_and_forget.py ├── logs/ # директория для логов ├── tests/ │ └── test_comm.py ├── docker-compose.yml # для RabbitMQ ├── requirements.txt └── README.md -
Установить зависимости
pip install aio-pika structlog pytest pytest-asyncio -
Запустить брокер (если не запущен):
docker-compose up -d rabbitmq
Ожидаемый результат этапа Готовая структура проекта, работающий RabbitMQ, установленные зависимости.
Этап 2: Реализация паттерна request-response (1,5 часа)
Действия
-
Написать модуль
request_response.py- Функция send_request(loop, queue_name, message):
- Создаёт временную очередь reply_to.
- Публикует сообщение с correlation_id и reply_to.
- Ожидает ответ по reply_to (таймаут 5 секунд).
- Функция start_rpc_server(queue_name, callback):
- Потребляет из
queue_name. - Для каждого сообщения вызывает callback и отправляет результат в reply_to.
- Потребляет из
- Функция send_request(loop, queue_name, message):
-
Реализовать агента B (
agent_b.py) с RPC-обработчиком:- Пример:
handle_weather_request(city)→ возвращает симулированный прогноз.
- Пример:
-
Реализовать агента A (
agent_a.py) с вызовом:- Запрос прогноза через
send_request. - Вывод ответа.
- Запрос прогноза через
-
Протестировать end-to-end
# agents/agent_a.py (фрагмент) async def main(): response = await send_request('weather_queue', 'London') print(f'Weather: {response}')
Ожидаемый результат этапа Работающий обмен по схеме request-response: клиент получает ответ от сервера.
Этап 3: Реализация паттерна fire-and-forget (1 час)
Действия
-
Написать модуль
fire_and_forget.py- Функция
publish_event(queue_name, message):- Публикует сообщение в указанную очередь.
- Не ожидает ответа, не ждёт подтверждения.
- Функция
start_log_consumer(queue_name):- Потребляет сообщения и записывает их в файловый лог (structlog) в директорию
logs/.
- Потребляет сообщения и записывает их в файловый лог (structlog) в директорию
- Функция
-
Модифицировать агента B (
agent_b.py):- Добавить возможность работы в режиме логера (потребление из
logs_queue). - В логер сохранять timestamp и тело сообщения.
- Добавить возможность работы в режиме логера (потребление из
-
Расширить агента A (
agent_a.py):- После RPC-запроса отправлять fire-and-forget лог-событие «Выполнен RPC запрос к weather_queue».
-
Проверить, что лог пишется без блокировки основного потока:
- Запустить агента A, убедиться, что вывод RPC появляется мгновенно, а лог пишется асинхронно.
Ожидаемый результат этапа Логи отправляются fire-and-forget и записываются в файл без ожидания подтверждения.
Этап 4: Интеграция выбора паттерна по типу задачи (1 час)
Действия
-
Определить тип задач создать словарь или enum
TaskType:class TaskType(Enum): SYNCHRONOUS = 'request_response' ASYNCHRONOUS = 'fire_and_forget' -
Написать диспетчер
comm/selector.py- Функция
send_message(task_type, queue, payload):- Если
SYNCHRONOUS→ вызываетsend_request. - Если
ASYNCHRONOUS→ вызываетpublish_event.
- Если
- Функция
-
Добавить в агента A метод
process_task(task_type, ... ):- Пример: если задача «получить данные» — request-response; если «залогировать» — fire-and-forget.
-
Покрыть оба сценария в
__main__await send_message(TaskType.SYNCHRONOUS, 'weather', 'Moscow') await send_message(TaskType.ASYNCHRONOUS, 'logs', {'event': 'weather_request'})
Ожидаемый результат этапа Единая точка входа, которая по типу задачи выбирает корректный паттерн.
Этап 5: Тестирование и документирование (1 час)
Действия
-
Написать тесты в
tests/test_comm.py- Test request-response: отправляем запрос, проверяем что ответ пришёл.
- Test fire-and-forget: отправляем лог, проверяем что он появился в файле (с задержкой).
- Test выбор паттерна: мокаем брокер, проверяем вызов нужной функции.
- Test обработка таймаута в request-response.
- Test параллельный запуск RPC + логов.
-
Написать
README.md- Описание архитектуры.
- Таблица выбора паттерна (синхронные запросы → request-response, логи → fire-and-forget).
- Инструкция по запуску.
- Пример использования.
-
Задокументировать решения
- Почему для логов выбран fire-and-forget: гарантии доставки не критичны, производительность важнее.
Ожидаемый результат этапа Покрытие тестами > 80%, понятная документация.
5. Критерии приемки (Definition of Done)
- Реализованы оба паттерна: request-response и fire-and-forget.
- Request-response корректно ожидает ответ и обрабатывает таймаут.
- Fire-and-forget не блокирует отправителя и не требует ответа.
- Выбор паттерна происходит автоматически по типу задачи (синхронная / асинхронная).
- Логи успешно записываются в файловую систему через fire-and-forget.
- Проект запускается в Docker-окружении с одной командой
docker-compose up. - Написаны минимум 3 теста (один на request-response, один на fire-and-forget, один на выбор).
- README содержит диаграмму коммуникации и примеры запуска.
- Код соответствует стандарту PEP 8, типы аннотированы.
- Все тесты проходят (pytest) без ошибок.
6. Ожидаемый результат
Готовый проект agent_comm в виде папки со следующими артефактами:
- Исходный код агентов (
agent_a.py,agent_b.py). - Модули паттернов (
request_response.py,fire_and_forget.py,selector.py). - Тесты (
tests/test_comm.py). docker-compose.ymlдля RabbitMQ.requirements.txt.README.mdс описанием.
Содержание README (минимальное):
- Краткое описание.
- Архитектурная схема (ASCII или ссылка на diagram).
- Инструкция по запуску:
docker-compose up+python agent_a.py. - Пример вывода.
- Таблица выбора паттерна.
Дополнительные результаты (опционально):
- GUI или CLI для демонстрации.
- Интеграция с реальным лог-агрегатором (например, Loki).
- Graceful shutdown.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| RabbitMQ не запускается в Docker | Проверить порты (5672, 15672), использовать docker logs rabbitmq. Альтернатива: Redis Pub/Sub с redis-py |
| Таймаут в request-response при высокой нагрузке | Увеличить таймаут, добавить повторные попытки с exponential backoff |
| Потеря логов при отключении брокера | Добавить локальный буфер (list) и повторную отправку при восстановлении соединения |
| Блокировка event loop из-за долгой обработки RPC | Разделить агента на два процесса или использовать пул потоков для callback |
| Выбор паттерна в коде становится запутанным | Ввести строгую типизацию через TaskType и один диспетчер |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Проектирование и окружение | 1 час |
| Этап 2: Request-response | 1,5 часа |
| Этап 3: Fire-and-forget | 1 час |
| Этап 4: Выбор паттерна | 1 час |
| Этап 5: Тестирование и документация | 1 час |
| Итого | 5,5 часов |
Примечание для первого раза Если работаете с асинхронным кодом и RabbitMQ впервые, заложите дополнительно 1-2 часа на отладку подключения и понимание aio-pika.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 15 | Что такое event-driven архитектура? |
| 22 | Синхронная и асинхронная коммуникация в микросервисах |
| 34 | Message broker: RabbitMQ vs Kafka vs Redis |
| 41 | Паттерны RPC и очереди сообщений |
| 58 | Fire-and-forget vs request-response: когда применять |
| 73 | Обработка ошибок в асинхронных системах |
| 89 | Асинхронное логирование в production |
| 104 | Гарантии доставки (at most once, at least once, exactly once) |
| 128 | Тестирование асинхронного кода на Python |
| 155 | Structlog: продвинутое логирование для AI-агентов |
10. Чек-лист самопроверки
- Я запустил RabbitMQ в Docker и проверил консоль управления.
- Я написал request-response и убедился, что ответ приходит с правильным correlation_id.
- Я реализовал fire-and-forget и вижу, что лог-файл появляется, но основной поток не блокируется.
- Я добавил выбор паттерна по типу задачи и протестировал оба сценария.
- Я написал хотя бы один тест на каждый паттерн и запустил
pytest— все зелёные.