English translation is not available yet. Showing Russian content.
Развернуть message bus (NATS/Kafka)
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Развернуть message bus (NATS/Kafka)
1. Цель задачи
Научиться заменять прямые HTTP/gRPC вызовы между агентами на асинхронное общение через message broker. В результате вы развернёте NATS (или Kafka) в Docker Compose, напишете producer и consumer для типового сообщения (например, запроса на обработку документа) и убедитесь, что сообщение гарантированно доставляется одному из потребителей.
Ключевой результат Работающий стенд из двух агентов (producer/consumer), обменивающихся сообщениями через брокер с подтверждением доставки (ACK). Сообщение не теряется при временном отключении consumer.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Docker Engine 24+ (или Docker Desktop) | docker.com |
| docker-compose (или Compose v2) | встроен в Docker Desktop, или установить отдельно |
| NATS-сервер (образ nats:2.10) | Docker Hub |
| Kafka (опционально: bitnami/kafka:3.6) | Docker Hub |
| Python 3.10+ | python.org |
| Библиотека nats-py (или kafka-python) | pip install nats-py / pip install kafka-python |
| Исходный код двух агентов (заготовки) | Приложить к заданию (см. ниже) |
Если нет реального инструмента — симулируем:
На момент выполнения задачи нет рабочего кластера с агентами. Поэтому создаём минимальный симулятор:
- Пишем два скрипта:
agent_producer.py(отправляет случайные события каждые 2 секунды) иagent_consumer.py(принимает и логирует). - Разворачиваем NATS (или Kafka) локально в Docker.
- Подключаем скрипты к брокеру и проверяем доставку, отключая consumer на 10 секунд и наблюдая, что сообщения накапливаются и доставляются после переподключения.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Message broker | NATS 2.10 / Kafka 3.6 | Промежуточная шина для сообщений между агентами |
| Контейнеризация | Docker Compose | Локальный запуск брокера, продюсера, консюмера |
| Клиентская библиотека (Python) | nats-py / kafka-python | Взаимодействие агентов с брокером |
| Observability | NATS CLI (nats) или kcat | Проверка очередей, подписок, статистики |
| Локальный тест | timeout, pkill, docker logs | Эмуляция сбоев и перезапусков |
4. Этапы выполнения
Этап 1: Развёртывание message broker через Docker Compose (20 минут)
Действия
- Создайте директорию
message-bus-lab/. - Внутри создайте docker-compose.yml для NATS (альтернативный вариант — Kafka — выберите один, но опишите оба).
# docker-compose.yml (NATS)
version: '3.8'
services:
nats:
image: nats:2.10-alpine
ports:
- "4222:4222"
- "8222:8222"
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222"
producer:
build: ./agents
depends_on:
- nats
environment:
- NATS_URL=nats://nats:4222
- AGENT_ROLE=producer
restart: "no"
# запуск через docker compose run --rm
consumer:
build: ./agents
depends_on:
- nats
environment:
- NATS_URL=nats://nats:4222
- AGENT_ROLE=consumer
restart: "no"
- Создайте
agents/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
- Создайте
agents/requirements.txt:
nats-py>=2.8.0
- Поднимите стек: docker compose up -d nats. Проверьте, что порт 8222 (HTTP мониторинг) отвечает: curl http://localhost:8222 — должно вернуть JSON со статусом.
Ожидаемый результат этапа Работающий NATS в Docker Compose, доступный на localhost:4222, отдающий статус на порту 8222.
Этап 2: Написание агента-производителя (producer) (30 минут)
Действия
- Внутри
agents/создайтеmain.pyс точкой входа. - Реализуйте логику: producer подключается к NATS, каждые 2 секунды публикует сообщение JSON вида:
{
"agent_id": "producer-1",
"event": "processing_request",
"payload": {
"document_id": "doc_xxxx",
"timestamp": "2025-04-08T10:00:00Z"
}
}
import asyncio
import json
import os
import uuid
from datetime import datetime, timezone
import nats
from nats.js import JetStreamContext
NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222")
STREAM_NAME = "agent_events"
SUBJECT = "agent.request"
async def run():
nc = await nats.connect(NATS_URL)
js: JetStreamContext = nc.jetstream()
# Создаём стрим (если нет)
try:
await js.add_stream(name=STREAM_NAME, subjects=[SUBJECT])
except nats.js.errors.InvalidStreamNameError:
pass # уже существует
while True:
msg = {
"agent_id": "producer-1",
"event": "processing_request",
"payload": {
"document_id": f"doc_{uuid.uuid4().hex[:8]}",
"timestamp": datetime.now(timezone.utc).isoformat()
}
}
ack = await js.publish(SUBJECT, json.dumps(msg).encode())
print(f"Published, seq={ack.seq}")
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(run())
Ожидаемый результат этапа Продюсер каждые 2 секунды публикует сообщения в стрим agent_events, видно в логах последовательные seq.
Этап 3: Написание агента-потребителя (consumer) (30 минут)
Действия
- Добавьте в
main.pyвторую ветвь по переменной AGENT_ROLE=consumer. - Consumer подписывается на durable pull-подписку (чтобы сообщения не терялись при отключении).
- При получении сообщения выводит его в лог и acknowledges (подтверждает обработку).
elif role == "consumer":
# durable consumer
sub = await js.pull_subscribe(SUBJECT, durable="agent-consumer-1")
while True:
try:
msgs = await sub.fetch(batch=1, timeout=10)
for msg in msgs:
data = json.loads(msg.data.decode())
print(f"Processed: {data['payload']['document_id']}")
await msg.ack()
except nats.errors.TimeoutError:
pass
- Запустите два экземпляра consumer в разных окнах/контейнерах (с одинаковым durable name — тогда NATS будет балансировать нагрузку, но если один упадёт, второй заберёт его сообщения).
- Убедитесь, что каждое опубликованное сообщение обрабатывается ровно одним consumer (круговая балансировка).
Ожидаемый результат этапа Consumer получает и подтверждает все сообщения. В логах видна обработка без дублирования.
Этап 4: Проверка гарантии доставки (20 минут)
Действия
-
Тест 1: Временное отсутствие потребителя
-
Тест 2: Падение consumer
- Запустите consumer, через 5 секунд принудительно завершите его (
Ctrl+C). Через 10 секунд перезапустите. - Убедитесь, что сообщения, опубликованные во время "даунтайма", не потеряны.
- Запустите consumer, через 5 секунд принудительно завершите его (
-
Тест 3: Конкуренция consumer'ов
- Запустите два consumer с разными durable именами (или одинаковыми — для балансировки).
- Проверьте, что каждое сообщение обрабатывается ровно один раз (не дублируется и не теряется).
Для проверки используйте мониторинг NATS
# Зайдите в контейнер nats и выполните
docker compose exec nats nats stream list
docker compose exec nats nats stream info agent_events
Видно количество сообщений, подтверждённых и ожидающих.
Ожидаемый результат этапа Все сообщения доставлены, подтверждены, метрики стрима показывают отсутствие потерь.
Этап 5: Подготовка отчёта и рефакторинг (10 минут)
Действия
-
Создайте простой скрипт
test_delivery.py, который:- Запускает стрим/консьюмер через subprocess
- Публикует 100 сообщений
- Ждёт, пока все будут подтверждены
- Сравнивает количество опубликованных и обработанных, падает если не равно.
-
Документируйте архитектуру в README.md:
Ожидаемый результат этапа Законченный проект в message-bus-lab/, готовый к повторному развёртыванию.
5. Критерии приемки (Definition of Done)
- Docker Compose запускает NATS (или Kafka) и два сервиса-агента.
- Producer публикует сообщения с уникальным ID (document_id).
- Consumer получает и acknowledges все сообщения.
- При временном отключении consumer сообщения не теряются (накапливаются в стриме).
- При запуске второго consumer сообщения балансируются между ними (round-robin).
- Метрики (количество сообщений в стриме, ack rate) можно посмотреть через NATS CLI.
- Реализован автоматический тест, который проверяет потерю < 0.1% (допускается одно дублирование при рестарте).
- Код поддерживает переключение на Kafka (через env
BROKER_TYPE=kafka— опционально, достаточно NATS по умолчанию).
6. Ожидаемый результат
Основной артефакт — директория message-bus-lab/ со следующим содержимым:
message-bus-lab/
├── docker-compose.yml # Описание инфраструктуры
├── agents/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── main.py # Producer + Consumer (переключение по роли)
│ └── test_delivery.py # Автоматический тест доставки
└── README.md # Инструкция и схема
Содержание README
- Краткое описание задачи
- Как запустить (
docker compose up -d nats, затемdocker compose run --rm consumerиdocker compose run --rm producer) - Результаты тестов
- Вывод: какие гарантии даёт NATS JetStream и как их проверить.
Дополнительные результаты (опционально):
- Лог-файл с подтверждениями последовательностей seq.
- Grafana-дашборд для мониторинга (если развернуты метрики NATS через Prometheus).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| NATS не стартует, порт занят | Сменить порт в docker-compose (например, 4223:4222) |
| Consumer не получает старые сообщения | Проверить durable имя: если указан durable="agent-consumer-1" и consumer не запущен, NATS сохраняет сообщения для этого durable-подписчика. При повторном запуске он получит backlog. |
| Сообщения дублируются при перезапуске consumer | Включить дедупликацию на стороне producer (поле Nats-Msg-Id). JetStream гарантирует Exactly-Once для одного producer. |
| Kafka — сложнее конфигурация, топики, партиции | Используйте NATS как основной вариант. Для Kafka потребуется дополнительный файл docker-compose-kafka.yml и настройка KAFKA_BOOTSTRAP_SERVERS. |
| Python-клиент зависает при долгом отсутствии NATS | Добавить таймауты и повторные попытки: await nats.connect(NATS_URL, connect_timeout=5, reconnect_time_wait=2) |
8. Бюджет времени (оценка)
| Этап | Время (минут) |
|---|---|
| 1. Развёртывание message broker | 20 |
| 2. Написание producer | 30 |
| 3. Написание consumer | 30 |
| 4. Проверка гарантии доставки | 20 |
| 5. Подготовка отчёта и тест | 10 |
| Итого | 110 минут (≈ 2 часа) |
Примечание для первого раза Если вы впервые работаете с NATS, добавьте 20-30 минут на изучение документации и отладку.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 17 | Как реализовать обмен сообщениями между агентами? (основы) |
| 32 | JetStream vs Core NATS — когда что использовать? |
| 45 | Kafka consumer groups и rebalance |
| 108 | Обеспечение exactly-once delivery в message bus |
| 134 | Мониторинг брокера сообщений (NATS CLI, Prometheus) |
| 171 | Паттерн Request-Reply через брокер |
| 218 | Dead letter queue и обработка ошибок |
| 289 | Сериализация сообщений (JSON, Protobuf, Avro) |
| 345 | Сравнение NATS, RabbitMQ, Kafka для микросервисов |
| 401 | Rate limiting и backpressure в асинхронных очередях |
10. Чек-лист самопроверки
- Я развернул Docker Compose с NATS (или Kafka) и запустил консюмера/продюсера.
- Я проверил, что при отключении консюмера сообщения накапливаются и приходят после переподключения.
- Я написал автоматический тест, который публикует 100 сообщений и ждёт подтверждения всех от консюмера.
- Я посмотрел
nats stream info agent_eventsи убедился, чтоmessages=acknowledged. - Я протестировал ситуацию с двумя консюмерами — каждое сообщение обработано ровно один раз.
- Я задокументировал архитектуру и команды запуска в README.