Настроить distributed DLQ для failed инференса
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить distributed DLQ для failed инференса
1. Цель задачи
Научиться проектировать отказоустойчивую обработку сообщений инференса в распределённой системе на базе Apache Kafka. Реализовать pipeline: input-топик → consumer с retry-логикой (1s, 10s, 1m) → DLQ-топик для сообщений, не поддающихся обработке. Дополнительно создать механизм ручного reprocess сообщений из DLQ обратно в input-топик.
Ключевой результат Работающий прототип Kafka-пайплайна с тремя уровнями retry, DLQ и скриптом ручного восстановления.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Kafka-кластер (локальный/test) | docker-compose (confluentinc/cp-kafka) |
| Инференс-сервис (заглушка) | Python FastAPI + синтетические запросы |
| Клиентские библиотеки | confluent-kafka-python / kafka-python |
| Docker + Docker Compose | Установить самостоятельно |
Если нет реального инструмента — симулируем:
- Запустите Kafka в Docker одной командой:
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:latest depends_on: [zookeeper] ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - Используйте скрипт-генератор сообщений (Python) для эмуляции вызовов инференса.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Брокер сообщений | Apache Kafka (Confluent) | Промежуточное хранение и маршрутизация |
| Клиентская библиотека | confluent-kafka-python | Consumer/Producer с поддержкой idempotence |
| Инференс-сервис (mock) | Python FastAPI | Заглушка, возвращающая success/failure |
| Оркестрация | Docker & Docker Compose | Упрощённый запуск Kafka |
| Обработка | Python asyncio / threading | Асинхронный consumer с retry |
| Утилиты | curl, kcat (kafkacat) | Ручное тестирование топиков |
4. Этапы выполнения
Этап 1: Настройка Kafka-топиков и инфраструктуры (30 мин)
Действия
- Создайте топики с помощью kafka-topics.sh (или через Docker exec):
# input – основные сообщения инференса docker exec kafka kafka-topics --create --topic input --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 # retry-1s, retry-10s, retry-1m – очереди повторных попыток for delay in 1s 10s 1m; do docker exec kafka kafka-topics --create --topic "retry-${delay}" --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 done # dlq – мёртвые сообщения docker exec kafka kafka-topics --create --topic dlq --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config cleanup.policy=compact - Настройте consumer group
inference-groupдля input-топика (будет создана автоматически). - Проверьте доступность топиков:
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
Ожидаемый результат этапа 5 топиков созданы, consumer может подключаться.
Этап 2: Реализация consumer с retry-логикой (1.5 ч)
Действия
- Напишите класс
InferenceConsumerна Python:- Подписка на
input(основной поток) и на всеretry-*топики (отдельные потоки). - При получении сообщения вызывается mock-инференс (функция
process_message). - Если
process_messageвернулTrue– commit offset. - Если
Falseили исключение – определить, сколько retry уже было (хранить в заголовках Kafka).
- Подписка на
- Логика retry:
- В заголовках сообщения хранить количество попыток (
retry_count) и временную метку последней попытки. - При первой ошибке (retry_count=0) отправляем в
retry-1sс заголовкомscheduled_time = now + 1s. - При второй ошибке (retry_count=1) →
retry-10s. - При третьей (retry_count=2) →
retry-1m. - После четвёртой (retry_count>=3) → отправляем в dlq с заголовком
final_error.
- В заголовках сообщения хранить количество попыток (
- Реализуйте таймерную проверку retry-топиков: consumer с auto.offset.reset=earliest читает, но обрабатывает только те сообщения, чьё
scheduled_time<=now. Остальные пропускает (паузирует партицию или откладывает). - Пример кода обработчика:
def process_message(msg_value): # симуляция инференса: 80% успех, 20% ошибка import random return random.random() < 0.8 - Используйте Producer для отправки в retry/DLQ:
producer.produce( topic='retry-1s', value=msg.value(), headers=[('retry_count', str(1)), ('original_timestamp', str(msg.timestamp()[1]))], key=msg.key() )
Ожидаемый результат этапа Сообщения, вызвавшие ошибку, переносятся по цепочке retry-топиков, а после 3 неудач попадают в dlq.
Этап 3: Настройка DLQ и логирование (1 ч)
Действия
- Создайте consumer для dlq-топика, который записывает каждое упавшее сообщение в лог (JSON-формат) и в специальную таблицу (опционально – ClickHouse или простой CSV).
- Настройте retention для dlq: используйте cleanup.policy=compact, чтобы хранить последнюю версию каждого ключа.
- Добавьте метрики: количество сообщений в dlq, среднее число retry, процент успешных обработок. Можно экспортировать через Prometheus client.
- Проверьте, что dlq-сообщения содержат полную информацию: исходный payload, время падения, количество попыток, текст ошибки.
- Напишите простой скрипт
dlq_viewer.py, который выводит последние 10 сообщений из dlq.
Ожидаемый результат этапа Упавшие сообщения логируются и доступны для просмотра.
Этап 4: Реализация ручного reprocess (1.5 ч)
Действия
- Создайте REST API (или CLI) для reprocess сообщений из dlq обратно в input.
GET /dlq/list– возвращает список сообщений (ключи, время, ошибка).POST /dlq/reprocess– принимает список ключей (илиall) и отправляет соответствующие сообщения вinputс обнулёнными заголовками retry.
- Реализуйте consumer для dlq с ручным управлением offset:
- При запросе reprocess читаем сообщения из dlq по ключам (используя Kafka Streams или прямой consumer с assign).
- Отправляем копию в input через producer.
- Коммитим offset или удаляем сообщение из dlq (альтернатива – tombstone).
- CLI-версия (скрипт):
# reprocess.py --key my_msg_key consumer.assign([TopicPartition('dlq', 0)]) while True: msg = consumer.poll(1.0) if msg and msg.key() == key: producer.produce('input', value=msg.value(), key=msg.key()) consumer.commit() break - Добавьте idempotent producer, чтобы избежать дубликатов при повторной отправке.
- Протестируйте: отправьте сообщение в input, дождитесь его падения в dlq, затем выполните reprocess и убедитесь, что оно успешно обработалось.
Ожидаемый результат этапа Администратор может вручную вернуть любое сообщение из dlq в обработку.
Этап 5: Интеграционное тестирование (45 мин)
Действия
- Запустите все компоненты: Kafka, consumer, dlq-viewer, reprocess API.
- Сгенерируйте 100 сообщений с разными ключами (через
kafkacatили producer). - Наблюдайте:
- Сообщения с ошибками должны перейти в retry-1s, затем в retry-10s, retry-1m, dlq.
- Проверьте тайминги: сообщение в retry-1s должно быть обработано не ранее чем через 1 секунду.
- Выберите 2 ключа из dlq, выполните reprocess – они должны повторно появиться в input и успешно обработаться.
- Проверьте логи: все шаги задокументированы.
- Напишите
READMEс инструкцией по запуску.
Ожидаемый результат этапа Пайплайн работает согласно спецификации, все сценарии покрыты тестами.
5. Критерии приемки (Definition of Done)
- Созданы топики:
input,retry-1s,retry-10s,retry-1m,dlq. -
process_messageвызывается для каждого сообщения изinput. - При ошибке сообщение отправляется в
retry-1sс заголовкомretry_count=1. - После 3 неудачных попыток сообщение попадает в
dlq. - В
dlqхранятся исходный payload и метаданные (количество попыток, ошибка). - Реализован ручной reprocess (через REST или CLI) – сообщение возвращается в
inputс чистым счётчиком. - При повторном reprocess того же ключа не возникает дубликатов (idempotent producer/consumer).
- Все шаги задокументированы в README с примерами команд.
6. Ожидаемый результат
Основной артефакт Папка project/ с содержимым:
docker-compose.yml– Kafka и, опционально, consumer сервис.consumer.py– основной consumer с retry-логикой.dlq_viewer.py– утилита просмотра dlq.reprocess.py– скрипт ручного reprocess.README.md– инструкция по запуску и тестированию.- (Опционально)
api.py– FastAPI для reprocess.
Содержание: полностью рабочий прототип распределённого DLQ для инференса.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Сообщения в retry-топике обрабатываются моментально, а не с задержкой | Использовать scheduled_time в заголовках и пропускать (коммитить без обработки) сообщения, время которых ещё не наступило. Или использовать Kafka Streams suppress(). |
| При ручном reprocess теряется порядок сообщений | Не критично, т.к. reprocess – административная операция. Можно сохранять оригинальный timestamp в заголовках. |
| Consumer падает во время обработки retry | Настроить enable.auto.commit=false и коммитить только после успешной обработки. |
| Дубликаты при reprocess (если сообщение уже обрабатывается) | Использовать idempotent producer (enable.idempotence=true) и уникальный message_id в заголовках. |
| Потоковое потребление retry-топиков сложно синхронизировать | Использовать отдельные consumer group для каждого retry-топика, каждый со своим таймером. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| 1. Настройка Kafka-топиков | 30 мин |
| 2. Реализация consumer с retry | 1 ч 30 мин |
| 3. DLQ и логирование | 1 ч |
| 4. Ручной reprocess | 1 ч 30 мин |
| 5. Интеграционное тестирование | 45 мин |
| Итого | 5 ч 15 мин |
Для первого раза заложите дополнительно 2 часа на отладку и чтение документации.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Основы Apache Kafka: топики, партиции, consumer groups |
| 34 | Consumer offset management и auto.commit |
| 56 | Idempotent producer и exactly-once semantics |
| 78 | Dead Letter Queue: проектирование и best practices |
| 101 | Таймерные задержки в Kafka (delayed retry) |
| 203 | Обработка ошибок в event-driven системах |
| 305 | Мониторинг Kafka: метрики consumer lag, rate |
| 412 | Работа с заголовками (headers) в Kafka-сообщениях |
| 518 | Compacted topics и удаление устаревших сообщений |
| 623 | Паттерн Saga и компенсирующие транзакции |
10. Чек-лист самопроверки
- Я создал все необходимые топики и проверил их доступность.
- Мой consumer корректно отличает первую ошибку от повторных (по заголовку
retry_count). - Задержки между retry соответствуют заданным (1s, 10s, 1m) в пределах погрешности.
- Я протестировал ручной reprocess на сообщениях из dlq и убедился, что они обрабатываются повторно.
- В README описаны все команды для запуска и проверки пайплайна.