English translation is not available yet. Showing Russian content.
Реализовать dead letter queue для сообщений
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать dead letter queue для сообщений
1. Цель задачи
Реализовать механизм dead letter queue (DLQ) для агентной системы обмена сообщениями. Сообщение, которое не удалось обработать после 3 повторных попыток (re-триев), должно быть перемещено в DLQ, а не потеряно или бесконечно зависнуть в основной очереди. Ключевой результат Проваленные сообщения не теряются, они сохраняются в DLQ с полными метаданными (тело сообщения, причина сбоя, количество попыток) и доступны для последующего анализа и повторной обработки.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Брокер сообщений (RabbitMQ / Kafka / Redis Streams) | Установка через Docker или облачный сервис |
| Агенты-производители / агенты-потребители | Собственный код на Python (на базе фреймворка, например, Celery или asyncio) |
| Логи работы очередей | Stdout / файл / ELK |
| Тестовые сообщения с контролируемыми ошибками | Генерация скриптом (например, сообщения с полем fail_after или случайным исключением) |
Если нет реального инструмента — симулируем:
- Разворачиваем Redis локально (docker run -p 6379:6379 redis) или используем эмуляцию in-memory очереди через Python asyncio.Queue + хранилище для DLQ.
- Написываем простой консьюмер, который обрабатывает сообщения и имитирует сбой (например, бросает исключение с вероятностью 70%).
- Включаем счётчик попыток в метаданные сообщения и логику повторной отправки в основную очередь.
- После 3-х неудач перемещаем сообщение в отдельный список (DLQ) в Redis или in-memory.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.10+ | Реализация логики очередей и агентов |
| Брокер сообщений | RabbitMQ (докер-контейнер) / Redis Streams | Основная очередь + DLQ |
| Клиент для брокера | pika (RabbitMQ), redis-py (Redis) | Взаимодействие с очередью |
| Управление зависимостями | pip / poetry | Установка библиотек |
| Тестирование | pytest + pytest-asyncio | Модульные и интеграционные тесты |
| Логирование | logging (Python) или structlog | Фиксация каждого ретрия и перемещения в DLQ |
| Docker / Docker Compose | — | Поднятие брокера (опционально) |
4. Этапы выполнения
Этап 1: Проектирование и подготовка окружения (30 мин)
Действия
- Выбрать брокер (рекомендуется Redis как самый простой для локального запуска).
- Спроектировать структуру сообщения:
{ "id": "uuid", "payload": {...}, "retry_count": 0, "max_retries": 3, "error_history": [], "created_at": "iso_timestamp" } - Определить структуру DLQ: отдельный список / топик с тем же форматом, но с полем
moved_to_dlq_atи итоговой ошибкой. - Настроить Docker Compose с Redis (или RabbitMQ).
- Создать шаблон проекта:
main.py, producer.py, consumer.py,dlq_manager.py,tests/.
Ожидаемый результат этапа Рабочее окружение + описание формата сообщения.
Этап 2: Реализация базовой очереди и потребителя (1 час)
Действия
- Написать продюсера, который помещает сообщения в очередь (Redis List или Stream).
- Написать консьюмера, который читает сообщения блокирующим способом (BLPOP / XREADGROUP).
- Добавить в консьюмера обработчик ошибок: если при обработке выброшено исключение, сообщение должно быть повторно поставлено в основную очередь.
- Реализовать логику повторной отправки с инкрементом
retry_count. - Убедиться, что без лимита ретриев сообщение циклится бесконечно.
Ожидаемый результат этапа Потребитель обрабатывает сообщения и при ошибке повторно ставит их в очередь (без ограничения).
Этап 3: Внедрение лимита ретриев и перемещение в DLQ (1.5 часа)
Действия
- Изменить логику повторной отправки: проверять retry_count < max_retries.
- Если retry_count >= max_retries — отправлять сообщение в DLQ (отдельный список Redis: dlq:messages).
- В DLQ сохранять исходное сообщение вместе с последней ошибкой и временем перемещения.
- Реализовать функцию
dlq_manager.peek()для чтения всех сообщений из DLQ (без удаления). - Добавить логирование на каждый шаг:
logger.info("Moved message to DLQ", message_id=msg["id"], reason=str(e))
Ожидаемый результат этапа После 3 неудач сообщение исчезает из основной очереди и появляется в DLQ.
Этап 4: Тестирование и проверка граничных случаев (1 час)
Действия
- Написать unit-тесты для логики повторных попыток и перемещения (без реального брокера, мокая очередь).
- Написать интеграционный тест (с Redis в Docker):
- Отправить 10 сообщений, из которых 4 гарантированно упадут (специальное поле
will_fail: true). - Проверить, что после 3 ретриев эти 4 сообщения оказались в DLQ, остальные 6 успешно обработаны.
- Отправить 10 сообщений, из которых 4 гарантированно упадут (специальное поле
- Проверить, что DLQ не теряет сообщения при перезапуске консьюмера (persistence).
- Проверить, что сообщения в DLQ можно повторно обработать (опционально — реализовать
dlq_requeue). - Проверить конкурентный доступ: запустить 2 консьюмера одновременно, убедиться, что ни одно сообщение не дублируется в DLQ.
Ожидаемый результат этапа Все тесты проходят, сообщения корректно попадают в DLQ.
Этап 5: Документирование и финальная упаковка (30 мин)
Действия
- Описать в README.md: архитектуру, как запустить, как проверить DLQ.
- Добавить примеры логов и скриншоты работы (опционально).
- Оформить код в соответствии с PEP8, добавить type hints.
- Создать Makefile для быстрых команд: make run, make test, make clean.
Ожидаемый результат этапа Готовый репозиторий с кодом, тестами и документацией.
5. Критерии приемки (Definition of Done)
- Сообщение после третьей неудачной попытки обработки перемещается в DLQ.
- DLQ хранит полное исходное сообщение, включая все поля и историю ошибок.
- DLQ не блокирует основную очередь (сообщение удаляется из основной очереди).
- Реализован метод просмотра содержимого DLQ без удаления.
- Все шаги логируются (ретрии, перемещение в DLQ).
- Написаны минимум 2 unit-теста и 1 интеграционный тест.
- Код покрыт type hints и docstrings.
- README содержит инструкцию по запуску и проверке DLQ.
- При перезапуске консьюмера уже перемещённые в DLQ сообщения не теряются.
- Система корректно работает при параллельных потребителях (нет гонок).
6. Ожидаемый результат
Основной артефакт — папка проекта со следующей структурой:
dlq-demo/
├── README.md
├── docker-compose.yml (если используется брокер)
├── src/
│ ├── __init__.py
│ ├── producer.py
│ ├── consumer.py
│ ├── dlq_manager.py
│ └── models.py
├── tests/
│ ├── test_retry_logic.py
│ └── test_integration.py
└── requirements.txt
Файл dlq_manager.py содержит класс DLQManager с методами:
push(message: dict)peek_all() -> list[dict]pop(message_id: str) -> dict(опционально для повторной обработки)
Дополнительно: скрипт demo.py, который запускает продюсера и консьюмера в одном процессе для быстрой демонстрации.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Сообщение может быть обработано частично (сбой на середине) | Использовать транзакции брокера или паттерн outbox; в простой реализации — считать неудачей любое исключение консьюмера |
| Дублирование сообщений при ретрие (at-least-once vs exactly-once) | Принять at-least-once; добавить idempotency key в продюсер; в тестах проверять, что DLQ не содержит дубликатов с одинаковым id |
| Конкурентный доступ к счётчику retry_count | Использовать атомарные операции Redis (HINCRBY) или блокировки на уровне consumer group |
| Бесконечный цикл, если лимит не проверяется | Добавить явную проверку if msg["retry_count"] >= max_retries: move_to_dlq() |
| DLQ растёт бесконечно | Предусмотреть TTL для сообщений в DLQ или механизм архивации; в рамках задачи — просто документировать |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Проектирование и подготовка окружения | 30 мин |
| Этап 2: Реализация базовой очереди и потребителя | 1 час |
| Этап 3: Внедрение лимита ретриев и DLQ | 1.5 часа |
| Этап 4: Тестирование и проверка граничных случаев | 1 час |
| Этап 5: Документирование и финальная упаковка | 30 мин |
| Итого | 4.5 часа |
Примечание: Для первого выполнения (незнакомый брокер) добавьте 1 час на изучение документации.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 41 | Базовые паттерны очередей сообщений |
| 89 | Стратегии повторных попыток (retry) |
| 127 | Dead Letter Queue: назначение и реализация |
| 256 | Гарантии доставки (at-least-once, exactly-once) |
| 320 | Обработка ошибок в распределённых системах |
| 418 | Мониторинг и алертинг очередей |
| 533 | Тестирование асинхронных систем |
| 647 | Конкурентный доступ к очередям |
| 755 | Идемпотентность потребителей |
| 871 | Логирование в message-driven системах |
10. Чек-лист самопроверки
- Я убедился, что после 3 ретриев сообщение действительно оказывается в DLQ (проверил через
dlq_manager.peek()). - Я проверил, что сообщения в DLQ содержат все поля, включая
error_historyиmoved_to_dlq_at. - Я написал интеграционный тест, который отправляет заведомо ошибочное сообщение и подтверждает его наличие в DLQ.
- Я проверил, что при перезапуске консьюмера DLQ не теряет уже перемещённые сообщения.
- Я задокументировал в README, как запустить демо и как просмотреть содержимое DLQ.