Настроить exactly-once delivery

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить exactly-once delivery

1. Цель задачи

Научиться гарантировать ровно однократную доставку (exactly-once) сообщений между агентами в распределённой системе. Реализовать идемпотентный consumer с механизмом дедупликации, устойчивый к повторным попыткам (retries) со стороны producer. Фиксить дубликаты при сбоях сети, перезапусках consumer и ребалансировке партиций.

Ключевой результат consumer не обрабатывает одно и то же сообщение дважды, даже если producer шлёт повторы; число дубликатов на выходе равно 0 при ретраях.


2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Рабочая очередь сообщений (Kafka / RabbitMQ / NATS)Docker Compose или установленный инстанс
Producer, генерирующий сообщения с уникальными IDНаписать самостоятельно (готовый скрипт)
Consumer, который может падать и перезапускатьсяЗаготовка из предыдущих практик (или написать)
Redis (или другое kv-хранилище) для хранения обработанных IDDocker образ redis:7 или valkey
Инструмент для генерации нагрузки и симуляции сбоевkafka-producer-perf-test, curl, bash-скрипты
Метрики количества обработанных сообщенийPrometheus + Grafana (опционально) или локальный счётчик

Если нет реального инструмента — симулируем:

  1. Ручная эмуляция дубликатов запустите producer, который отправляет 10 сообщений с одинаковым msg_id. Убедитесь, что consumer обрабатывает только первое.
  2. Симуляция падений consumer напишите скрипт, который убивает процесс consumer (SIGKILL) после обработки каждого 5-го сообщения, а затем перезапускает его. Проверьте, что при повторном старте сообщения, уже обработанные до падения, не обрабатываются заново.
  3. Имитация ретраев producer в коде producer добавьте случайные повторные отправки одного и того же msg_id с интервалом 200–500 мс.
  4. В качестве kv-хранилища можно использовать SQLite с таблицей processed_id, если Redis недоступен.

3. Технологический стек

КомпонентИнструментыНазначение
Брокер сообщенийApache Kafka 3.7+ (или Redpanda)Транспорт сообщений между агентами
Consumer / ProducerPython 3.11 + kafka-python / confluent-kafkaРеализация логики приёма и отправки
Хранилище IDRedis 7 (или Valkey) / SQLiteДедупликация: хранение уникальных msg_id
МониторингPrometheus + Grafana (опционально)Метрики (total_processed, duplicates)
КонтейнеризацияDocker + docker-composeПоднятие инфраструктуры (Kafka, Redis)
Тестированиеunittest / pytest, bash-скриптыПроверка отсутствия дубликатов

4. Этапы выполнения

Этап 1: Подготовка стенда и симуляция дубликатов (1 час 30 минут)

Действия

  1. Разверните локальный стек с помощью docker-compose.yml:

    • Kafka (1 брокер) и Redis (1 инстанс).
    • Пример конфигурации (упрощённый) в файле docker-compose.yml:
      version: '3.8'
      services:
        zookeeper:
          image: confluentinc/cp-zookeeper:7.5.0
          environment:
            ZOOKEEPER_CLIENT_PORT: 2181
        kafka:
          image: confluentinc/cp-kafka:7.5.0
          ports:
            - "9092:9092"
          environment:
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        redis:
          image: redis:7
          ports:
            - "6379:6379"
      
    • Запустите: docker-compose up -d.
  2. Напишите простой producer (producer.py), который отправляет 100 сообщений в топик orders. Каждое сообщение содержит JSON с уникальным msg_id (например, UUID4) и payload. Важно добавить возможность повторной отправки одного и того же msg_id (задать вероятность 0.3 на сообщение). Код:

    import json, uuid, time, random
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode())
    for i in range(100):
        msg_id = str(uuid.uuid4())
        payload = {"order_id": i, "item": "book"}
        # Имитируем дубликат с вероятностью 30%
        if random.random() < 0.3:
            producer.send('orders', value={"msg_id": msg_id, "payload": payload})
            time.sleep(0.05)
        producer.send('orders', value={"msg_id": msg_id, "payload": payload})
        time.sleep(0.1)
    producer.flush()
    
  3. Напишите простой consumer без дедупликации (consumer_raw.py), который просто выводит msg_id и имитирует обработку (sleep 0.2). Запустите его и посчитайте количество обработанных уникальных ID — увидите дубликаты.

Ожидаемый результат этапа Работающий Kafka + Redis, скрипты producer с дубликатами, consumer показывает повторную обработку одних и тех же msg_id. Зафиксируйте количество уникальных сообщений (N) и количество вызовов consumer (M > N).

Этап 2: Реализация идемпотентного consumer с дедупликацией через Redis (2 часа)

Действия

  1. Создайте consumer_dedup.py:

    • При получении сообщения сначала проверяйте наличие msg_id в Redis:
      import redis
      r = redis.Redis(host='localhost', port=6379, decode_responses=True)
      # Используем SETNX (атомарная установка, если ключа нет)
      if r.setnx(f"processed:{msg_id}", "1"):
          # не было ранее — обрабатываем
          process_message(message)
          # устанавливаем TTL (например, 1 час) для очистки
          r.expire(f"processed:{msg_id}", 3600)
      else:
          # дубликат — пропускаем
          logger.info(f"Duplicate detected, skipping {msg_id}")
          consumer.commit()  # важно закоммитить смещение, чтобы не зависнуть
      
    • Используйте ручной коммит (enable_auto_commit=False), чтобы фиксировать offset только после успешной обработки и записи в Redis.
  2. Обеспечьте идемпотентность обработки:

    • Сама функция process_message должна быть идемпотентной (например, запись в БД через INSERT ... ON CONFLICT DO NOTHING). В рамках задачи достаточно проверить, что повторное выполнение не меняет внешнее состояние.
  3. Добавьте обработку исключений:

    • Если Redis недоступен, consumer должен упасть или повторить попытку (не коммитить offset), чтобы избежать потери сообщений.
    • Реализуйте повторные попытки с экспоненциальной задержкой (max 3 раза), затем логировать ошибку.
  4. Напишите тестовый сценарий:

    • Запустите producer (с дубликатами). Запустите consumer_dedup.py. Убедитесь, что количество уникальных обработанных сообщений равно числу уникальных msg_id (т.е. дубликаты отброшены). Выведите счётчики в лог.

Ожидаемый результат этапа Consumer отбрасывает дубликаты на основе Redis. Логи содержат строки Duplicate detected, skipping ... ровно столько раз, сколько было дубликатов.

Этап 3: Тестирование устойчивости к рестартам и ретраям (1 час 30 минут)

Действия

  1. Симуляция падения consumer:

    • С помощью kill -9 <pid> (или docker kill) внезапно прервите consumer после обработки 10–15 сообщений.
    • Перезапустите consumer. Убедитесь, что он не переобрабатывает уже закоммиченные сообщения (offset сохранён в Kafka) и что сообщения, обработанные до убийства, не повторяются (Redis хранит их ID).
    • Важно если offset не был закоммичен (из-за асинхронного коммита), consumer может повторно получить те же сообщения — Redis защитит.
  2. Ретрансляция сообщений producer:

    • Измените producer, чтобы он отправлял одно и то же сообщение 3 раза подряд с интервалом 1 секунда.
    • Проверьте, что consumer обрабатывает только первое вхождение.
  3. Проверка параллельной обработки:

    • Запустите 2 экземпляра consumer в одной группе (group.id="order-group"). Убедитесь, что каждый consumer корректно проверяет Redis (общий Redis) — ни одно сообщение не обработано дважды даже при перераспределении партиций.

Ожидаемый результат этапа При любых сценариях (падение, ретраи, ребалансировка) количество дубликатов на выходе = 0. Напишите bash-скрипт test_exactly_once.sh, который прогоняет все сценарии и выводит "PASS" / "FAIL".

Этап 4: Интеграция метрик и мониторинга (1 час)

Действия

  1. Добавьте Prometheus метрики в consumer (prometheus_client):

    • total_messages_received — счётчик полученных сообщений.
    • duplicates_detected — счётчик отброшенных дубликатов.
    • processed_messages — счётчик уникальных обработанных сообщений.
    • redis_errors — счётчик ошибок Redis.
  2. Запустите Prometheus + Grafana (через docker-compose):

    • Настройте сбор метрик с consumer (адрес: localhost:8000).
    • Создайте дашборд с тремя панелями:
      • Скорость обработки (processed_messages / sec).
      • Доля дубликатов (duplicates_detected / total_messages_received).
      • Ошибки Redis.
  3. Проведите стресс-тест:

    • Producer отправляет 10 000 сообщений с 50% дубликатов.
    • Убедитесь, что метрики показывают правильные значения, а дашборд отражает 0 дубликатов на выходе.

Ожидаемый результат этапа Работающий дашборд Grafana, который позволяет в реальном времени видеть, что exactly-once доставка работает.

Этап 5: Документирование и оформление результатов (30 минут)

Действия

  1. Напишите README.md, описывающий архитектуру:

    • Как работает идемпотентный consumer (Redis + manual commit).
    • Как тестировать (запуск скриптов).
    • Ограничения (TTL ключей, необходимость синхронизации часов, потеря ключей при сбое Redis).
  2. Зафиксируйте метрики успеха:

    • Скриншот дашборда Grafana с отсутствием дубликатов.
    • Вывод тестового скрипта test_exactly_once.sh с PASS.
  3. Соберите все файлы в репозиторий:

    exactly-once-delivery/
    ├── docker-compose.yml
    ├── producer.py
    ├── consumer_dedup.py
    ├── consumer_raw.py
    ├── test_exactly_once.sh
    ├── README.md
    └── screenshots/
    

Ожидаемый результат этапа Чистый репозиторий с кодом, документацией и подтверждением работы.


5. Критерии приемки (Definition of Done)

  • Consumer не обрабатывает сообщение дважды при повторной отправке producer (ретраи).
  • После внезапного падения consumer (kill -9) и перезапуска — дубликатов нет.
  • В параллельном режиме (несколько consumer в одной группе) — дубликатов нет.
  • Redis хранит ID обработанных сообщений; при TTL ключа после окончания срока действия дубликаты не появляются (проверить на сообщениях, отправленных с интервалом > TTL).
  • Метрики Prometheus доступны; дашборд Grafana показывает duplicates_detected = 0 при наличии дублирующих сообщений.
  • Тестовый скрипт (test_exactly_once.sh) возвращает 0 и выводит "PASS" для всех сценариев.
  • Код протестирован на Python 3.11, все зависимости указаны в requirements.txt.
  • Документация объясняет ограничения (например, потеря ключей Redis при сбое — возможна повторная обработка).
  • Время выполнения одного сообщения с учётом дедупликации не превышает 500 мс при нагрузке 50 msg/s.

6. Ожидаемый результат

Основной артефакт папка с проектом exactly-once-delivery/, содержащая:

  • docker-compose.yml для Kafka + Redis + Prometheus + Grafana.
  • producer.pyproducer с имитацией дубликатов.
  • consumer_dedup.py — идемпотентный consumer с Redis-дедупликацией.
  • test_exactly_once.sh — автоматизированный тест.
  • README.md с архитектурой, инструкцией по запуску и результатами.
  • requirements.txt с зависимостями (kafka-python, redis, prometheus_client).

Дополнительные результаты

  • Дашборд Grafana с метриками (JSON export).
  • Лог-файлы с демонстрацией работы (на усмотрение).
  • Анализ trade-offs: почему выбран Redis, а не in-memory или база данных.

7. Возможные сложности и их решение

СложностьРешение
Redis может потерять данные при перезапуске (если не настроен AOF/RDB)Используйте RDB (сохранение раз в 5 минут) или кластер. В тестах допустимо, но в productionRedis с persistence.
Коммит offset до записи в Redis (consumer упал между обработкой и записью)Всегда записывайте в Redis перед коммитом. Если запись не удалась — не коммитить.
Два consumer могут одновременно получить одно и то же сообщение (при ребалансировке)Используйте атомарную операцию SETNX в Redis (только один consumer установит ключ).
TTL ключа может привести к повторной обработке после удаленияУстановите TTL больше максимального предполагаемого времени между retries (например, 24 часа). Для тестов — 10 минут.
Рост количества ключей в RedisИспользуйте Redis с политикой вытеснения allkeys-lru или периодическую очистку по TTL.
Producer отправляет дубликаты с разными msg_id (непреднамеренно)На уровне приложения ввести бизнес-ключ (например, order_id), и дедуплицировать по нему.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка стенда и симуляция дубликатов1 ч 30 мин
Этап 2: Реализация идемпотентного consumer2 ч
Этап 3: Тестирование устойчивости к рестартам1 ч 30 мин
Этап 4: Интеграция метрик и мониторинга1 ч
Этап 5: Документирование30 мин
Итого6 ч 30 мин

Примечание для первого раза Если вы впервые работаете с Kafka и Redis, заложите дополнительно 1–2 часа на отладку конфигурации Kafka и настройку Prometheus.


9. Связанные вопросы из базы знаний

ВопросТема
42Идемпотентные операции в REST API
57Обработка ошибок в распределённых системах (retries, backoff)
88Гарантии доставки сообщений (at-most-once, at-least-once, exactly-once)
103Kafka: ручное управление offset (manual commit)
144Redis: атомарные операции SETNX и блокировки
201Паттерн "Transactional Outbox"
256Prometheus: кастомные метрики для приложений (Gauge, Counter)
310Тестирование устойчивости consumer при ребалансировке партиций
421CQRS и Event Sourcing: роль дедупликации
519Dead Letter Queue и повторная обработка неудачных сообщений

10. Чек-лист самопроверки

  • Я настроил docker-compose и убедился, что Kafka и Redis доступны.
  • Я написал producer с намеренными дубликатами и проверил, что без дедупликации количество обработанных сообщений больше уникальных.
  • Я реализовал consumer с setnx и ручным коммитом offset.
  • Я протестировал kill -9 consumer и перезапуск — дубликаты не появились.
  • Я запустил два экземпляра consumer в одной группе — ни одно сообщение не обработано дважды.
  • Я добавил Prometheus метрики и подключил Grafana.
  • Я запустил test_exactly_once.sh и получил PASS по всем сценариям.
  • Я задокументировал архитектуру, ограничения и команды для повторения.
  • Я зафиксировал скриншот дашборда с нулевым количеством дубликатов.