Как проектировать request-response vs fire-and-forget для агентов?
теги: [разбор, агенты, архитектура] вопрос: 818
Краткий тезис
Выбор стиля коммуникации между агентами или между клиентом и агентом в Agentic RAG определяется балансом между синхронностью, надёжностью и задержкой. Request-response (синхронный запрос-ответ) применяется, когда нужен немедленный результат. Fire-and-forget (отправил и забыл) — для асинхронных уведомлений, логирования или запуска длительных задач. Промежуточные варианты — async response (с callback/webhook) и streaming (непрерывный поток) — позволяют сочетать достоинства обоих подходов. Проектирование включает выбор протокола (HTTP, gRPC, очереди сообщений), обработку таймаутов, повторные попытки и мониторинг очередей.
1. Термины и контекст
Agentic RAG — архитектура, где агенты (автономные модули с LLM) координируют выполнение подзадач: поиск документов, генерация ответов, вызов внешних инструментов. Коммуникация между агентами и с клиентом осуществляется через сообщения.
Стили коммуникации
- Request-response (синхронный) — отправитель посылает запрос и блокируется до получения ответа.
- Fire-and-forget (асинхронный без ответа) — отправитель отправляет сообщение и продолжает работу, не ожидая ответа.
- Async-response (асинхронный с ответом) — отправитель отправляет запрос и получает ответ позже через callback, webhook или опрос.
- Streaming — непрерывная двунаправленная передача данных (Server-Sent Events, WebSocket).
Термин «агент» — программный модуль, который на основе LLM принимает решения, вызывает инструменты и поддерживает контекст диалога.
2. Request-response (синхронный паттерн)
Как работает
Клиент (или другой агент) отправляет запрос и ждёт ответа. Обычно реализуется через HTTP REST, gRPC или внутрипроцессный вызов.
Пример (HTTP)
import httpx
response = httpx.post(
"http://agent-api/rag-query",
json={"query": "как оценить RAG?"},
timeout=10.0
)
result = response.json()
print(result)
Когда использовать
- Нужен немедленный результат (например, ответ пользователю в чате).
- Операция выполняется быстро (до нескольких секунд).
- Надёжность обеспечивается повторными попытками (retry) и таймаутами.
Проблемы
- Блокировка — отправитель занят ожиданием.
- Каскадные таймауты — если цепочка вызовов длинная, общая задержка суммируется.
- Сложность с отменой — если ответ не нужен, его трудно отозвать.
Проектирование request-response в агентной системе
- Устанавливаем таймаут (обычно 5-30 секунд).
- Добавляем retry policy (экспоненциальная задержка, jitter).
- Используем circuit breaker для защиты от перегрузки.
- В HTTP — статус-коды: 200 (успех), 4xx (ошибка клиента), 5xx (ошибка сервера).
Сравнение протоколов
| Протокол | Преимущества | Недостатки |
|---|---|---|
| HTTP REST | Простота, широкая поддержка | Большой overhead, отсутствие потоковой передачи |
| gRPC | Высокая скорость, streaming, codegen | Сложнее в настройке |
| RabbitMQ / Kafka (RPC) | Асинхронность, надёжность | Нужен брокер, latency выше |
3. Fire-and-forget (отправил и забыл)
Как работает
Отправитель публикует сообщение в очередь или топик и немедленно продолжает работу. Получатель обрабатывает сообщение асинхронно. Ответ не ожидается, но может быть отправлен отдельно (тогда это async-response).
Пример (через очередь)
import aioredis
redis = await aioredis.from_url("redis://localhost")
await redis.publish("agent-events", "task:index_document")
# Не ждём подтверждения обработки
Когда использовать
- Уведомления (логи, метрики, обновление кэша).
- Запуск длительных фоновых задач (индексация, суммаризация).
- Сценарии, где потеря сообщения допустима (если нет гарантии доставки).
Проблемы
- Нет обратной связи — отправитель не знает, выполнилась ли задача успешно.
- Отсутствие гарантий — если брокер падает, сообщение может потеряться.
- Сложность отладки — сложно трассировать цепочку вызовов.
Проектирование fire-and-forget
- Используем персистентные очереди (Kafka, RabbitMQ) с подтверждением записи — сообщение не потеряется, даже если потребитель временно недоступен.
- Для критичных сообщений добавляем DLQ (Dead Letter Queue) для ошибочных.
- Включаем мониторинг глубины очереди и скорости потребления.
Пример архитектуры
[Агент-отправитель] → (запись в очередь) → [Брокер (Kafka)] → [Агент-обработчик]
4. Async-response (fire-and-forget + callback)
Как работает
Отправитель отправляет запрос (как fire-and-forget), но ожидает ответ асинхронно через механизм обратного вызова:
- Callback URL — обработчик после завершения отправляет результат на указанный endpoint.
- Webhook — аналогично, но инициируется системой.
- Polling — отправитель периодически опрашивает статус задачи.
- Message queue reply — ответ публикуется в очередь, уникальный идентификатор связывает запрос и ответ.
Пример (callback)
# запрос с callback_url
requests.post(
"http://long-running-agent/process",
json={"task": "summarize_docs", "callback_url": "http://caller-webhook/reply"}
)
Когда использовать
- Длинные операции (больше нескольких секунд).
- Нельзя ждать ответа синхронно (например, агент должен обрабатывать множество запросов).
- Требуется гарантированная доставка ответа.
Плюсы/минусы
| Аспект | Плюсы | Минусы |
|---|---|---|
| Загрузка отправителя | Не блокируется | Нужно обслуживать callback endpoint |
| Надёжность | Можно реализовать exactly-once | Сложнее в отладке |
| Latency | Минимальная для отправителя | Общая задержка может быть больше |
5. Streaming (непрерывный поток)
Как работает
Данные передаются непрерывно в одном или обоих направлениях. В агентных системах часто используется для:
- Server-Sent Events (SSE) — сервер отправляет события клиенту.
- WebSocket — двунаправленный канал.
Пример SSE
GET /agent-stream?query=что такое RAG HTTP/1.1
Accept: text/event-stream
response:
data: {"type": "chunk", "text": "RAG это..."}
data: {"type": "final", "text": "RAG это Retrieval-Augmented Generation."}
Когда использовать
- Агент генерирует ответ по частям (streaming токенов от LLM).
- Промежуточные результаты (поиск, вызов инструментов) визуализируются.
- Двунаправленная коммуникация (WebSocket позволяет клиенту прервать генерацию).
Проектирование
- Используем
async generatorsв Python для чтения ответа. - Добавляем идентификаторы сообщений (message_id) для восстановления порядка.
- Устанавливаем таймаут на бездействие (idle timeout).
6. Как выбирать стиль коммуникации
Факторы выбора
- Latency (задержка): request-response медленнее, fire-and-forget быстрее.
- Надёжность (reliability): request-response проще сделать надёжным (retry, таймаут), fire-and-forget требует механизмов гарантий.
- Сложность реализации: request-response проще, streaming и async-response сложнее.
- Необходимость обратной связи: если нужен результат → request-response или async-response.
- Время выполнения операции: короткие (<1с) → request-response; длинные → async-response или fire-and-forget (если результат не нужен).
- Пропускная способность: fire-and-forget и streaming лучше для high throughput.
Таблица принятия решений
| Сценарий | Рекомендуемый стиль | Комментарий |
|---|---|---|
| Пользовательский чат-бот (быстрый ответ) | Request-response (или streaming) | Нужен мгновенный ответ |
| Индексация нового документа (долгая) | Fire-and-forget (с логированием) | Результат не нужен сразу |
| Запрос к внешнему API (несколько секунд) | Async-response (callback) | Не блокировать агента |
| Агент-планировщик делегирует задачу | Async-response (через очередь) | Для координации |
| Пошаговое объяснение от LLM | Streaming (SSE или WebSocket) | Показывать токены по мере генерации |
Реализация компромиссов
Пример: гибридный подход
from asyncio import create_task, get_event_loop
async def handle_request(query):
# 1. Быстрый поиск (request-response)
docs = await search_docs(query) # блокирует, но быстро
# 2. Запускаем долгую суммаризацию (fire-and-forget с результатом в очередь)
task_id = await dispatch_summarization(docs) # немедленно возвращает task_id
# 3. Потоковая отправка промежуточного ответа
for token in generate_intermediate(query, docs):
yield token
# 4. Позже получаем callback с полным ответом
final = await wait_for_result(task_id, timeout=60)
yield final
7. Реализация на Python с asyncio
Заготовка для request-response
import aiohttp
async def fetch_agent_response(session, url, payload, timeout=10):
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
if resp.status == 200:
return await resp.json()
raise Exception(f"Agent error: {resp.status}")
Заготовка для fire-and-forget через asyncio.create_task
import asyncio
async def send_and_forget(coroutine):
"""Запускает корутину в фоне без ожидания результата."""
asyncio.create_task(coroutine)
async def main():
# Пример: отправка лога
await send_and_forget(log_event("user_request", details))
Callback-обработчик (webhook)
from aiohttp import web
async def handle_callback(request):
data = await request.json()
task_id = data.get("task_id")
result = data.get("result")
# сохранить результат для последующего использования
return web.Response(status=200)
8. Обработка ошибок и таймауты
- Retry для request-response:
- Экспоненциальная задержка + jitter.
- Не для идемпотентных операций (GET) — безопасно.
- Для неидемпотентных — использовать idempotency keys.
- Circuit breaker:
- Отключать вызовы к агенту, если он отвечает ошибками, чтобы не забивать очередь.
- Мониторинг:
- latency перцентили (p50, p95, p99).
- количество таймаутов, ошибок 5xx.
- глубина очередей (для fire-and-forget).
Пример реализации с retry
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def reliable_request(url, payload):
async with aiohttp.ClientSession() as session:
return await fetch_agent_response(session, url, payload, timeout=5)
9. Интеграция в Agentic RAG
Типичная архитектура с несколькими агентами
- Agent Orchestrator — получает запрос пользователя.
- Retrieval Agent — ищет документы.
- Generator Agent — формирует ответ.
- Tool Agent — вызывает API.
Коммуникационные паттерны:
- Оркестратор использует request-response для вызова Retrieval Agent (быстро).
- Если Retrieval Agent должен индексировать большой документ, он инициирует fire-and-forget для Indexer Agent.
- Generator Agent может потоково отдавать токены через SSE пользователю.
- Для длительных операций (суммаризация) используется async-response с callback.
Пример конфигурации (JSON)
{
"agents": [
{
"name": "retriever",
"protocol": "request-response",
"timeout": 10,
"retry": 2
},
{
"name": "indexer",
"protocol": "fire-and-forget",
"queue": "indexing-tasks"
},
{
"name": "streamer",
"protocol": "streaming",
"type": "websocket"
}
]
}
10. Метрики и мониторинг
| Метрика | Описание | Для какого стиля |
|---|---|---|
| Latency (p50/p95) | Время выполнения | Request-response, streaming |
| Throughput (запросов/сек) | Количество обработанных сообщений | Все |
| Error rate | Доля неуспешных вызовов | Request-response |
| Queue depth | Количество сообщений в очереди | Fire-and-forget, async-response |
| Callback timeout rate | Доля ответов, не полученных вовремя | Async-response |
| Idle timeout | Количество разрывов streaming-соединений | Streaming |
Инструменты: Prometheus + Grafana, OpenTelemetry, дашборды для очередей (Kafka UI, RabbitMQ Management).
Пет-проект для закрепления
Задача: Создать мини-систему из двух агентов (Planner и Worker), которые общаются через три паттерна: request-response, fire-and-forget, async-response.
Инструменты:
- Python 3.11+, asyncio, aiohttp.
- Redis (как брокер pub/sub для fire-and-forget).
- (Опционально) Celery для async-response.
Шаги:
- Реализовать Planner Agent — принимает HTTP запросы, распределяет задачи.
- Реализовать Worker Agent — эмулирует выполнение задачи (с задержкой).
- Реализовать endpoint
/sync-task(request-response) — Planner вызывает Worker синхронно. - Реализовать
redis pub/subдля fire-and-forget — Planner публикует, Worker подписывается. - Реализовать async-response через callback: Worker отправляет результат на URL.
- Добавить таймауты и ретраи (используйте
tenacity). - Измерить latency для каждого стиля с помощью
time.perf_counter.
Ожидаемый результат: Готовый код, который демонстрирует выбор стиля в зависимости от сценария, и выводы о производительности.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 815 | Как проектировать архитектуру агентов в RAG? |
| 816 | Как управлять состоянием агентов? |
| 817 | Как агент планирует последовательность действий? |
| 819 | Как строить пайплайны из агентов? |
| 820 | Как обеспечить наблюдаемость агентной системы? |
| 812 | Какие метрики мониторинга latency важны в RAG? |
Навигация
- Предыдущий: 817
- Следующий: 819
- Индекс: 00. Индекс разборов