Настроить distributed DLQ для failed инференса

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить distributed DLQ для failed инференса

1. Цель задачи

Научиться проектировать отказоустойчивую обработку сообщений инференса в распределённой системе на базе Apache Kafka. Реализовать pipeline: input-топик → consumer с retry-логикой (1s, 10s, 1m) → DLQ-топик для сообщений, не поддающихся обработке. Дополнительно создать механизм ручного reprocess сообщений из DLQ обратно в input-топик.

Ключевой результат Работающий прототип Kafka-пайплайна с тремя уровнями retry, DLQ и скриптом ручного восстановления.


2. Исходные данные

Что нужноОткуда взять
Kafka-кластер (локальный/test)docker-compose (confluentinc/cp-kafka)
Инференс-сервис (заглушка)Python FastAPI + синтетические запросы
Клиентские библиотекиconfluent-kafka-python / kafka-python
Docker + Docker ComposeУстановить самостоятельно

Если нет реального инструмента — симулируем:

  1. Запустите Kafka в Docker одной командой:
    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on: [zookeeper]
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
  2. Используйте скрипт-генератор сообщений (Python) для эмуляции вызовов инференса.

3. Технологический стек

КомпонентИнструментыНазначение
Брокер сообщенийApache Kafka (Confluent)Промежуточное хранение и маршрутизация
Клиентская библиотекаconfluent-kafka-pythonConsumer/Producer с поддержкой idempotence
Инференс-сервис (mock)Python FastAPIЗаглушка, возвращающая success/failure
ОркестрацияDocker & Docker ComposeУпрощённый запуск Kafka
ОбработкаPython asyncio / threadingАсинхронный consumer с retry
Утилитыcurl, kcat (kafkacat)Ручное тестирование топиков

4. Этапы выполнения

Этап 1: Настройка Kafka-топиков и инфраструктуры (30 мин)

Действия

  1. Создайте топики с помощью kafka-topics.sh (или через Docker exec):
    # input – основные сообщения инференса
    docker exec kafka kafka-topics --create --topic input --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    # retry-1s, retry-10s, retry-1m – очереди повторных попыток
    for delay in 1s 10s 1m; do
      docker exec kafka kafka-topics --create --topic "retry-${delay}" --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    done
    # dlq – мёртвые сообщения
    docker exec kafka kafka-topics --create --topic dlq --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config cleanup.policy=compact
    
  2. Настройте consumer group inference-group для input-топика (будет создана автоматически).
  3. Проверьте доступность топиков:
    docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
    

Ожидаемый результат этапа 5 топиков созданы, consumer может подключаться.


Этап 2: Реализация consumer с retry-логикой (1.5 ч)

Действия

  1. Напишите класс InferenceConsumer на Python:
    • Подписка на input (основной поток) и на все retry-* топики (отдельные потоки).
    • При получении сообщения вызывается mock-инференс (функция process_message).
    • Если process_message вернул Truecommit offset.
    • Если False или исключение – определить, сколько retry уже было (хранить в заголовках Kafka).
  2. Логика retry:
    • В заголовках сообщения хранить количество попыток (retry_count) и временную метку последней попытки.
    • При первой ошибке (retry_count=0) отправляем в retry-1s с заголовком scheduled_time = now + 1s.
    • При второй ошибке (retry_count=1) → retry-10s.
    • При третьей (retry_count=2) → retry-1m.
    • После четвёртой (retry_count>=3) → отправляем в dlq с заголовком final_error.
  3. Реализуйте таймерную проверку retry-топиков: consumer с auto.offset.reset=earliest читает, но обрабатывает только те сообщения, чьё scheduled_time <= now. Остальные пропускает (паузирует партицию или откладывает).
  4. Пример кода обработчика:
    def process_message(msg_value):
        # симуляция инференса: 80% успех, 20% ошибка
        import random
        return random.random() < 0.8
    
  5. Используйте Producer для отправки в retry/DLQ:
    producer.produce(
        topic='retry-1s',
        value=msg.value(),
        headers=[('retry_count', str(1)), ('original_timestamp', str(msg.timestamp()[1]))],
        key=msg.key()
    )
    

Ожидаемый результат этапа Сообщения, вызвавшие ошибку, переносятся по цепочке retry-топиков, а после 3 неудач попадают в dlq.


Этап 3: Настройка DLQ и логирование (1 ч)

Действия

  1. Создайте consumer для dlq-топика, который записывает каждое упавшее сообщение в лог (JSON-формат) и в специальную таблицу (опционально – ClickHouse или простой CSV).
  2. Настройте retention для dlq: используйте cleanup.policy=compact, чтобы хранить последнюю версию каждого ключа.
  3. Добавьте метрики: количество сообщений в dlq, среднее число retry, процент успешных обработок. Можно экспортировать через Prometheus client.
  4. Проверьте, что dlq-сообщения содержат полную информацию: исходный payload, время падения, количество попыток, текст ошибки.
  5. Напишите простой скрипт dlq_viewer.py, который выводит последние 10 сообщений из dlq.

Ожидаемый результат этапа Упавшие сообщения логируются и доступны для просмотра.


Этап 4: Реализация ручного reprocess (1.5 ч)

Действия

  1. Создайте REST API (или CLI) для reprocess сообщений из dlq обратно в input.
    • GET /dlq/list – возвращает список сообщений (ключи, время, ошибка).
    • POST /dlq/reprocess – принимает список ключей (или all) и отправляет соответствующие сообщения в input с обнулёнными заголовками retry.
  2. Реализуйте consumer для dlq с ручным управлением offset:
    • При запросе reprocess читаем сообщения из dlq по ключам (используя Kafka Streams или прямой consumer с assign).
    • Отправляем копию в input через producer.
    • Коммитим offset или удаляем сообщение из dlq (альтернатива – tombstone).
  3. CLI-версия (скрипт):
    # reprocess.py --key my_msg_key
    consumer.assign([TopicPartition('dlq', 0)])
    while True:
        msg = consumer.poll(1.0)
        if msg and msg.key() == key:
            producer.produce('input', value=msg.value(), key=msg.key())
            consumer.commit()
            break
    
  4. Добавьте idempotent producer, чтобы избежать дубликатов при повторной отправке.
  5. Протестируйте: отправьте сообщение в input, дождитесь его падения в dlq, затем выполните reprocess и убедитесь, что оно успешно обработалось.

Ожидаемый результат этапа Администратор может вручную вернуть любое сообщение из dlq в обработку.


Этап 5: Интеграционное тестирование (45 мин)

Действия

  1. Запустите все компоненты: Kafka, consumer, dlq-viewer, reprocess API.
  2. Сгенерируйте 100 сообщений с разными ключами (через kafkacat или producer).
  3. Наблюдайте:
    • Сообщения с ошибками должны перейти в retry-1s, затем в retry-10s, retry-1m, dlq.
    • Проверьте тайминги: сообщение в retry-1s должно быть обработано не ранее чем через 1 секунду.
    • Выберите 2 ключа из dlq, выполните reprocess – они должны повторно появиться в input и успешно обработаться.
  4. Проверьте логи: все шаги задокументированы.
  5. Напишите README с инструкцией по запуску.

Ожидаемый результат этапа Пайплайн работает согласно спецификации, все сценарии покрыты тестами.


5. Критерии приемки (Definition of Done)

  • Созданы топики: input, retry-1s, retry-10s, retry-1m, dlq.
  • process_message вызывается для каждого сообщения из input.
  • При ошибке сообщение отправляется в retry-1s с заголовком retry_count=1.
  • После 3 неудачных попыток сообщение попадает в dlq.
  • В dlq хранятся исходный payload и метаданные (количество попыток, ошибка).
  • Реализован ручной reprocess (через REST или CLI) – сообщение возвращается в input с чистым счётчиком.
  • При повторном reprocess того же ключа не возникает дубликатов (idempotent producer/consumer).
  • Все шаги задокументированы в README с примерами команд.

6. Ожидаемый результат

Основной артефакт Папка project/ с содержимым:

  • docker-compose.ymlKafka и, опционально, consumer сервис.
  • consumer.py – основной consumer с retry-логикой.
  • dlq_viewer.py – утилита просмотра dlq.
  • reprocess.py – скрипт ручного reprocess.
  • README.md – инструкция по запуску и тестированию.
  • (Опционально) api.pyFastAPI для reprocess.

Содержание: полностью рабочий прототип распределённого DLQ для инференса.

7. Возможные сложности и их решение

СложностьРешение
Сообщения в retry-топике обрабатываются моментально, а не с задержкойИспользовать scheduled_time в заголовках и пропускать (коммитить без обработки) сообщения, время которых ещё не наступило. Или использовать Kafka Streams suppress().
При ручном reprocess теряется порядок сообщенийНе критично, т.к. reprocess – административная операция. Можно сохранять оригинальный timestamp в заголовках.
Consumer падает во время обработки retryНастроить enable.auto.commit=false и коммитить только после успешной обработки.
Дубликаты при reprocess (если сообщение уже обрабатывается)Использовать idempotent producer (enable.idempotence=true) и уникальный message_id в заголовках.
Потоковое потребление retry-топиков сложно синхронизироватьИспользовать отдельные consumer group для каждого retry-топика, каждый со своим таймером.

8. Бюджет времени (оценка)

ЭтапВремя
1. Настройка Kafka-топиков30 мин
2. Реализация consumer с retry1 ч 30 мин
3. DLQ и логирование1 ч
4. Ручной reprocess1 ч 30 мин
5. Интеграционное тестирование45 мин
Итого5 ч 15 мин

Для первого раза заложите дополнительно 2 часа на отладку и чтение документации.

9. Связанные вопросы из базы знаний

ВопросТема
12Основы Apache Kafka: топики, партиции, consumer groups
34Consumer offset management и auto.commit
56Idempotent producer и exactly-once semantics
78Dead Letter Queue: проектирование и best practices
101Таймерные задержки в Kafka (delayed retry)
203Обработка ошибок в event-driven системах
305Мониторинг Kafka: метрики consumer lag, rate
412Работа с заголовками (headers) в Kafka-сообщениях
518Compacted topics и удаление устаревших сообщений
623Паттерн Saga и компенсирующие транзакции

10. Чек-лист самопроверки

  • Я создал все необходимые топики и проверил их доступность.
  • Мой consumer корректно отличает первую ошибку от повторных (по заголовку retry_count).
  • Задержки между retry соответствуют заданным (1s, 10s, 1m) в пределах погрешности.
  • Я протестировал ручной reprocess на сообщениях из dlq и убедился, что они обрабатываются повторно.
  • В README описаны все команды для запуска и проверки пайплайна.