Настроить exactly-once delivery
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить exactly-once delivery
1. Цель задачи
Научиться гарантировать ровно однократную доставку (exactly-once) сообщений между агентами в распределённой системе. Реализовать идемпотентный consumer с механизмом дедупликации, устойчивый к повторным попыткам (retries) со стороны producer. Фиксить дубликаты при сбоях сети, перезапусках consumer и ребалансировке партиций.
Ключевой результат consumer не обрабатывает одно и то же сообщение дважды, даже если producer шлёт повторы; число дубликатов на выходе равно 0 при ретраях.
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Рабочая очередь сообщений (Kafka / RabbitMQ / NATS) | Docker Compose или установленный инстанс |
| Producer, генерирующий сообщения с уникальными ID | Написать самостоятельно (готовый скрипт) |
| Consumer, который может падать и перезапускаться | Заготовка из предыдущих практик (или написать) |
| Redis (или другое kv-хранилище) для хранения обработанных ID | Docker образ redis:7 или valkey |
| Инструмент для генерации нагрузки и симуляции сбоев | kafka-producer-perf-test, curl, bash-скрипты |
| Метрики количества обработанных сообщений | Prometheus + Grafana (опционально) или локальный счётчик |
Если нет реального инструмента — симулируем:
- Ручная эмуляция дубликатов запустите producer, который отправляет 10 сообщений с одинаковым
msg_id. Убедитесь, что consumer обрабатывает только первое. - Симуляция падений consumer напишите скрипт, который убивает процесс consumer (SIGKILL) после обработки каждого 5-го сообщения, а затем перезапускает его. Проверьте, что при повторном старте сообщения, уже обработанные до падения, не обрабатываются заново.
- Имитация ретраев producer в коде producer добавьте случайные повторные отправки одного и того же
msg_idс интервалом 200–500 мс. - В качестве kv-хранилища можно использовать SQLite с таблицей
processed_id, если Redis недоступен.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Брокер сообщений | Apache Kafka 3.7+ (или Redpanda) | Транспорт сообщений между агентами |
| Consumer / Producer | Python 3.11 + kafka-python / confluent-kafka | Реализация логики приёма и отправки |
| Хранилище ID | Redis 7 (или Valkey) / SQLite | Дедупликация: хранение уникальных msg_id |
| Мониторинг | Prometheus + Grafana (опционально) | Метрики (total_processed, duplicates) |
| Контейнеризация | Docker + docker-compose | Поднятие инфраструктуры (Kafka, Redis) |
| Тестирование | unittest / pytest, bash-скрипты | Проверка отсутствия дубликатов |
4. Этапы выполнения
Этап 1: Подготовка стенда и симуляция дубликатов (1 час 30 минут)
Действия
-
Разверните локальный стек с помощью docker-compose.yml:
- Kafka (1 брокер) и Redis (1 инстанс).
- Пример конфигурации (упрощённый) в файле docker-compose.yml:
version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.5.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:7.5.0 ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 redis: image: redis:7 ports: - "6379:6379" - Запустите: docker-compose up -d.
-
Напишите простой producer (producer.py), который отправляет 100 сообщений в топик
orders. Каждое сообщение содержит JSON с уникальнымmsg_id(например, UUID4) и payload. Важно добавить возможность повторной отправки одного и того жеmsg_id(задать вероятность 0.3 на сообщение). Код:import json, uuid, time, random from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode()) for i in range(100): msg_id = str(uuid.uuid4()) payload = {"order_id": i, "item": "book"} # Имитируем дубликат с вероятностью 30% if random.random() < 0.3: producer.send('orders', value={"msg_id": msg_id, "payload": payload}) time.sleep(0.05) producer.send('orders', value={"msg_id": msg_id, "payload": payload}) time.sleep(0.1) producer.flush() -
Напишите простой consumer без дедупликации (
consumer_raw.py), который просто выводитmsg_idи имитирует обработку (sleep 0.2). Запустите его и посчитайте количество обработанных уникальных ID — увидите дубликаты.
Ожидаемый результат этапа Работающий Kafka + Redis, скрипты producer с дубликатами, consumer показывает повторную обработку одних и тех же msg_id. Зафиксируйте количество уникальных сообщений (N) и количество вызовов consumer (M > N).
Этап 2: Реализация идемпотентного consumer с дедупликацией через Redis (2 часа)
Действия
-
Создайте
consumer_dedup.py:- При получении сообщения сначала проверяйте наличие
msg_idв Redis:import redis r = redis.Redis(host='localhost', port=6379, decode_responses=True) # Используем SETNX (атомарная установка, если ключа нет) if r.setnx(f"processed:{msg_id}", "1"): # не было ранее — обрабатываем process_message(message) # устанавливаем TTL (например, 1 час) для очистки r.expire(f"processed:{msg_id}", 3600) else: # дубликат — пропускаем logger.info(f"Duplicate detected, skipping {msg_id}") consumer.commit() # важно закоммитить смещение, чтобы не зависнуть - Используйте ручной коммит (
enable_auto_commit=False), чтобы фиксировать offset только после успешной обработки и записи в Redis.
- При получении сообщения сначала проверяйте наличие
-
Обеспечьте идемпотентность обработки:
- Сама функция
process_messageдолжна быть идемпотентной (например, запись в БД через INSERT ... ON CONFLICT DO NOTHING). В рамках задачи достаточно проверить, что повторное выполнение не меняет внешнее состояние.
- Сама функция
-
Добавьте обработку исключений:
-
Напишите тестовый сценарий:
- Запустите producer (с дубликатами). Запустите
consumer_dedup.py. Убедитесь, что количество уникальных обработанных сообщений равно числу уникальныхmsg_id(т.е. дубликаты отброшены). Выведите счётчики в лог.
- Запустите producer (с дубликатами). Запустите
Ожидаемый результат этапа Consumer отбрасывает дубликаты на основе Redis. Логи содержат строки Duplicate detected, skipping ... ровно столько раз, сколько было дубликатов.
Этап 3: Тестирование устойчивости к рестартам и ретраям (1 час 30 минут)
Действия
-
Симуляция падения consumer:
- С помощью kill -9 <pid> (или docker kill) внезапно прервите consumer после обработки 10–15 сообщений.
- Перезапустите consumer. Убедитесь, что он не переобрабатывает уже закоммиченные сообщения (offset сохранён в Kafka) и что сообщения, обработанные до убийства, не повторяются (Redis хранит их ID).
- Важно если offset не был закоммичен (из-за асинхронного коммита), consumer может повторно получить те же сообщения — Redis защитит.
-
Ретрансляция сообщений producer:
-
Проверка параллельной обработки:
Ожидаемый результат этапа При любых сценариях (падение, ретраи, ребалансировка) количество дубликатов на выходе = 0. Напишите bash-скрипт test_exactly_once.sh, который прогоняет все сценарии и выводит "PASS" / "FAIL".
Этап 4: Интеграция метрик и мониторинга (1 час)
Действия
-
Добавьте Prometheus метрики в consumer (
prometheus_client):total_messages_received— счётчик полученных сообщений.duplicates_detected— счётчик отброшенных дубликатов.processed_messages— счётчик уникальных обработанных сообщений.redis_errors— счётчик ошибок Redis.
-
Запустите Prometheus + Grafana (через
docker-compose): -
Проведите стресс-тест:
- Producer отправляет 10 000 сообщений с 50% дубликатов.
- Убедитесь, что метрики показывают правильные значения, а дашборд отражает 0 дубликатов на выходе.
Ожидаемый результат этапа Работающий дашборд Grafana, который позволяет в реальном времени видеть, что exactly-once доставка работает.
Этап 5: Документирование и оформление результатов (30 минут)
Действия
-
Напишите README.md, описывающий архитектуру:
-
Зафиксируйте метрики успеха:
-
Соберите все файлы в репозиторий:
exactly-once-delivery/ ├── docker-compose.yml ├── producer.py ├── consumer_dedup.py ├── consumer_raw.py ├── test_exactly_once.sh ├── README.md └── screenshots/
Ожидаемый результат этапа Чистый репозиторий с кодом, документацией и подтверждением работы.
5. Критерии приемки (Definition of Done)
- Consumer не обрабатывает сообщение дважды при повторной отправке producer (ретраи).
- После внезапного падения consumer (kill -9) и перезапуска — дубликатов нет.
- В параллельном режиме (несколько consumer в одной группе) — дубликатов нет.
- Redis хранит ID обработанных сообщений; при TTL ключа после окончания срока действия дубликаты не появляются (проверить на сообщениях, отправленных с интервалом > TTL).
- Метрики Prometheus доступны; дашборд Grafana показывает
duplicates_detected = 0при наличии дублирующих сообщений. - Тестовый скрипт (
test_exactly_once.sh) возвращает 0 и выводит "PASS" для всех сценариев. - Код протестирован на Python 3.11, все зависимости указаны в
requirements.txt. - Документация объясняет ограничения (например, потеря ключей Redis при сбое — возможна повторная обработка).
- Время выполнения одного сообщения с учётом дедупликации не превышает 500 мс при нагрузке 50 msg/s.
6. Ожидаемый результат
Основной артефакт папка с проектом exactly-once-delivery/, содержащая:
docker-compose.ymlдля Kafka + Redis + Prometheus + Grafana.producer.py— producer с имитацией дубликатов.consumer_dedup.py— идемпотентный consumer с Redis-дедупликацией.test_exactly_once.sh— автоматизированный тест.README.mdс архитектурой, инструкцией по запуску и результатами.requirements.txtс зависимостями (kafka-python,redis,prometheus_client).
Дополнительные результаты
- Дашборд Grafana с метриками (JSON export).
- Лог-файлы с демонстрацией работы (на усмотрение).
- Анализ trade-offs: почему выбран Redis, а не in-memory или база данных.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Redis может потерять данные при перезапуске (если не настроен AOF/RDB) | Используйте RDB (сохранение раз в 5 минут) или кластер. В тестах допустимо, но в production — Redis с persistence. |
| Коммит offset до записи в Redis (consumer упал между обработкой и записью) | Всегда записывайте в Redis перед коммитом. Если запись не удалась — не коммитить. |
| Два consumer могут одновременно получить одно и то же сообщение (при ребалансировке) | Используйте атомарную операцию SETNX в Redis (только один consumer установит ключ). |
| TTL ключа может привести к повторной обработке после удаления | Установите TTL больше максимального предполагаемого времени между retries (например, 24 часа). Для тестов — 10 минут. |
| Рост количества ключей в Redis | Используйте Redis с политикой вытеснения allkeys-lru или периодическую очистку по TTL. |
Producer отправляет дубликаты с разными msg_id (непреднамеренно) | На уровне приложения ввести бизнес-ключ (например, order_id), и дедуплицировать по нему. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка стенда и симуляция дубликатов | 1 ч 30 мин |
| Этап 2: Реализация идемпотентного consumer | 2 ч |
| Этап 3: Тестирование устойчивости к рестартам | 1 ч 30 мин |
| Этап 4: Интеграция метрик и мониторинга | 1 ч |
| Этап 5: Документирование | 30 мин |
| Итого | 6 ч 30 мин |
Примечание для первого раза Если вы впервые работаете с Kafka и Redis, заложите дополнительно 1–2 часа на отладку конфигурации Kafka и настройку Prometheus.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 42 | Идемпотентные операции в REST API |
| 57 | Обработка ошибок в распределённых системах (retries, backoff) |
| 88 | Гарантии доставки сообщений (at-most-once, at-least-once, exactly-once) |
| 103 | Kafka: ручное управление offset (manual commit) |
| 144 | Redis: атомарные операции SETNX и блокировки |
| 201 | Паттерн "Transactional Outbox" |
| 256 | Prometheus: кастомные метрики для приложений (Gauge, Counter) |
| 310 | Тестирование устойчивости consumer при ребалансировке партиций |
| 421 | CQRS и Event Sourcing: роль дедупликации |
| 519 | Dead Letter Queue и повторная обработка неудачных сообщений |
10. Чек-лист самопроверки
- Я настроил docker-compose и убедился, что Kafka и Redis доступны.
- Я написал producer с намеренными дубликатами и проверил, что без дедупликации количество обработанных сообщений больше уникальных.
- Я реализовал consumer с
setnxи ручным коммитом offset. - Я протестировал kill -9 consumer и перезапуск — дубликаты не появились.
- Я запустил два экземпляра consumer в одной группе — ни одно сообщение не обработано дважды.
- Я добавил Prometheus метрики и подключил Grafana.
- Я запустил
test_exactly_once.shи получил PASS по всем сценариям. - Я задокументировал архитектуру, ограничения и команды для повторения.
- Я зафиксировал скриншот дашборда с нулевым количеством дубликатов.