Как проектировать request-response vs fire-and-forget для агентов?

теги: [разбор, агенты, архитектура] вопрос: 818

Краткий тезис

Выбор стиля коммуникации между агентами или между клиентом и агентом в Agentic RAG определяется балансом между синхронностью, надёжностью и задержкой. Request-response (синхронный запрос-ответ) применяется, когда нужен немедленный результат. Fire-and-forget (отправил и забыл) — для асинхронных уведомлений, логирования или запуска длительных задач. Промежуточные варианты — async responsecallback/webhook) и streaming (непрерывный поток) — позволяют сочетать достоинства обоих подходов. Проектирование включает выбор протокола (HTTP, gRPC, очереди сообщений), обработку таймаутов, повторные попытки и мониторинг очередей.


1. Термины и контекст

Agentic RAG — архитектура, где агенты (автономные модули с LLM) координируют выполнение подзадач: поиск документов, генерация ответов, вызов внешних инструментов. Коммуникация между агентами и с клиентом осуществляется через сообщения.

Стили коммуникации

  • Request-response (синхронный) — отправитель посылает запрос и блокируется до получения ответа.
  • Fire-and-forget (асинхронный без ответа) — отправитель отправляет сообщение и продолжает работу, не ожидая ответа.
  • Async-response (асинхронный с ответом) — отправитель отправляет запрос и получает ответ позже через callback, webhook или опрос.
  • Streaming — непрерывная двунаправленная передача данных (Server-Sent Events, WebSocket).

Термин «агент» — программный модуль, который на основе LLM принимает решения, вызывает инструменты и поддерживает контекст диалога.


2. Request-response (синхронный паттерн)

Как работает

Клиент (или другой агент) отправляет запрос и ждёт ответа. Обычно реализуется через HTTP REST, gRPC или внутрипроцессный вызов.

Пример (HTTP)

import httpx

response = httpx.post(
    "http://agent-api/rag-query",
    json={"query": "как оценить RAG?"},
    timeout=10.0
)
result = response.json()
print(result)

Когда использовать

  • Нужен немедленный результат (например, ответ пользователю в чате).
  • Операция выполняется быстро (до нескольких секунд).
  • Надёжность обеспечивается повторными попытками (retry) и таймаутами.

Проблемы

  • Блокировка — отправитель занят ожиданием.
  • Каскадные таймауты — если цепочка вызовов длинная, общая задержка суммируется.
  • Сложность с отменой — если ответ не нужен, его трудно отозвать.

Проектирование request-response в агентной системе

  • Устанавливаем таймаут (обычно 5-30 секунд).
  • Добавляем retry policy (экспоненциальная задержка, jitter).
  • Используем circuit breaker для защиты от перегрузки.
  • В HTTP — статус-коды: 200 (успех), 4xx (ошибка клиента), 5xx (ошибка сервера).

Сравнение протоколов

ПротоколПреимуществаНедостатки
HTTP RESTПростота, широкая поддержкаБольшой overhead, отсутствие потоковой передачи
gRPCВысокая скорость, streaming, codegenСложнее в настройке
RabbitMQ / Kafka (RPC)Асинхронность, надёжностьНужен брокер, latency выше

3. Fire-and-forget (отправил и забыл)

Как работает

Отправитель публикует сообщение в очередь или топик и немедленно продолжает работу. Получатель обрабатывает сообщение асинхронно. Ответ не ожидается, но может быть отправлен отдельно (тогда это async-response).

Пример (через очередь)

import aioredis

redis = await aioredis.from_url("redis://localhost")
await redis.publish("agent-events", "task:index_document")
# Не ждём подтверждения обработки

Когда использовать

  • Уведомления (логи, метрики, обновление кэша).
  • Запуск длительных фоновых задач (индексация, суммаризация).
  • Сценарии, где потеря сообщения допустима (если нет гарантии доставки).

Проблемы

  • Нет обратной связи — отправитель не знает, выполнилась ли задача успешно.
  • Отсутствие гарантий — если брокер падает, сообщение может потеряться.
  • Сложность отладки — сложно трассировать цепочку вызовов.

Проектирование fire-and-forget

  • Используем персистентные очереди (Kafka, RabbitMQ) с подтверждением записи — сообщение не потеряется, даже если потребитель временно недоступен.
  • Для критичных сообщений добавляем DLQ (Dead Letter Queue) для ошибочных.
  • Включаем мониторинг глубины очереди и скорости потребления.

Пример архитектуры

[Агент-отправитель] → (запись в очередь) → [Брокер (Kafka)] → [Агент-обработчик]

4. Async-response (fire-and-forget + callback)

Как работает

Отправитель отправляет запрос (как fire-and-forget), но ожидает ответ асинхронно через механизм обратного вызова:

  • Callback URL — обработчик после завершения отправляет результат на указанный endpoint.
  • Webhook — аналогично, но инициируется системой.
  • Polling — отправитель периодически опрашивает статус задачи.
  • Message queue reply — ответ публикуется в очередь, уникальный идентификатор связывает запрос и ответ.

Пример (callback)

# запрос с callback_url
requests.post(
    "http://long-running-agent/process",
    json={"task": "summarize_docs", "callback_url": "http://caller-webhook/reply"}
)

Когда использовать

  • Длинные операции (больше нескольких секунд).
  • Нельзя ждать ответа синхронно (например, агент должен обрабатывать множество запросов).
  • Требуется гарантированная доставка ответа.

Плюсы/минусы

АспектПлюсыМинусы
Загрузка отправителяНе блокируетсяНужно обслуживать callback endpoint
НадёжностьМожно реализовать exactly-onceСложнее в отладке
LatencyМинимальная для отправителяОбщая задержка может быть больше

5. Streaming (непрерывный поток)

Как работает

Данные передаются непрерывно в одном или обоих направлениях. В агентных системах часто используется для:

  • Server-Sent Events (SSE) — сервер отправляет события клиенту.
  • WebSocket — двунаправленный канал.

Пример SSE

GET /agent-stream?query=что такое RAG HTTP/1.1
Accept: text/event-stream

response:
data: {"type": "chunk", "text": "RAG это..."}
data: {"type": "final", "text": "RAG это Retrieval-Augmented Generation."}

Когда использовать

  • Агент генерирует ответ по частям (streaming токенов от LLM).
  • Промежуточные результаты (поиск, вызов инструментов) визуализируются.
  • Двунаправленная коммуникация (WebSocket позволяет клиенту прервать генерацию).

Проектирование

  • Используем async generators в Python для чтения ответа.
  • Добавляем идентификаторы сообщений (message_id) для восстановления порядка.
  • Устанавливаем таймаут на бездействие (idle timeout).

6. Как выбирать стиль коммуникации

Факторы выбора

Таблица принятия решений

СценарийРекомендуемый стильКомментарий
Пользовательский чат-бот (быстрый ответ)Request-response (или streaming)Нужен мгновенный ответ
Индексация нового документа (долгая)Fire-and-forget (с логированием)Результат не нужен сразу
Запрос к внешнему API (несколько секунд)Async-response (callback)Не блокировать агента
Агент-планировщик делегирует задачуAsync-response (через очередь)Для координации
Пошаговое объяснение от LLMStreaming (SSE или WebSocket)Показывать токены по мере генерации

Реализация компромиссов

Пример: гибридный подход

from asyncio import create_task, get_event_loop

async def handle_request(query):
    # 1. Быстрый поиск (request-response)
    docs = await search_docs(query)  # блокирует, но быстро
    # 2. Запускаем долгую суммаризацию (fire-and-forget с результатом в очередь)
    task_id = await dispatch_summarization(docs)  # немедленно возвращает task_id
    # 3. Потоковая отправка промежуточного ответа
    for token in generate_intermediate(query, docs):
        yield token
    # 4. Позже получаем callback с полным ответом
    final = await wait_for_result(task_id, timeout=60)
    yield final

7. Реализация на Python с asyncio

Заготовка для request-response

import aiohttp

async def fetch_agent_response(session, url, payload, timeout=10):
    async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
        if resp.status == 200:
            return await resp.json()
        raise Exception(f"Agent error: {resp.status}")

Заготовка для fire-and-forget через asyncio.create_task

import asyncio

async def send_and_forget(coroutine):
    """Запускает корутину в фоне без ожидания результата."""
    asyncio.create_task(coroutine)

async def main():
    # Пример: отправка лога
    await send_and_forget(log_event("user_request", details))

Callback-обработчик (webhook)

from aiohttp import web

async def handle_callback(request):
    data = await request.json()
    task_id = data.get("task_id")
    result = data.get("result")
    # сохранить результат для последующего использования
    return web.Response(status=200)

8. Обработка ошибок и таймауты

  • Retry для request-response:
    • Экспоненциальная задержка + jitter.
    • Не для идемпотентных операций (GET) — безопасно.
    • Для неидемпотентных — использовать idempotency keys.
  • Circuit breaker:
    • Отключать вызовы к агенту, если он отвечает ошибками, чтобы не забивать очередь.
  • Мониторинг:
    • latency перцентили (p50, p95, p99).
    • количество таймаутов, ошибок 5xx.
    • глубина очередей (для fire-and-forget).

Пример реализации с retry

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def reliable_request(url, payload):
    async with aiohttp.ClientSession() as session:
        return await fetch_agent_response(session, url, payload, timeout=5)

9. Интеграция в Agentic RAG

Типичная архитектура с несколькими агентами

Коммуникационные паттерны:

  • Оркестратор использует request-response для вызова Retrieval Agent (быстро).
  • Если Retrieval Agent должен индексировать большой документ, он инициирует fire-and-forget для Indexer Agent.
  • Generator Agent может потоково отдавать токены через SSE пользователю.
  • Для длительных операций (суммаризация) используется async-response с callback.

Пример конфигурации (JSON)

{
  "agents": [
    {
      "name": "retriever",
      "protocol": "request-response",
      "timeout": 10,
      "retry": 2
    },
    {
      "name": "indexer",
      "protocol": "fire-and-forget",
      "queue": "indexing-tasks"
    },
    {
      "name": "streamer",
      "protocol": "streaming",
      "type": "websocket"
    }
  ]
}

10. Метрики и мониторинг

МетрикаОписаниеДля какого стиля
Latency (p50/p95)Время выполненияRequest-response, streaming
Throughput (запросов/сек)Количество обработанных сообщенийВсе
Error rateДоля неуспешных вызововRequest-response
Queue depthКоличество сообщений в очередиFire-and-forget, async-response
Callback timeout rateДоля ответов, не полученных вовремяAsync-response
Idle timeoutКоличество разрывов streaming-соединенийStreaming

Инструменты: Prometheus + Grafana, OpenTelemetry, дашборды для очередей (Kafka UI, RabbitMQ Management).


Пет-проект для закрепления

Задача: Создать мини-систему из двух агентов (Planner и Worker), которые общаются через три паттерна: request-response, fire-and-forget, async-response.

Инструменты:

Шаги:

  1. Реализовать Planner Agent — принимает HTTP запросы, распределяет задачи.
  2. Реализовать Worker Agent — эмулирует выполнение задачи (с задержкой).
  3. Реализовать endpoint /sync-task (request-response) — Planner вызывает Worker синхронно.
  4. Реализовать redis pub/sub для fire-and-forgetPlanner публикует, Worker подписывается.
  5. Реализовать async-response через callback: Worker отправляет результат на URL.
  6. Добавить таймауты и ретраи (используйте tenacity).
  7. Измерить latency для каждого стиля с помощью time.perf_counter.

Ожидаемый результат: Готовый код, который демонстрирует выбор стиля в зависимости от сценария, и выводы о производительности.


Связь с другими вопросами

ВопросТема
815Как проектировать архитектуру агентов в RAG?
816Как управлять состоянием агентов?
817Как агент планирует последовательность действий?
819Как строить пайплайны из агентов?
820Как обеспечить наблюдаемость агентной системы?
812Какие метрики мониторинга latency важны в RAG?


Навигация