English translation is not available yet. Showing Russian content.

Как вы обрабатываете corrupted или empty документы в ingestion пайплайне?

Краткий тезис

Corrupted (повреждённые) и empty (пустые) документы — неизбежная реальность в production-пайплайнах инжестии данных. Их обработка строится на трёх уровнях: детекция (проверка формата, размера, схемы, эмбеддингов), стратегия обработки (skip, retry, Dead Letter Queue с ручным ревью) и мониторинг (метрика ingestion_error_rate, алерт при превышении 1% ошибок). Правильная обработка таких документов предотвращает деградацию RAG-системы и экономит время на отладку.


1. Термины: corrupted и empty документы

  • Corrupted документ — файл, который не соответствует ожидаемой структуре, повреждён при передаче, имеет невалидный контент (бинарные данные вместо текста, сломанная кодировка, неполный JSON/PDF).
  • Empty документ — файл с нулевым размером (0 байт) или содержащий только пробелы/пустые строки. В контексте RAG такой документ не даёт полезной информации для индексации.
  • Ingestion pipeline — конвейер, который загружает документы из источника (S3, базы данных, API), парсит, чанкует, эмбеддит и сохраняет в векторную БД. Ошибки на любом этапе могут приводить к появлению corrupted/empty записей.

2. Почему это критично для Agentic RAG

В Agentic RAG агенты самостоятельно принимают решения о поиске и генерации. Если в индексе оказывается corrupted документ (например, мусорные строки вместо текста), ретривер может вернуть его как релевантный, и LLM сгенерирует бессмысленный или галлюцинированный ответ. Empty документы обычно игнорируются, но тратят ресурсы на эмбеддинг и хранение. Кроме того, систематические ошибки инжестии маскируют проблемы качества данных — их нужно выявлять на ранних этапах.


3. Детекция corrupted / empty документов

Детекция должна происходить на нескольких этапах пайплайна:

ЭтапЧто проверяемИнструменты
Получение файлаРазмер (0 байт?), хеш-сумма (corrupted при передаче)Проверка size > 0, MD5/SHA256
ПарсингСтруктура документа (JSON schema, PDF corruption, HTML)json.loads() / PyMuPDF / BeautifulSoup
Извлечение текстаПустой текст после извлечения, нечитаемые символыlen(text.strip()) == 0, regex для кодировки
ЧанкингКоличество чанков (0 — ошибка)Проверка длины списка чанков
ЭмбеддингЭмбеддинг-вектор — все нули или NaN?Проверка нормы вектора

Пример простого валидатора на Python:

def validate_document(doc: dict) -> bool:
    if doc.get("size", 0) == 0:
        return False   # empty
    text = doc.get("content", "")
    if not text or len(text.strip()) == 0:
        return False   # effectively empty
    # Check for binary garbage (heuristic)
    if sum(1 for c in text if ord(c) > 127) / max(len(text), 1) > 0.5:
        return False   # likely corrupted
    return True

4. Стратегии обработки: Skip, Retry, Dead Letter Queue

После детекции необходимо решить, что делать с плохим документом. Три основные стратегии:

4.1 Skip (пропуск)

Наиболее простая стратегия: документ не добавляется в индекс, ошибка логируется. Подходит для несущественных данных (например, новостные ленты, где потеря одной статьи некритична).

Минус: теряется информация о том, что документ был, — позже сложно выявить систематическую проблему в источнике.

4.2 Retry (повтор)

Если ошибка могла быть временной (перегрузка сети, таймаут парсера), стоит попробовать обработать документ повторно. Обычно делают 2–3 попытки с экспоненциальной задержкой.

for attempt in range(3):
    try:
        ingest_document(doc)
        break
    except TemporaryError:
        if attempt == 2:
            send_to_dlq(doc)   # после 3 попыток — в DLQ
        time.sleep(2 ** attempt)

4.3 Dead Letter Queue (DLQ) — очередь недоставленных сообщений

Это наиболее robust подход, особенно в event-driven архитектурах (Kafka, RabbitMQ). Повреждённые документы отправляются в отдельный топик/очередь, где они:

  • Ждут manual review (человек анализирует и решает: исправить, переслать или удалить);
  • Могут быть автоматически переобработаны после фикса бага;
  • Позволяют вести аудит всех ошибок инжестии.

Пример интеграции с Kafka

[Source Topic] → [Ingestion Consumer] → [DLQ Topic] (если ошибка)
                                     ↓
                              [Post-Processing] → [Vector DB]

Producer при ошибке пишет оригинальное сообщение с ошибкой в DLQ-топик:

if error:
    kafka_producer.send("ingestion-dlq", value=doc_with_error_metadata)
else:
    kafka_producer.send("ingestion-processed", value=processed_doc)

4.4 Manual review

Для DLQ нужен инструмент визуализации (например, UI вKafkaе: Kafdrop, Control Center|Confluent Control Center]]) или дашборд в Grafana, откуда оператор может просмотреть документы, скачать их и понять причину повреждения. Типичные действия: исправить сырой файл в источнике, добавить правило валидации, переслать документ вручную.


5. Метрики и алерты

Ключевая метрика — ingestion_error_rate (доля документов, признанных corrupted/empty, от общего числа обработанных). Порог — 1% (значение из черновика). При превышении — алерт (через PagerDuty, Slack, email).

Дополнительные метрики:

  • dlq_count — количество документов в DLQ (желательно держать 0 или стабильно низким);
  • retry_attempts_per_doc — среднее число попыток;
  • validation_fail_reason_distribution — распределение причин (empty, corrupt format, encoding error, schema mismatch).

Настройка алерта (пример Prometheus rule):

groups:
  - name: ingestion
    rules:
      - alert: HighIngestionErrorRate
        expr: rate(ingestion_errors_total[5m]) / rate(ingestion_attempts_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Ingestion error rate > 1% in last 5 minutes"

6. Автоматический retry после фикса

Когда проблема с источником исправлена (например, обновлён парсер, перезаписан повреждённый файл), необходимо повторно обработать документы из DLQ. Это можно сделать через:

  • Ручной триггер: оператор выбирает записи в DLQ и нажимает "Re-process".
  • Автоматический consumer: DLQ-топик периодически проверяется; если для документа прошло N часов, consumer пытается обработать его снова (с новыми правилами валидации).

Важно не создавать бесконечный цикл — после 3-х retry документ помечается как "dead true" и остаётся в DLQ для ручного разбора.


7. Best practices

ПрактикаОписание
Валидация схемыИспользуйте JSON Schema или Avro для проверки обязательных полей и типов на этапе сериализации.
Логирование контекстаВ логах сохраняйте не только факт ошибки, но и идентификатор документа, источник, этап пайплайна.
Мониторинг дрейфаВнезапный рост ошибок может означать проблему upstream (например, изменился формат файла).
Тестовые документыВключайте в пайплайн синтетические corrupted/empty документы для проверки корректности обработки.
ИдемпотентностьПовторная обработка одного и того же документа не должна дублировать данные в индексе.

8. Пет-проект для закрепления

Задача: Реализовать ingestion pipeline для документов (PDF, TXT, JSON), который:

  • детектирует и классифицирует corrupted/empty файлы;
  • использует Kafka DLQ;
  • имеет дашборд с метриками и алерт при ошибках >1%.

Инструменты: Python, Kafka (с помощью confluent_kafka), MinIO (S3-совместимое хранилище), Prometheus + Grafana.

Шаги:

  1. Написать producer, который читает файлы из локальной папки и отправляет в топик raw-docs.
  2. Consumer выполняет валидацию (проверка размера, текста, эмбеддинга). При ошибке пишет в dlq-docs, при успехе — в processed-docs.
  3. Реализовать встроенный экспорт метрик в Prometheus: ingestion_attempts_total, ingestion_errors_total (label: reason).
  4. Настроить Grafana дашборд с трендами ошибок и значением ingestion_error_rate.
  5. Создать скрипт, который периодически читает DLQ-топик и пытается повторно обработать записи (с дополнительной валидацией).

Ожидаемый результат: Работающий pipeline, в котором все ошибки инжестии фиксируются, не теряются и легко анализируются. Можно продемонстрировать, как внесение битого PDF вызывает его попадание в DLQ, а после исправления парсера и повторной отправки — успешную индексацию.


9. Связь с другими вопросами

ВопросТема
272Как вы обрабатываете ошибки в Agentic RAG? (пересечение стратегий retry/DLQ)
274Как организовать retry в пайплайне? (автоматические повторные попытки)
275Как вы мониторите качество данных в RAG? (метрики, алерты)
276Как детектить data drift в документах? (изменение формата → рост ошибок)
5Как оцениваете качество retrieval? (impact corrupted документов на метрики)

10. Навигация


Навигация