Как вы обрабатываете corrupted или empty документы в ingestion пайплайне?

Краткий тезис

Обработка corrupted (повреждённых) и empty (пустых) документов — обязательная часть production-grade ingestion pipeline (пайплайн загрузки данных). Без неё единичный битый PDF может остановить всю индексацию, а пустые документы — засорить векторную базу. Основные механизмы: Dead Letter Queue (DLQ) для изоляции ошибок, automatic retry с exponential backoff для временных сбоев, manual review через дашборд и alerting при превышении порога ошибок. Ключевая метрика — ingestion failure rate (доля неудачных загрузок).


1. Термины и контекст

  • Ingestion pipeline — конвейер, который извлекает текст из документов (PDF, DOCX, HTML), разбивает на чанки, генерирует эмбеддинги и сохраняет в векторную базу данных.
  • Corrupted document — файл, который не может быть корректно прочитан: битый PDF, запароленный документ, повреждённый архив, файл с неверной кодировкой.
  • Empty document — файл, который успешно открылся, но не содержит полезного текста: пустая страница, сканированное изображение без OCR, документ только с метаданными.
  • Dead Letter Queue (DLQ) — очередь «мёртвых писем», куда попадают сообщения (документы), которые не удалось обработать после всех попыток. Позволяет не терять данные и анализировать ошибки.
  • Exponential backoff — стратегия повторных попыток с увеличивающейся задержкой (1с, 2с, 4с, 8с…), чтобы не перегружать систему при временных сбоях.
  • Alerting — автоматическое уведомление команды (через Slack, PagerDuty) при превышении порога ошибок.

2. Почему это важно

  • Остановка пайплайна: если не обрабатывать ошибки, один corrupted документ может уронить весь ingestion (исключение в парсере → краш воркера).
  • Загрязнение данных: пустые документы, попавшие в векторную БД, создают «мёртвые» чанки, которые никогда не будут релевантны, но занимают место и увеличивают latency поиска.
  • Потеря данных: без DLQ ошибки просто логируются и забываются — документ теряется навсегда.
  • Доверие к системе: если ingestion регулярно «проглатывает» ошибки, пользователи получают неполные ответы.

3. Типы ошибок в ingestion

Тип ошибкиПримерПричинаДействие
Corrupted filePDF с повреждённой структуройОшибка при загрузке, битый файлDLQ, manual review
Password-protectedPDF с паролемОграничение доступаDLQ, уведомление владельца
Empty contentDOCX без текстаСканированная страница без OCRDLQ, возможно игнорировать
Network timeoutНедоступен S3 при скачиванииВременная проблема сетиRetry с exponential backoff
Parsing errorHTML с невалидной разметкойНестандартный форматDLQ, manual fix
Encoding errorТекст в UTF-16, ожидался UTF-8Несоответствие кодировокRetry с переключением кодировки

4. Dead Letter Queue (DLQ)

Концепция: каждый документ проходит через очередь сообщений (например, Kafka, RabbitMQ, AWS SQS). Если обработка завершилась ошибкой (после всех retry), документ отправляется в отдельный топик/очередь — DLQ.

Пример архитектуры с Kafka:

Source Topic (raw documents)
    ↓
Consumer (parser + chunker + embedder)
    ↓
Success → Index Topic → Vector DB
Failure → DLQ Topic → Manual Review Dashboard

Преимущества:

  • Документы не теряются.
  • Можно анализировать паттерны ошибок.
  • Легко перезапустить обработку после исправления.

Реализация на Python (kafka-python):

from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def send_to_dlq(doc_id, error, raw_data):
    message = {
        'doc_id': doc_id,
        'error': str(error),
        'raw_data': raw_data,
        'timestamp': datetime.utcnow().isoformat()
    }
    producer.send('ingestion_dlq', json.dumps(message).encode('utf-8'))

5. Automatic retry с exponential backoff

Когда retry: только для временных ошибок (network timeout, временная недоступность сервиса). Для постоянных (corrupted file) retry бесполезен.

Стратегия:

  • Максимум 3 попытки.
  • Задержка: 1с → 2с → 4с.
  • После каждой попытки логировать причину.

Пример на Python (tenacity):

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(requests.Timeout)
)
def download_document(url):
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.content

Важно: не retry для ошибок парсинга — они сразу в DLQ.


6. Manual review и дашборд

Дашборд (Grafana, Streamlit, Metabase) показывает:

  • Количество документов в DLQ за последние 24 часа.
  • Распределение по типам ошибок (corrupted, empty, timeout).
  • Список failed документов с возможностью скачать оригинал.
  • Кнопка «Re-process» для повторной отправки в ingestion после ручного исправления.

Пример метрик на дашборде:

МетрикаОписаниеПорог
dlq_countТекущее количество в DLQ> 100 → внимание
ingestion_failure_rateДоля ошибок за последний час> 1% → alert
retry_success_rateДоля успешных retry< 50% → проблема

Процесс manual review:

  1. Оператор открывает дашборд.
  2. Смотрит на документы с типом «corrupted».
  3. Скачивает файл, проверяет вручную.
  4. Если файл действительно битый — удаляет из DLQ.
  5. Если ошибка ложная (например, неверный парсер) — исправляет код и нажимает «Re-process».

7. Alerting и мониторинг

Правила алертов:

  • ingestion_failure_rate > 1% за последний час → page on-call (PagerDuty).
  • dlq_count > 500Slack-уведомление в канал #ingestion-alerts.
  • retry_success_rate < 50% за 10 минут → эскалация.

Интеграция:

  • Prometheus собирает метрики из ingestion pipeline.
  • Alertmanager отправляет в PagerDuty/Slack.
  • В сообщении указывается: количество ошибок, примеры документов, ссылка на дашборд.

Пример метрики Prometheus:

from prometheus_client import Counter, Histogram

INGESTION_FAILURES = Counter('ingestion_failures_total', 'Total ingestion failures', ['error_type'])
INGESTION_SUCCESS = Counter('ingestion_success_total', 'Total successful ingestions')

8. Метрики качества ingestion

  • Ingestion failure rate = total_failures / total_documents (целевое < 1%).
  • Retry success rate = successful_retries / total_retries (целевое > 80%).
  • Time to recovery — среднее время от попадания в DLQ до повторной успешной обработки.
  • DLQ age — максимальный возраст документа в DLQ (если > 7 дней — нужно чистить).

Пример расчёта failure rate:

def compute_failure_rate(success_count, failure_count):
    total = success_count + failure_count
    if total == 0:
        return 0.0
    return failure_count / total

9. Лучшие практики

  • Валидация на входе: перед загрузкой проверять MIME-тип, размер файла, наличие пароля. Отклонять заведомо неподходящие документы.
  • Graceful degradation: если один документ упал, pipeline продолжает работу с остальными.
  • Логирование: каждое событие (успех, retry, DLQ) логировать с doc_id, временем, типом ошибки.
  • Автоматическая очистка DLQ: stale документы (старше 30 дней) удалять или архивировать.
  • Тестирование: unit-тесты на обработку corrupted PDF, integration-тесты на retry.

Пет-проект для закрепления

Задача: построить ingestion pipeline для PDF-документов с обработкой ошибок.

Инструменты: Python, Apache Kafka (или Redis Streams), PyMuPDF (fitz), FastAPI, Streamlit.

Шаги:

  1. Написать producer, который читает PDF из локальной папки и отправляет в Kafka-топик raw_docs.
  2. Написать consumer, который:
    • Пытается извлечь текст через fitz.open().
    • Если ошибка (corrupted) → отправляет в DLQ.
    • Если текст пустой (len(text) < 10) → отправляет в DLQ с пометкой empty.
    • Если успех → сохраняет в MongoDB и отправляет в топик indexed.
  3. Реализовать retry для временных ошибок (например, имитация network timeout через time.sleep).
  4. Сделать Streamlit-дашборд: таблица DLQ, кнопка «Re-process», график failure rate.
  5. Добавить Prometheus-метрики и простой alert (если failure rate > 5% за 5 минут → print в консоль).

Ожидаемый результат: рабочий пайплайн, который не падает при битых PDF, позволяет просматривать ошибки и повторно обрабатывать документы после исправления.


Связь с другими вопросами

ВопросТема
527Как вы проектируете ingestion pipeline для RAG?
529Как вы обрабатываете дубликаты документов?
530Как вы версионируете документы в векторной БД?
531Как вы обеспечиваете consistency между сырыми документами и индексами?
532Как вы тестируете ingestion pipeline?
533Какие метрики вы отслеживаете в production для ingestion?

Навигация