Как вы проектируете ETL пайплайн для 1M документов/день в RAG систему?

Краткий тезис

Проектирование ETL пайплайна для 1 млн документов в день в RAG-систему требует event-driven архитектуры на основе Kafka, сервиса ingestion для парсинга, чанкования и эмбеддинга, а также векторной БД с высокой пропускной способностью записи. Airflow используется для оркестрации, мониторинга и повторных попыток при сбоях. Ключевые принципы: отказоустойчивость, scaling|горизонтальное масштабирование и latency|низкая задержка между поступлением документа и его индексацией (latency < 1 минуты для real-time, до 1 часа для batch).

1. Термины и контекст

ETL (Extract, Transform, Load) — процесс извлечения данных из источников, их преобразования (очистка, структурирование, чанкование, эмбеддинг) и загрузки в целевую систему — векторную БД.

Event-driven архитектура — система, в которой компоненты реагируют на события (например, поступление нового документа), а не опрашивают источники по расписанию. Основной инструмент — Apache Kafka — распределённая платформа для потоковой передачи сообщений.

Ingestion service — микросервис, который подписывается на топики Kafka, получает документы, выполняет парсинг, chunking (разбиение на фрагменты), генерацию embeddings (векторных представлений) и отправляет результаты в векторную БД.

Векторная БД (Vector DB) — специализированное хранилище для индексации и поиска по эмбеддингам (например, Pinecone, Qdrant, Weaviate, Milvus, pgvector).

Airflow — open-source платформа для оркестрации рабочих процессов (DAG) с мощными средствами мониторинга, алертов и повторных запусков.

Пропускная способность (throughput) — количество документов, обработанных за единицу времени. Для 1M/день ≈ 12 документов/секунду (в среднем), но пиковые нагрузки могут быть выше.

2. Архитектура высокого уровня

Типовой пайплайн состоит из четырёх основных слоёв:

СлойКомпонентыФункция
Ingestion (Extract)Kafka Producer, API Gateway, Cloud Storage (S3/GCS)Получение документов из внешних источников и публикация в Kafka
TransformStreaming job (Faust, PySpark), Chunking service, Embedding serviceПарсинг, очистка, разбиение на чанки, генерация эмбеддингов
LoadVector DB bulk inserter, Indexing serviceПакетная запись чанков и векторов в векторную БД
Orchestration & MonitoringAirflow, Prometheus, Grafana, DLQ (Dead Letter Queue)Управление DAG, метрики, ретраи, обработка ошибок

Поток данных:

  1. Источник (HTTP API, файловый экспорт, вебхук) → Kafka (топик raw_documents)
  2. Сервис ingestion читает из топика, парсит (PDF, HTML, Markdown), чистит, разбивает на чанки (например, по 512 токенов с перекрытием 20 токенов)
  3. Chunk → Embedding model (например, text-embedding-3-small, e5-mistral-7b-instruct или BAAI/bge-m3). Эмбеддинги вычисляются батчами (batch size 32–128)
  4. Чанки + эмбеддинги записываются в векторную БД (через bulk insert, 500–1000 записей за раз)
  5. Airflow контролирует каждый этап: если сервис упал, автоматический restart и повторная обработка из Kafka (за счёт offset)

3. Ingestion (Extract) — как получать 1M документов в день

Варианты источников

  • HTTP API: каждый день партнёры или внутренние системы присылают документы через REST (POST /documents). Ставим балансировщик (Nginx) и несколько инстансов API.
  • Облачное хранилище: S3 или GCS — документы загружаются раз в час или непрерывно. Используем Object Storage Notifications (например, S3 Event → Lambda → Kafka).
  • Файловые дампы: CSV/JSON экспорты по расписанию — через Airflow загружаем в Kafka.

Почему Kafka

  • Буферизация: сглаживает пиковые нагрузки (1000 документов в минуту вместо 12/с).
  • Гарантия доставки (at-least-once) и возможность повторной обработки.
  • Partitioning: топик raw_documents разбивается на партиции (например, по источнику документа), ingestion сервис запускается с несколькими консьюмерами — параллелизм.

Масштабирование: для 1M/день достаточно 3–4 партиций Kafka и 2–3 консьюмера.

4. Transform — парсинг и очистка

Парсинг — извлечение текста из форматов: PDF, Word, HTML, Markdown, сканы (OCR). Инструменты:

  • PyMuPDF / pdfplumber — для PDF (таблицы, текст).
  • BeautifulSoup / trafilatura — для HTML (очистка от тегов, навигации).
  • python-docx — для Word.
  • Tesseract + easyocr — для изображений (OCR), только если нужен текст со сканов.

Очистка: удаление лишних пробелов, нормализация Юникода, удаление битых символов. Важно не потерять смысл.

Chunking — разбиение документа на фрагменты. Подходы:

СтратегияПримерКогда использовать
RecursiveCharacterTextSplitter512 tokens, overlap 20 tokensУниверсально, хорошо для прозы
Semantic chunkingSentence splitting + embeddingsКогда нужны смысловые границы (техическая документация)
Agentic chunking (LLM-based)LLM разбивает текстТолько если качество критично (дорого)

Для 1M/день выбираем RecursiveCharacterTextSplitter из LangChain — быстрый, детерминированный.

Embedding — модель для преобразования текста в вектор. Выбор:

  • OpenAI text-embedding-3-small — 1536 размерность, дешево, 5000 запросов в минуту (лимит).
  • Локальные модели (e5-mistral, BGE) — работают на GPU, без лимитов, но требуют инфраструктуры.
  • Батчизация: отправляем батчи по 32–128 чанков, чтобы достичь высокой пропускной способности.

Кэширование эмбеддингов: если один и тот же документ обрабатывается повторно (например, переиндексация), не вычисляем эмбеддинги заново — сохраняем хэш текста и результат.

5. Load — запись в векторную БД

Bulk insert — пакетная запись (batch size 500–1000) уменьшает количество запросов к БД. Векторные БД поддерживают:

  • Pinecone: upsert() с max 1000 векторов за один вызов.
  • Qdrant: upsert_points() — батч до 1000.
  • Milvus: insert() с батчом до 32768.

Индексация: после вставки запускается построение индекса (HNSW, IVF). Для real-time индексации используем HNSW с параметрами, оптимизированными под скорость записи (ef_construction = 100, M = 16). Для batch — можно запускать перестроение индекса раз в час.

Обработка дубликатов: используем document_id как уникальный ключ. Если документ уже существует (по content hash или id), обновляем его чанки (upsert).

Мониторинг записи: метрики скорости записи (docs/sec), ошибок, задержки.

6. Оркестрация и мониторинг (Airflow)

Airflow DAG — определяет последовательность шагов и их зависимости.

Пример DAG (псевдокод):

from airflow import DAG
from airflow.providers.apache.kafka.operators.kafka_consume import ConsumeFromTopicOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='document_ingestion_pipeline',
    schedule_interval='@daily',  # или timedelta(hours=1)
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:
    # 1. Извлечение документов из S3 и отправка в Kafka
    extract = PythonOperator(
        task_id='extract_from_s3_to_kafka',
        python_callable=extract_docs_to_kafka,
        retries=2,
    )

    # 2. Парсинг, chunking, embedding (выполняется в сервисе ingestion)
    # Airflow может просто запускать проверку, что сервис жив
    check_ingestion_health = PythonOperator(
        task_id='check_ingestion_health',
        python_callable=check_ingestion_service,
    )

    # 3. Мониторинг очереди DLQ (dead letter queue)
    process_dlq = PythonOperator(
        task_id='process_dlq',
        python_callable=process_dlq_messages,
    )

    # 4. После обработки — обновление индексов в БД (если нужно)
    trigger_index_build = PythonOperator(
        task_id='trigger_index_build',
        python_callable=trigger_index_build,
    )

    extract >> check_ingestion_health >> process_dlq >> trigger_index_build

Dead Letter Queue — топик Kafka, куда попадают документы, которые не удалось обработать после max_retries (например, битый PDF). Airflow периодически проверяет DLQ и сообщает об ошибках.

Метрики: Prometheus собирает latency, throughput, error rate. Grafana — дашборды.

Ретраи: каждый этап имеет политику retry (экспоненциальная задержка) и отдельный механизм для долгих ошибок.

7. Масштабирование и пропускная способность

Для 1M документов/день средняя скорость = 1M / 86400 ≈ 12 docs/sec. Но пиковые нагрузки (в 8 утра) могут быть в 10 раз выше → нужно закладывать запас до 120 docs/sec.

Как масштабировать

  • Kafka: увеличить число партиций (например, 6–8) и количество consumer в группе — параллельная обработка.
  • Ingestion service: запустить несколько реплик (Kubernetes HPA по CPU / backlog).
  • Embedding model: использовать GPU (NVIDIA T4, A10) с батчированием. Один GPU на модели BGE-m3 может обрабатывать 200–500 чанков/с.
  • Векторная БД: шардирование (например, Qdrant с multi-node, Pinecone pods).

Бюджет по latency:

  • Real-time (chat) — желательно < 10 секунд от появления документа до его доступности в поиске (для этого подходит потоковая обработка).
  • Batch (ежедневные дампы) — допустимо до 1–2 часов.

Для real-time лучше использовать Kafka Streams или Faust вместо Airflow для непосредственной обработки, а Airflow оставить для мониторинга и ретроспективного анализа.

8. Обработка ошибок и качество данных

Типичные ошибки

  • Битый PDF → парсинг не удался → отправляем в DLQ.
  • Embedding model вызов превышает лимиты (rate limit) → retry с экспоненциальной задержкой.
  • Векторная БД перегружена (throttling) → батч уменьшить или retry.

Контроль качества

  • Доля неудачных обработок (% success rate).
  • Средний размер чанка (если слишком мал — много шума, слишком велик — теряется релевантность).
  • Проверка, что документы после ETL корректно находятся в поиске (можно прогонять тестовые запросы после каждой загрузки).

Data validation на каждом этапе (парсинг, chunking, embedding) можно добавить служебные метаданные (заголовок, хэш, длину текста) и верифицировать их.

9. Связь с Agentic RAG

Для Agentic RAG (когда агенты самостоятельно решают, какие документы добавить/удалить) ETL пайплайн должен поддерживать динамическое обновление:

  • Агент может запросить индексацию нового документа «на лету».
  • Пайплайн должен быть чувствительным к командам из топика agent_commands — например, отправить документ на переиндексацию или удалить устаревший.
  • Для этого добавляем механизм event sourcing: каждое действие агента — это событие в Kafka, ingestion service обрабатывает его и обновляет векторную БД.

10. Компромиссы и решения

ПараметрВариант AВариант BКомментарий
ОбработкаStreaming (Kafka Streams)Batch (Airflow + Spark)Streaming для real-time, batch если latency неважна
EmbeddingОнлайн (вызывать при индексации)Предвычисленные из кэшаОнлайн гибко, кэш экономит ресурсы
Векторная БДSaaS (Pinecone, Weaviate)Self-hosted (Qdrant, Milvus)SaaS проще, но дороже; self-hosted безопаснее
ЧанкерФиксированный размер (token)Semantic chunkerSemantic качественнее, но медленнее

Для 1M/день чаще выбирают streaming + batch (гибрид) для устойчивости.

Пет-проект для закрепления

Задача: спроектировать и реализовать эмуляцию ETL пайплайна для 100 000 документов (текстовые файлы) с последующей загрузкой в локальную Qdrant.

Инструменты: Docker Compose, Python, Apache Kafka (через confluent-kafka), Qdrant, LangChain (RecursiveCharacterTextSplitter), sentence-transformers (BAAI/bge-small-en), Airflow (локально или MiniAirflow).

Шаги:

  1. Развернуть Kafka (1 брокер) и Qdrant (1 узел) в Docker Compose.
  2. Написать Producer: генерирует 1000 текстовых «документов» (можно взять случайные абзацы из wiki) и отправляет в топик raw_documents.
  3. Написать Consumer-сервис на Python (Faust или простой aiokafka): парсит (просто берём текст), разбивает на чанки (512 токенов, overlap 20), вычисляет векторы (model.encode(batch)).
  4. Bulk insert в Qdrant (коллекция с HNSW).
  5. Airflow DAG: запускает Producer каждый час, мониторит количество обработанных документов через Consumer и Qdrant dashboard.
  6. Организовать DLQ: битые документы (вставить символ \x00, чтобы симулировать ошибку) отправляются в отдельный топик.
  7. Написать скрипт для поиска по Qdrant и проверить, что хотя бы 90% документов найдены по ключевым словам.

Ожидаемый результат: полностью работающий пайплайн, который обрабатывает 1000 документов за < 10 секунд (на CPU). Дашборды в Grafana показывают latency, throughput, ошибки.

Связь с другими вопросами

ВопросТема
1Проектирование RAG с различными форматами
9Инкрементальное обновление (upsert, delete)
3Выбор стратегии чанкования
7Оптимизация задержки
10Динамический retrieval с использованием агентов

Навигация