Как вы обрабатываете corrupted или empty документы в ingestion пайплайне?
Краткий тезис
Обработка corrupted (повреждённых) и empty (пустых) документов — обязательная часть production-grade ingestion pipeline (пайплайн загрузки данных). Без неё единичный битый PDF может остановить всю индексацию, а пустые документы — засорить векторную базу. Основные механизмы: Dead Letter Queue (DLQ) для изоляции ошибок, automatic retry с exponential backoff для временных сбоев, manual review через дашборд и alerting при превышении порога ошибок. Ключевая метрика — ingestion failure rate (доля неудачных загрузок).
1. Термины и контекст
- Ingestion pipeline — конвейер, который извлекает текст из документов (PDF, DOCX, HTML), разбивает на чанки, генерирует эмбеддинги и сохраняет в векторную базу данных.
- Corrupted document — файл, который не может быть корректно прочитан: битый PDF, запароленный документ, повреждённый архив, файл с неверной кодировкой.
- Empty document — файл, который успешно открылся, но не содержит полезного текста: пустая страница, сканированное изображение без OCR, документ только с метаданными.
- Dead Letter Queue (DLQ) — очередь «мёртвых писем», куда попадают сообщения (документы), которые не удалось обработать после всех попыток. Позволяет не терять данные и анализировать ошибки.
- Exponential backoff — стратегия повторных попыток с увеличивающейся задержкой (1с, 2с, 4с, 8с…), чтобы не перегружать систему при временных сбоях.
- Alerting — автоматическое уведомление команды (через Slack, PagerDuty) при превышении порога ошибок.
2. Почему это важно
- Остановка пайплайна: если не обрабатывать ошибки, один corrupted документ может уронить весь ingestion (исключение в парсере → краш воркера).
- Загрязнение данных: пустые документы, попавшие в векторную БД, создают «мёртвые» чанки, которые никогда не будут релевантны, но занимают место и увеличивают latency поиска.
- Потеря данных: без DLQ ошибки просто логируются и забываются — документ теряется навсегда.
- Доверие к системе: если ingestion регулярно «проглатывает» ошибки, пользователи получают неполные ответы.
3. Типы ошибок в ingestion
| Тип ошибки | Пример | Причина | Действие |
|---|---|---|---|
| Corrupted file | PDF с повреждённой структурой | Ошибка при загрузке, битый файл | DLQ, manual review |
| Password-protected | PDF с паролем | Ограничение доступа | DLQ, уведомление владельца |
| Empty content | DOCX без текста | Сканированная страница без OCR | DLQ, возможно игнорировать |
| Network timeout | Недоступен S3 при скачивании | Временная проблема сети | Retry с exponential backoff |
| Parsing error | HTML с невалидной разметкой | Нестандартный формат | DLQ, manual fix |
| Encoding error | Текст в UTF-16, ожидался UTF-8 | Несоответствие кодировок | Retry с переключением кодировки |
4. Dead Letter Queue (DLQ)
Концепция: каждый документ проходит через очередь сообщений (например, Kafka, RabbitMQ, AWS SQS). Если обработка завершилась ошибкой (после всех retry), документ отправляется в отдельный топик/очередь — DLQ.
Пример архитектуры с Kafka:
Source Topic (raw documents)
↓
Consumer (parser + chunker + embedder)
↓
Success → Index Topic → Vector DB
Failure → DLQ Topic → Manual Review Dashboard
Преимущества:
- Документы не теряются.
- Можно анализировать паттерны ошибок.
- Легко перезапустить обработку после исправления.
Реализация на Python (kafka-python):
from kafka import KafkaProducer, KafkaConsumer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092')
def send_to_dlq(doc_id, error, raw_data):
message = {
'doc_id': doc_id,
'error': str(error),
'raw_data': raw_data,
'timestamp': datetime.utcnow().isoformat()
}
producer.send('ingestion_dlq', json.dumps(message).encode('utf-8'))
5. Automatic retry с exponential backoff
Когда retry: только для временных ошибок (network timeout, временная недоступность сервиса). Для постоянных (corrupted file) retry бесполезен.
Стратегия:
- Максимум 3 попытки.
- Задержка: 1с → 2с → 4с.
- После каждой попытки логировать причину.
Пример на Python (tenacity):
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(requests.Timeout)
)
def download_document(url):
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.content
Важно: не retry для ошибок парсинга — они сразу в DLQ.
6. Manual review и дашборд
Дашборд (Grafana, Streamlit, Metabase) показывает:
- Количество документов в DLQ за последние 24 часа.
- Распределение по типам ошибок (corrupted, empty, timeout).
- Список failed документов с возможностью скачать оригинал.
- Кнопка «Re-process» для повторной отправки в ingestion после ручного исправления.
Пример метрик на дашборде:
| Метрика | Описание | Порог |
|---|---|---|
dlq_count | Текущее количество в DLQ | > 100 → внимание |
ingestion_failure_rate | Доля ошибок за последний час | > 1% → alert |
retry_success_rate | Доля успешных retry | < 50% → проблема |
Процесс manual review:
- Оператор открывает дашборд.
- Смотрит на документы с типом «corrupted».
- Скачивает файл, проверяет вручную.
- Если файл действительно битый — удаляет из DLQ.
- Если ошибка ложная (например, неверный парсер) — исправляет код и нажимает «Re-process».
7. Alerting и мониторинг
Правила алертов:
ingestion_failure_rate > 1%за последний час → page on-call (PagerDuty).dlq_count > 500→ Slack-уведомление в канал #ingestion-alerts.retry_success_rate < 50%за 10 минут → эскалация.
- Prometheus собирает метрики из ingestion pipeline.
- Alertmanager отправляет в PagerDuty/Slack.
- В сообщении указывается: количество ошибок, примеры документов, ссылка на дашборд.
Пример метрики Prometheus:
from prometheus_client import Counter, Histogram
INGESTION_FAILURES = Counter('ingestion_failures_total', 'Total ingestion failures', ['error_type'])
INGESTION_SUCCESS = Counter('ingestion_success_total', 'Total successful ingestions')
8. Метрики качества ingestion
- Ingestion failure rate =
total_failures / total_documents(целевое < 1%). - Retry success rate =
successful_retries / total_retries(целевое > 80%). - Time to recovery — среднее время от попадания в DLQ до повторной успешной обработки.
- DLQ age — максимальный возраст документа в DLQ (если > 7 дней — нужно чистить).
Пример расчёта failure rate:
def compute_failure_rate(success_count, failure_count):
total = success_count + failure_count
if total == 0:
return 0.0
return failure_count / total
9. Лучшие практики
- Валидация на входе: перед загрузкой проверять MIME-тип, размер файла, наличие пароля. Отклонять заведомо неподходящие документы.
- Graceful degradation: если один документ упал, pipeline продолжает работу с остальными.
- Логирование: каждое событие (успех, retry, DLQ) логировать с doc_id, временем, типом ошибки.
- Автоматическая очистка DLQ: stale документы (старше 30 дней) удалять или архивировать.
- Тестирование: unit-тесты на обработку corrupted PDF, integration-тесты на retry.
Пет-проект для закрепления
Задача: построить ingestion pipeline для PDF-документов с обработкой ошибок.
Инструменты: Python, Apache Kafka (или Redis Streams), PyMuPDF (fitz), FastAPI, Streamlit.
Шаги:
- Написать producer, который читает PDF из локальной папки и отправляет в Kafka-топик
raw_docs. - Написать consumer, который:
- Реализовать retry для временных ошибок (например, имитация network timeout через
time.sleep). - Сделать Streamlit-дашборд: таблица DLQ, кнопка «Re-process», график failure rate.
- Добавить Prometheus-метрики и простой alert (если failure rate > 5% за 5 минут → print в консоль).
Ожидаемый результат: рабочий пайплайн, который не падает при битых PDF, позволяет просматривать ошибки и повторно обрабатывать документы после исправления.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 527 | Как вы проектируете ingestion pipeline для RAG? |
| 529 | Как вы обрабатываете дубликаты документов? |
| 530 | Как вы версионируете документы в векторной БД? |
| 531 | Как вы обеспечиваете consistency между сырыми документами и индексами? |
| 532 | Как вы тестируете ingestion pipeline? |
| 533 | Какие метрики вы отслеживаете в production для ingestion? |
Навигация
- Предыдущий: 527
- Следующий: 529
- Индекс: 00. Индекс разборов