English translation is not available yet. Showing Russian content.
Как вы обрабатываете corrupted или empty документы в ingestion пайплайне?
Краткий тезис
Corrupted (повреждённые) и empty (пустые) документы — неизбежная реальность в production-пайплайнах инжестии данных. Их обработка строится на трёх уровнях: детекция (проверка формата, размера, схемы, эмбеддингов), стратегия обработки (skip, retry, Dead Letter Queue с ручным ревью) и мониторинг (метрика ingestion_error_rate, алерт при превышении 1% ошибок). Правильная обработка таких документов предотвращает деградацию RAG-системы и экономит время на отладку.
1. Термины: corrupted и empty документы
- Corrupted документ — файл, который не соответствует ожидаемой структуре, повреждён при передаче, имеет невалидный контент (бинарные данные вместо текста, сломанная кодировка, неполный JSON/PDF).
- Empty документ — файл с нулевым размером (0 байт) или содержащий только пробелы/пустые строки. В контексте RAG такой документ не даёт полезной информации для индексации.
- Ingestion pipeline — конвейер, который загружает документы из источника (S3, базы данных, API), парсит, чанкует, эмбеддит и сохраняет в векторную БД. Ошибки на любом этапе могут приводить к появлению corrupted/empty записей.
2. Почему это критично для Agentic RAG
В Agentic RAG агенты самостоятельно принимают решения о поиске и генерации. Если в индексе оказывается corrupted документ (например, мусорные строки вместо текста), ретривер может вернуть его как релевантный, и LLM сгенерирует бессмысленный или галлюцинированный ответ. Empty документы обычно игнорируются, но тратят ресурсы на эмбеддинг и хранение. Кроме того, систематические ошибки инжестии маскируют проблемы качества данных — их нужно выявлять на ранних этапах.
3. Детекция corrupted / empty документов
Детекция должна происходить на нескольких этапах пайплайна:
| Этап | Что проверяем | Инструменты |
|---|---|---|
| Получение файла | Размер (0 байт?), хеш-сумма (corrupted при передаче) | Проверка size > 0, MD5/SHA256 |
| Парсинг | Структура документа (JSON schema, PDF corruption, HTML) | json.loads() / PyMuPDF / BeautifulSoup |
| Извлечение текста | Пустой текст после извлечения, нечитаемые символы | len(text.strip()) == 0, regex для кодировки |
| Чанкинг | Количество чанков (0 — ошибка) | Проверка длины списка чанков |
| Эмбеддинг | Эмбеддинг-вектор — все нули или NaN? | Проверка нормы вектора |
Пример простого валидатора на Python:
def validate_document(doc: dict) -> bool:
if doc.get("size", 0) == 0:
return False # empty
text = doc.get("content", "")
if not text or len(text.strip()) == 0:
return False # effectively empty
# Check for binary garbage (heuristic)
if sum(1 for c in text if ord(c) > 127) / max(len(text), 1) > 0.5:
return False # likely corrupted
return True
4. Стратегии обработки: Skip, Retry, Dead Letter Queue
После детекции необходимо решить, что делать с плохим документом. Три основные стратегии:
4.1 Skip (пропуск)
Наиболее простая стратегия: документ не добавляется в индекс, ошибка логируется. Подходит для несущественных данных (например, новостные ленты, где потеря одной статьи некритична).
Минус: теряется информация о том, что документ был, — позже сложно выявить систематическую проблему в источнике.
4.2 Retry (повтор)
Если ошибка могла быть временной (перегрузка сети, таймаут парсера), стоит попробовать обработать документ повторно. Обычно делают 2–3 попытки с экспоненциальной задержкой.
for attempt in range(3):
try:
ingest_document(doc)
break
except TemporaryError:
if attempt == 2:
send_to_dlq(doc) # после 3 попыток — в DLQ
time.sleep(2 ** attempt)
4.3 Dead Letter Queue (DLQ) — очередь недоставленных сообщений
Это наиболее robust подход, особенно в event-driven архитектурах (Kafka, RabbitMQ). Повреждённые документы отправляются в отдельный топик/очередь, где они:
- Ждут manual review (человек анализирует и решает: исправить, переслать или удалить);
- Могут быть автоматически переобработаны после фикса бага;
- Позволяют вести аудит всех ошибок инжестии.
Пример интеграции с Kafka
[Source Topic] → [Ingestion Consumer] → [DLQ Topic] (если ошибка)
↓
[Post-Processing] → [Vector DB]
Producer при ошибке пишет оригинальное сообщение с ошибкой в DLQ-топик:
if error:
kafka_producer.send("ingestion-dlq", value=doc_with_error_metadata)
else:
kafka_producer.send("ingestion-processed", value=processed_doc)
4.4 Manual review
Для DLQ нужен инструмент визуализации (например, UI вKafkaе: Kafdrop, Control Center|Confluent Control Center]]) или дашборд в Grafana, откуда оператор может просмотреть документы, скачать их и понять причину повреждения. Типичные действия: исправить сырой файл в источнике, добавить правило валидации, переслать документ вручную.
5. Метрики и алерты
Ключевая метрика — ingestion_error_rate (доля документов, признанных corrupted/empty, от общего числа обработанных). Порог — 1% (значение из черновика). При превышении — алерт (через PagerDuty, Slack, email).
Дополнительные метрики:
dlq_count— количество документов в DLQ (желательно держать 0 или стабильно низким);retry_attempts_per_doc— среднее число попыток;validation_fail_reason_distribution— распределение причин (empty, corrupt format, encoding error, schema mismatch).
Настройка алерта (пример Prometheus rule):
groups:
- name: ingestion
rules:
- alert: HighIngestionErrorRate
expr: rate(ingestion_errors_total[5m]) / rate(ingestion_attempts_total[5m]) > 0.01
for: 2m
labels:
severity: critical
annotations:
summary: "Ingestion error rate > 1% in last 5 minutes"
6. Автоматический retry после фикса
Когда проблема с источником исправлена (например, обновлён парсер, перезаписан повреждённый файл), необходимо повторно обработать документы из DLQ. Это можно сделать через:
- Ручной триггер: оператор выбирает записи в DLQ и нажимает "Re-process".
- Автоматический consumer: DLQ-топик периодически проверяется; если для документа прошло N часов, consumer пытается обработать его снова (с новыми правилами валидации).
Важно не создавать бесконечный цикл — после 3-х retry документ помечается как "dead true" и остаётся в DLQ для ручного разбора.
7. Best practices
| Практика | Описание |
|---|---|
| Валидация схемы | Используйте JSON Schema или Avro для проверки обязательных полей и типов на этапе сериализации. |
| Логирование контекста | В логах сохраняйте не только факт ошибки, но и идентификатор документа, источник, этап пайплайна. |
| Мониторинг дрейфа | Внезапный рост ошибок может означать проблему upstream (например, изменился формат файла). |
| Тестовые документы | Включайте в пайплайн синтетические corrupted/empty документы для проверки корректности обработки. |
| Идемпотентность | Повторная обработка одного и того же документа не должна дублировать данные в индексе. |
8. Пет-проект для закрепления
Задача: Реализовать ingestion pipeline для документов (PDF, TXT, JSON), который:
- детектирует и классифицирует corrupted/empty файлы;
- использует Kafka DLQ;
- имеет дашборд с метриками и алерт при ошибках >1%.
Инструменты: Python, Kafka (с помощью confluent_kafka), MinIO (S3-совместимое хранилище), Prometheus + Grafana.
Шаги:
- Написать producer, который читает файлы из локальной папки и отправляет в топик
raw-docs. - Consumer выполняет валидацию (проверка размера, текста, эмбеддинга). При ошибке пишет в
dlq-docs, при успехе — вprocessed-docs. - Реализовать встроенный экспорт метрик в Prometheus:
ingestion_attempts_total,ingestion_errors_total(label:reason). - Настроить Grafana дашборд с трендами ошибок и значением ingestion_error_rate.
- Создать скрипт, который периодически читает DLQ-топик и пытается повторно обработать записи (с дополнительной валидацией).
Ожидаемый результат: Работающий pipeline, в котором все ошибки инжестии фиксируются, не теряются и легко анализируются. Можно продемонстрировать, как внесение битого PDF вызывает его попадание в DLQ, а после исправления парсера и повторной отправки — успешную индексацию.
9. Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 272 | Как вы обрабатываете ошибки в Agentic RAG? (пересечение стратегий retry/DLQ) |
| 274 | Как организовать retry в пайплайне? (автоматические повторные попытки) |
| 275 | Как вы мониторите качество данных в RAG? (метрики, алерты) |
| 276 | Как детектить data drift в документах? (изменение формата → рост ошибок) |
| 5 | Как оцениваете качество retrieval? (impact corrupted документов на метрики) |
10. Навигация
- Предыдущий: 272
- Следующий: 274
- Индекс: 00. Индекс разборов
Навигация
- Предыдущий: 272
- Следующий: 274
- Индекс: 00. Индекс разборов