English translation is not available yet. Showing Russian content.

Что такое «message bus» для агентов (Kafka, NATS, Redis PubSub)?

Краткий тезис

Message bus (шина сообщений) — это централизованная инфраструктура обмена данными между AI-агентами, которая заменяет прямые вызовы. В контексте RAG|Agentic RAG он позволяет агентам отправлять запросы, получать результаты, координировать действия асинхронно и масштабироваться. Основные реализации: Apache Kafka (надёжность, воспроизведение, высокая пропускная способность), NATS (минимальная задержка, лёгкость) и Redis PubSub (простота, in-memory). Выбор зависит от требований к долговечности, скорости и сложности системы.

1. Термин: Message Bus для агентов

Message bus — это шаблон интеграции, при котором все компоненты (агенты) общаются через общий канал, а не напрямую. Агенты публикуют сообщения в определенные топики (topics), а другие агенты подписываются на них. Такой подход даёт:

  • Слабое связывание — агенты не знают друг о друге, меняются независимо.
  • Масштабируемость — можно запускать несколько экземпляров одного агента, балансируя нагрузку через группы потребителей (consumer groups).
  • Асинхронность — отправитель не блокируется, может продолжать работу.
  • Отказоустойчивость — если один агент упал, сообщения сохраняются в очереди до его восстановления.

В контексте AI-агентов message bus используется для:

  • Координации цепочек вызовов (planningretrievalgeneration → verification).
  • Обмена промежуточными результатами (например, несколько агентов-исследователей одновременно ищут информацию).
  • Логирования и мониторинга действий агентов (каждое сообщение можно записать для ретроспективы).

2. Основные реализации Message Bus

2.1 Apache Kafka

Kafka — распределённая платформа потоковой передачи событий, спроектированная для высокой пропускной способности и долговременного хранения.

ХарактеристикаОписание
ХранениеСообщения хранятся на диске, можно повторно читать (реплей)
ГарантииAt-least-once, exactly-once (с настройками)
ПроизводительностьДесятки тысяч сообщений в секунду
ЗадержкаНесколько миллисекунд (выше, чем NATS, но всё ещё мала)
СценарийКогда нужен аудит, ретроспективный анализ, сложная маршрутизация

Пример использования в RAG|Agentic RAG
Все запросы пользователей, результаты retrieval и сгенерированные ответы пишутся в Kafka. Затем отдельный агент-анализатор может перечитывать поток, чтобы улучшать качество или выявлять аномалии.

Ключевые термины

  • Topic (топик) — именованный канал, куда пишутся сообщения одного типа.
  • Partition (партиция) — шард топика для параллелизма.
  • Consumer group (группа потребителей) — набор потребителей, распределяющих нагрузку по партициям.

Простая схема работы

Producer (агент A) → [Kafka Topic "answers"] → Consumer (агент B)

Группа из 3 экземпляров агента B будет читать из трёх партиций параллельно.

2.2 NATS

NATS — высокопроизводительная система обмена сообщениями с упором на низкую задержку и простоту.

ХарактеристикаОписание
ХранениеПо умолчанию in-memory (можно включить JetStream для персистентности)
ГарантииAt-most-once (базовый), с JetStream — at-least-once
ЗадержкаМикросекунды (один из самых быстрых брокеров)
ВесЛёгкий демон (несколько MB), просто развернуть
СценарийРеальное время, микросервисы, IoT

Пример:
Агент-планировщик отправляет задание на retrieval через NATS. Агент-retriever получает его, обрабатывает и через NATS же возвращает результаты. Вся коммуникация происходит за миллисекунды.

Ключевая особенность Поддержка wildcard-ов в топиках (*.events.agent), что упрощает маршрутизацию.

2.3 Redis PubSub

Redis PubSub — простейший механизм публикации/подписки, встроенный в Redis.

ХарактеристикаОписание
ХранениеНет — если подписчик неактивен, сообщение теряется
ГарантииAt-most-once (не гарантирует доставку)
ПроизводительностьВысокая для небольших объёмов
СложностьМинимальная, не нужен отдельный демон брокера (Redis уже есть)
СценарийПрототипы, маленькие системы, broadcast-уведомления

Ограничения

  • Сообщения не сохраняются — если агент отключился, он ничего не получит.
  • Нет групп потребителей — все подписчики получают все сообщения (конкуренция отсутствует).

Пример:
Агент-монитор подписан на канал logs:errors, получает уведомления об ошибках от всех других агентов в реальном времени.

3. Сравнительная таблица

КритерийKafkaNATSRedis PubSub
ПерсистентностьДа (диск)Опционально (JetStream)Нет
Воспроизведение (replay)Да, произвольноеОграниченное (JetStream)Нет
ЗадержкаНесколько мсМикросекундыМикросекунды (но меньше гарантий)
Пропускная способностьОчень высокаяВысокаяСредняя (зависит от Redis)
Гарантии доставкиAt-least-once / Exactly-onceAt-most-once / at-least-onceAt-most-once
Группы потребителейДаДа (через JetStream)Нет
Сложность администрированияВысокая (нужен ZooKeeper/KRaft)НизкаяОчень низкая
Типичные сценарии в Agentic RAGАудит, логирование, ретроспективаРеальное время, микросервисная оркестрацияПрототипы, лёгкие уведомления

4. Концепции, общие для всех брокеров

  • Topic (топик) — именованный канал. Например, agent.retrieval.request, agent.generation.response.
  • Producer (производитель) — агент, отправляющий сообщение.
  • Consumer (потребитель) — агент, получающий сообщение.
  • Consumer Group — пул потребителей, совместно обрабатывающих сообщения из топика. В Kafka сообщение доставляется одному потребителю из группы, что позволяет распределять нагрузку.
  • Offset (смещение) — позиция сообщения в партиции, используется для отслеживания прогресса чтения.
  • At-least-once / at-most-once / exactly-once — семантика доставки (количество раз, которое сообщение может быть обработано).

5. Как выбрать брокер для агентной системы?

Критические вопросы

  • Нужно ли хранить историю сообщений для последующего анализа? → да ⇒ Kafka.
  • Критична ли задержка? → да ⇒ NATS.
  • Вы делаете MVP? → Redis PubSub или NATS (проще).
  • Планируется сложная маршрутизация по темам? → NATS с wildcards.
  • Требуется точно-однократная обработка? → Kafka с идемпотентностью.

Архитектурный паттерн Часто используют гибрид — Kafka для долгосрочного хранения и ретроспективы, NATS или Redis для краткосрочной синхронной коммуникации.

6. Интеграция с Python

Пример отправки сообщения через Kafka с помощью confluent-kafka:

from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer.produce('agent.requests', value=b'{"agent":"retriever", "query":"...", "id":"123"}', callback=delivery_report)
producer.flush()

Пример подписки через NATS (nats-py):

import asyncio
from nats.aio.client import Client as NATS

async def handler(msg):
    subject = msg.subject
    data = msg.data.decode()
    print(f"Received on {subject}: {data}")

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    await nc.subscribe("agent.retrieval.response", cb=handler)
    await asyncio.Event().wait()

asyncio.run(main())

Redis PubSub с redis-py:

import redis

r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('agent:notifications')

for message in pubsub.listen():
    print(message)

7. Проблемы и best practices

  • Дублирование сообщений Используйте идемпотентные обработчики.
  • Out-of-order В Kafka порядок гарантируется внутри партиции — группируйте логически связанные сообщения в одну партицию по ключу.
  • Backpressure: Если агенты медленнее, чем продюсеры, используйте ограничения в consumer groups и настройки max.poll.interval.ms.
  • Мониторинг: Обязательно логируйте offset, задержки, количество сообщений в очереди (lag).

8. Message Bus в контексте Agentic RAG

Архитектура с message bus позволяет реализовать:

  • Параллельные агенты несколько агентов-исследователей одновременно ищут данные, результаты собираются в одном топике.
  • Цепочки вызовов с обратной связью агент A → топик → агент B → другой топик → агент C.
  • Human-in-the-loop агент приостанавливает выполнение, публикует запрос в топик human.approval, человек читает и отвечает.
  • Масштабирование при росте нагрузки достаточно добавить копии агентов (consumers) в группу.

Без message bus агенты общаются напрямую (HTTP/gRPC), что создаёт жёсткое связывание, усложняет повторную обработку и отладку.

9. Пет-проект для закрепления

Задача Разработать простую систему из трёх AI-агентов (Planner, Retriever, Generator), связанных через NATS или Kafka.

Инструменты

  • Python + nats-py (или confluent-kafka для Kafka).
  • LLM (любой OpenAI‑совместимый API, можно эмулировать).
  • Векторная БД (FAISS или Chroma на локальном документе).
  • Docker (поднять NATS/Kafka одной командой).

Шаги:

  1. Разверните NATS (docker run -p 4222:4222 nats).
  2. Реализуйте агента Planner, который получает запрос пользователя, публикует в топик planner.plan запрос на поиск.
  3. Агент Retriever подписан на planner.plan, выполняет поиск в векторной БД, публикует результаты в retriever.results.
  4. Агент Generator подписан на retriever.results, генерирует ответ через LLM, публикует в generator.answer.
  5. Главный скрипт подписывается на generator.answer, выводит итоговый ответ.

Ожидаемый результат

  • Полностью асинхронная система, где каждый агент можно перезапустить независимо.
  • Понимание принципов топиков, групп потребителей (можно запустить 2 экземпляра Retriever и убедиться, что они делят нагрузку).
  • Замеры задержек (NATS даст <1 мс, Kafka немного больше).

10. Связь с другими вопросами

ВопросТема
801Основы архитектуры Agentic RAG (как агенты взаимодействуют)
802Когнитивные архитектуры для агентов (планирование, цепи)
803Инструменты (tools) и их публикация через bus
805Оркестрация против хореографии (bus реализует хореографию)
812Агентские протоколы (A2A, MCP) и их связь с bus
820Мониторинг и логирование в Agentic RAG (как использовать Kafka для аудита)

11. Навигация


Навигация