Реализовать dead letter queue для сообщений

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать dead letter queue для сообщений

1. Цель задачи

Реализовать механизм dead letter queue (DLQ) для агентной системы обмена сообщениями. Сообщение, которое не удалось обработать после 3 повторных попыток (re-триев), должно быть перемещено в DLQ, а не потеряно или бесконечно зависнуть в основной очереди. Ключевой результат Проваленные сообщения не теряются, они сохраняются в DLQ с полными метаданными (тело сообщения, причина сбоя, количество попыток) и доступны для последующего анализа и повторной обработки.

2. Исходные данные

Что нужноОткуда взять
Брокер сообщений (RabbitMQ / Kafka / Redis Streams)Установка через Docker или облачный сервис
Агенты-производители / агенты-потребителиСобственный код на Python (на базе фреймворка, например, Celery или asyncio)
Логи работы очередейStdout / файл / ELK
Тестовые сообщения с контролируемыми ошибкамиГенерация скриптом (например, сообщения с полем fail_after или случайным исключением)

Если нет реального инструмента — симулируем:

  1. Разворачиваем Redis локально (docker run -p 6379:6379 redis) или используем эмуляцию in-memory очереди через Python asyncio.Queue + хранилище для DLQ.
  2. Написываем простой консьюмер, который обрабатывает сообщения и имитирует сбой (например, бросает исключение с вероятностью 70%).
  3. Включаем счётчик попыток в метаданные сообщения и логику повторной отправки в основную очередь.
  4. После 3-х неудач перемещаем сообщение в отдельный список (DLQ) в Redis или in-memory.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Реализация логики очередей и агентов
Брокер сообщенийRabbitMQ (докер-контейнер) / Redis StreamsОсновная очередь + DLQ
Клиент для брокераpika (RabbitMQ), redis-py (Redis)Взаимодействие с очередью
Управление зависимостямиpip / poetryУстановка библиотек
Тестированиеpytest + pytest-asyncioМодульные и интеграционные тесты
Логированиеlogging (Python) или structlogФиксация каждого ретрия и перемещения в DLQ
Docker / Docker ComposeПоднятие брокера (опционально)

4. Этапы выполнения

Этап 1: Проектирование и подготовка окружения (30 мин)

Действия

  1. Выбрать брокер (рекомендуется Redis как самый простой для локального запуска).
  2. Спроектировать структуру сообщения:
    {
        "id": "uuid",
        "payload": {...},
        "retry_count": 0,
        "max_retries": 3,
        "error_history": [],
        "created_at": "iso_timestamp"
    }
    
  3. Определить структуру DLQ: отдельный список / топик с тем же форматом, но с полем moved_to_dlq_at и итоговой ошибкой.
  4. Настроить Docker Compose с Redis (или RabbitMQ).
  5. Создать шаблон проекта: main.py, producer.py, consumer.py, dlq_manager.py, tests/.

Ожидаемый результат этапа Рабочее окружение + описание формата сообщения.

Этап 2: Реализация базовой очереди и потребителя (1 час)

Действия

  1. Написать продюсера, который помещает сообщения в очередь (Redis List или Stream).
  2. Написать консьюмера, который читает сообщения блокирующим способом (BLPOP / XREADGROUP).
  3. Добавить в консьюмера обработчик ошибок: если при обработке выброшено исключение, сообщение должно быть повторно поставлено в основную очередь.
  4. Реализовать логику повторной отправки с инкрементом retry_count.
  5. Убедиться, что без лимита ретриев сообщение циклится бесконечно.

Ожидаемый результат этапа Потребитель обрабатывает сообщения и при ошибке повторно ставит их в очередь (без ограничения).

Этап 3: Внедрение лимита ретриев и перемещение в DLQ (1.5 часа)

Действия

  1. Изменить логику повторной отправки: проверять retry_count < max_retries.
  2. Если retry_count >= max_retries — отправлять сообщение в DLQ (отдельный список Redis: dlq:messages).
  3. В DLQ сохранять исходное сообщение вместе с последней ошибкой и временем перемещения.
  4. Реализовать функцию dlq_manager.peek() для чтения всех сообщений из DLQ (без удаления).
  5. Добавить логирование на каждый шаг:
    logger.info("Moved message to DLQ", message_id=msg["id"], reason=str(e))
    

Ожидаемый результат этапа После 3 неудач сообщение исчезает из основной очереди и появляется в DLQ.

Этап 4: Тестирование и проверка граничных случаев (1 час)

Действия

  1. Написать unit-тесты для логики повторных попыток и перемещения (без реального брокера, мокая очередь).
  2. Написать интеграционный тест (с Redis в Docker):
    • Отправить 10 сообщений, из которых 4 гарантированно упадут (специальное поле will_fail: true).
    • Проверить, что после 3 ретриев эти 4 сообщения оказались в DLQ, остальные 6 успешно обработаны.
  3. Проверить, что DLQ не теряет сообщения при перезапуске консьюмера (persistence).
  4. Проверить, что сообщения в DLQ можно повторно обработать (опционально — реализовать dlq_requeue).
  5. Проверить конкурентный доступ: запустить 2 консьюмера одновременно, убедиться, что ни одно сообщение не дублируется в DLQ.

Ожидаемый результат этапа Все тесты проходят, сообщения корректно попадают в DLQ.

Этап 5: Документирование и финальная упаковка (30 мин)

Действия

  1. Описать в README.md: архитектуру, как запустить, как проверить DLQ.
  2. Добавить примеры логов и скриншоты работы (опционально).
  3. Оформить код в соответствии с PEP8, добавить type hints.
  4. Создать Makefile для быстрых команд: make run, make test, make clean.

Ожидаемый результат этапа Готовый репозиторий с кодом, тестами и документацией.

5. Критерии приемки (Definition of Done)

  • Сообщение после третьей неудачной попытки обработки перемещается в DLQ.
  • DLQ хранит полное исходное сообщение, включая все поля и историю ошибок.
  • DLQ не блокирует основную очередь (сообщение удаляется из основной очереди).
  • Реализован метод просмотра содержимого DLQ без удаления.
  • Все шаги логируются (ретрии, перемещение в DLQ).
  • Написаны минимум 2 unit-теста и 1 интеграционный тест.
  • Код покрыт type hints и docstrings.
  • README содержит инструкцию по запуску и проверке DLQ.
  • При перезапуске консьюмера уже перемещённые в DLQ сообщения не теряются.
  • Система корректно работает при параллельных потребителях (нет гонок).

6. Ожидаемый результат

Основной артефакт — папка проекта со следующей структурой:

dlq-demo/
├── README.md
├── docker-compose.yml (если используется брокер)
├── src/
│   ├── __init__.py
│   ├── producer.py
│   ├── consumer.py
│   ├── dlq_manager.py
│   └── models.py
├── tests/
│   ├── test_retry_logic.py
│   └── test_integration.py
└── requirements.txt

Файл dlq_manager.py содержит класс DLQManager с методами:

  • push(message: dict)
  • peek_all() -> list[dict]
  • pop(message_id: str) -> dict (опционально для повторной обработки)

Дополнительно: скрипт demo.py, который запускает продюсера и консьюмера в одном процессе для быстрой демонстрации.

7. Возможные сложности и их решение

СложностьРешение
Сообщение может быть обработано частично (сбой на середине)Использовать транзакции брокера или паттерн outbox; в простой реализации — считать неудачей любое исключение консьюмера
Дублирование сообщений при ретрие (at-least-once vs exactly-once)Принять at-least-once; добавить idempotency key в продюсер; в тестах проверять, что DLQ не содержит дубликатов с одинаковым id
Конкурентный доступ к счётчику retry_countИспользовать атомарные операции Redis (HINCRBY) или блокировки на уровне consumer group
Бесконечный цикл, если лимит не проверяетсяДобавить явную проверку if msg["retry_count"] >= max_retries: move_to_dlq()
DLQ растёт бесконечноПредусмотреть TTL для сообщений в DLQ или механизм архивации; в рамках задачи — просто документировать

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Проектирование и подготовка окружения30 мин
Этап 2: Реализация базовой очереди и потребителя1 час
Этап 3: Внедрение лимита ретриев и DLQ1.5 часа
Этап 4: Тестирование и проверка граничных случаев1 час
Этап 5: Документирование и финальная упаковка30 мин
Итого4.5 часа

Примечание: Для первого выполнения (незнакомый брокер) добавьте 1 час на изучение документации.

9. Связанные вопросы из базы знаний

ВопросТема
41Базовые паттерны очередей сообщений
89Стратегии повторных попыток (retry)
127Dead Letter Queue: назначение и реализация
256Гарантии доставки (at-least-once, exactly-once)
320Обработка ошибок в распределённых системах
418Мониторинг и алертинг очередей
533Тестирование асинхронных систем
647Конкурентный доступ к очередям
755Идемпотентность потребителей
871Логирование в message-driven системах

10. Чек-лист самопроверки

  • Я убедился, что после 3 ретриев сообщение действительно оказывается в DLQ (проверил через dlq_manager.peek()).
  • Я проверил, что сообщения в DLQ содержат все поля, включая error_history и moved_to_dlq_at.
  • Я написал интеграционный тест, который отправляет заведомо ошибочное сообщение и подтверждает его наличие в DLQ.
  • Я проверил, что при перезапуске консьюмера DLQ не теряет уже перемещённые сообщения.
  • Я задокументировал в README, как запустить демо и как просмотреть содержимое DLQ.