English translation is not available yet. Showing Russian content.

Развернуть message bus (NATS/Kafka)

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Развернуть message bus (NATS/Kafka)

1. Цель задачи

Научиться заменять прямые HTTP/gRPC вызовы между агентами на асинхронное общение через message broker. В результате вы развернёте NATS (или Kafka) в Docker Compose, напишете producer и consumer для типового сообщения (например, запроса на обработку документа) и убедитесь, что сообщение гарантированно доставляется одному из потребителей.

Ключевой результат Работающий стенд из двух агентов (producer/consumer), обменивающихся сообщениями через брокер с подтверждением доставки (ACK). Сообщение не теряется при временном отключении consumer.


2. Исходные данные

Что нужноОткуда взять
Docker Engine 24+ (или Docker Desktop)docker.com
docker-compose (или Compose v2)встроен в Docker Desktop, или установить отдельно
NATS-сервер (образ nats:2.10)Docker Hub
Kafka (опционально: bitnami/kafka:3.6)Docker Hub
Python 3.10+python.org
Библиотека nats-py (или kafka-python)pip install nats-py / pip install kafka-python
Исходный код двух агентов (заготовки)Приложить к заданию (см. ниже)

Если нет реального инструмента — симулируем:
На момент выполнения задачи нет рабочего кластера с агентами. Поэтому создаём минимальный симулятор:

  1. Пишем два скрипта: agent_producer.py (отправляет случайные события каждые 2 секунды) и agent_consumer.py (принимает и логирует).
  2. Разворачиваем NATS (или Kafka) локально в Docker.
  3. Подключаем скрипты к брокеру и проверяем доставку, отключая consumer на 10 секунд и наблюдая, что сообщения накапливаются и доставляются после переподключения.

3. Технологический стек

КомпонентИнструментыНазначение
Message brokerNATS 2.10 / Kafka 3.6Промежуточная шина для сообщений между агентами
КонтейнеризацияDocker ComposeЛокальный запуск брокера, продюсера, консюмера
Клиентская библиотека (Python)nats-py / kafka-pythonВзаимодействие агентов с брокером
ObservabilityNATS CLI (nats) или kcatПроверка очередей, подписок, статистики
Локальный тестtimeout, pkill, docker logsЭмуляция сбоев и перезапусков

4. Этапы выполнения

Этап 1: Развёртывание message broker через Docker Compose (20 минут)

Действия

  1. Создайте директорию message-bus-lab/.
  2. Внутри создайте docker-compose.yml для NATS (альтернативный вариант — Kafka — выберите один, но опишите оба).
# docker-compose.yml (NATS)
version: '3.8'
services:
  nats:
    image: nats:2.10-alpine
    ports:
      - "4222:4222"
      - "8222:8222"
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222"

  producer:
    build: ./agents
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222
      - AGENT_ROLE=producer
    restart: "no"
    # запуск через docker compose run --rm

  consumer:
    build: ./agents
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222
      - AGENT_ROLE=consumer
    restart: "no"
  1. Создайте agents/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
  1. Создайте agents/requirements.txt:
nats-py>=2.8.0
  1. Поднимите стек: docker compose up -d nats. Проверьте, что порт 8222 (HTTP мониторинг) отвечает: curl http://localhost:8222 — должно вернуть JSON со статусом.

Ожидаемый результат этапа Работающий NATS в Docker Compose, доступный на localhost:4222, отдающий статус на порту 8222.


Этап 2: Написание агента-производителя (producer) (30 минут)

Действия

  1. Внутри agents/ создайте main.py с точкой входа.
  2. Реализуйте логику: producer подключается к NATS, каждые 2 секунды публикует сообщение JSON вида:
{
  "agent_id": "producer-1",
  "event": "processing_request",
  "payload": {
    "document_id": "doc_xxxx",
    "timestamp": "2025-04-08T10:00:00Z"
  }
}
  1. Используйте JetStream (поток) для гарантии доставки, а не просто core NATS.
import asyncio
import json
import os
import uuid
from datetime import datetime, timezone

import nats
from nats.js import JetStreamContext

NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222")
STREAM_NAME = "agent_events"
SUBJECT = "agent.request"

async def run():
    nc = await nats.connect(NATS_URL)
    js: JetStreamContext = nc.jetstream()
    # Создаём стрим (если нет)
    try:
        await js.add_stream(name=STREAM_NAME, subjects=[SUBJECT])
    except nats.js.errors.InvalidStreamNameError:
        pass  # уже существует

    while True:
        msg = {
            "agent_id": "producer-1",
            "event": "processing_request",
            "payload": {
                "document_id": f"doc_{uuid.uuid4().hex[:8]}",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        }
        ack = await js.publish(SUBJECT, json.dumps(msg).encode())
        print(f"Published, seq={ack.seq}")
        await asyncio.sleep(2)

if __name__ == "__main__":
    asyncio.run(run())
  1. Запустите продюсера: docker compose run --rm producer.

Ожидаемый результат этапа Продюсер каждые 2 секунды публикует сообщения в стрим agent_events, видно в логах последовательные seq.


Этап 3: Написание агента-потребителя (consumer) (30 минут)

Действия

  1. Добавьте в main.py вторую ветвь по переменной AGENT_ROLE=consumer.
  2. Consumer подписывается на durable pull-подписку (чтобы сообщения не терялись при отключении).
  3. При получении сообщения выводит его в лог и acknowledges (подтверждает обработку).
elif role == "consumer":
    # durable consumer
    sub = await js.pull_subscribe(SUBJECT, durable="agent-consumer-1")
    while True:
        try:
            msgs = await sub.fetch(batch=1, timeout=10)
            for msg in msgs:
                data = json.loads(msg.data.decode())
                print(f"Processed: {data['payload']['document_id']}")
                await msg.ack()
        except nats.errors.TimeoutError:
            pass
  1. Запустите два экземпляра consumer в разных окнах/контейнерах (с одинаковым durable name — тогда NATS будет балансировать нагрузку, но если один упадёт, второй заберёт его сообщения).
  2. Убедитесь, что каждое опубликованное сообщение обрабатывается ровно одним consumer (круговая балансировка).

Ожидаемый результат этапа Consumer получает и подтверждает все сообщения. В логах видна обработка без дублирования.


Этап 4: Проверка гарантии доставки (20 минут)

Действия

  1. Тест 1: Временное отсутствие потребителя

    • Запустите только producer (consumer не запущен). Через 10 секунд запустите consumer.
    • Наблюдайте, что consumer получает все пропущенные сообщения (накопленные в стриме).
  2. Тест 2: Падение consumer

    • Запустите consumer, через 5 секунд принудительно завершите его (Ctrl+C). Через 10 секунд перезапустите.
    • Убедитесь, что сообщения, опубликованные во время "даунтайма", не потеряны.
  3. Тест 3: Конкуренция consumer'ов

    • Запустите два consumer с разными durable именами (или одинаковыми — для балансировки).
    • Проверьте, что каждое сообщение обрабатывается ровно один раз (не дублируется и не теряется).

Для проверки используйте мониторинг NATS

# Зайдите в контейнер nats и выполните
docker compose exec nats nats stream list
docker compose exec nats nats stream info agent_events

Видно количество сообщений, подтверждённых и ожидающих.

Ожидаемый результат этапа Все сообщения доставлены, подтверждены, метрики стрима показывают отсутствие потерь.


Этап 5: Подготовка отчёта и рефакторинг (10 минут)

Действия

  1. Создайте простой скрипт test_delivery.py, который:

    • Запускает стрим/консьюмер через subprocess
    • Публикует 100 сообщений
    • Ждёт, пока все будут подтверждены
    • Сравнивает количество опубликованных и обработанных, падает если не равно.
  2. Документируйте архитектуру в README.md:

Ожидаемый результат этапа Законченный проект в message-bus-lab/, готовый к повторному развёртыванию.


5. Критерии приемки (Definition of Done)

  • Docker Compose запускает NATS (или Kafka) и два сервиса-агента.
  • Producer публикует сообщения с уникальным ID (document_id).
  • Consumer получает и acknowledges все сообщения.
  • При временном отключении consumer сообщения не теряются (накапливаются в стриме).
  • При запуске второго consumer сообщения балансируются между ними (round-robin).
  • Метрики (количество сообщений в стриме, ack rate) можно посмотреть через NATS CLI.
  • Реализован автоматический тест, который проверяет потерю < 0.1% (допускается одно дублирование при рестарте).
  • Код поддерживает переключение на Kafka (через env BROKER_TYPE=kafka — опционально, достаточно NATS по умолчанию).

6. Ожидаемый результат

Основной артефакт — директория message-bus-lab/ со следующим содержимым:

message-bus-lab/
├── docker-compose.yml     # Описание инфраструктуры
├── agents/
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── main.py            # Producer + Consumer (переключение по роли)
│   └── test_delivery.py   # Автоматический тест доставки
└── README.md              # Инструкция и схема

Содержание README

  • Краткое описание задачи
  • Как запустить (docker compose up -d nats, затем docker compose run --rm consumer и docker compose run --rm producer)
  • Результаты тестов
  • Вывод: какие гарантии даёт NATS JetStream и как их проверить.

Дополнительные результаты (опционально):

  • Лог-файл с подтверждениями последовательностей seq.
  • Grafana-дашборд для мониторинга (если развернуты метрики NATS через Prometheus).

7. Возможные сложности и их решение

СложностьРешение
NATS не стартует, порт занятСменить порт в docker-compose (например, 4223:4222)
Consumer не получает старые сообщенияПроверить durable имя: если указан durable="agent-consumer-1" и consumer не запущен, NATS сохраняет сообщения для этого durable-подписчика. При повторном запуске он получит backlog.
Сообщения дублируются при перезапуске consumerВключить дедупликацию на стороне producer (поле Nats-Msg-Id). JetStream гарантирует Exactly-Once для одного producer.
Kafka — сложнее конфигурация, топики, партицииИспользуйте NATS как основной вариант. Для Kafka потребуется дополнительный файл docker-compose-kafka.yml и настройка KAFKA_BOOTSTRAP_SERVERS.
Python-клиент зависает при долгом отсутствии NATSДобавить таймауты и повторные попытки: await nats.connect(NATS_URL, connect_timeout=5, reconnect_time_wait=2)

8. Бюджет времени (оценка)

ЭтапВремя (минут)
1. Развёртывание message broker20
2. Написание producer30
3. Написание consumer30
4. Проверка гарантии доставки20
5. Подготовка отчёта и тест10
Итого110 минут (≈ 2 часа)

Примечание для первого раза Если вы впервые работаете с NATS, добавьте 20-30 минут на изучение документации и отладку.


9. Связанные вопросы из базы знаний

ВопросТема
17Как реализовать обмен сообщениями между агентами? (основы)
32JetStream vs Core NATS — когда что использовать?
45Kafka consumer groups и rebalance
108Обеспечение exactly-once delivery в message bus
134Мониторинг брокера сообщений (NATS CLI, Prometheus)
171Паттерн Request-Reply через брокер
218Dead letter queue и обработка ошибок
289Сериализация сообщений (JSON, Protobuf, Avro)
345Сравнение NATS, RabbitMQ, Kafka для микросервисов
401Rate limiting и backpressure в асинхронных очередях

10. Чек-лист самопроверки

  • Я развернул Docker Compose с NATS (или Kafka) и запустил консюмера/продюсера.
  • Я проверил, что при отключении консюмера сообщения накапливаются и приходят после переподключения.
  • Я написал автоматический тест, который публикует 100 сообщений и ждёт подтверждения всех от консюмера.
  • Я посмотрел nats stream info agent_events и убедился, что messages = acknowledged.
  • Я протестировал ситуацию с двумя консюмерами — каждое сообщение обработано ровно один раз.
  • Я задокументировал архитектуру и команды запуска в README.