English translation is not available yet. Showing Russian content.
Как вы проектируете ETL пайплайн для 1M документов/день в RAG систему?
Краткий тезис
Проектирование ETL пайплайна для 1 млн документов в день в RAG-систему требует event-driven архитектуры на основе Kafka, сервиса ingestion для парсинга, чанкования и эмбеддинга, а также векторной БД с высокой пропускной способностью записи. Airflow используется для оркестрации, мониторинга и повторных попыток при сбоях. Ключевые принципы: отказоустойчивость, scaling|горизонтальное масштабирование и latency|низкая задержка между поступлением документа и его индексацией (latency < 1 минуты для real-time, до 1 часа для batch).
1. Термины и контекст
ETL (Extract, Transform, Load) — процесс извлечения данных из источников, их преобразования (очистка, структурирование, чанкование, эмбеддинг) и загрузки в целевую систему — векторную БД.
Event-driven архитектура — система, в которой компоненты реагируют на события (например, поступление нового документа), а не опрашивают источники по расписанию. Основной инструмент — Apache Kafka — распределённая платформа для потоковой передачи сообщений.
Ingestion service — микросервис, который подписывается на топики Kafka, получает документы, выполняет парсинг, chunking (разбиение на фрагменты), генерацию embeddings (векторных представлений) и отправляет результаты в векторную БД.
Векторная БД (Vector DB) — специализированное хранилище для индексации и поиска по эмбеддингам (например, Pinecone, Qdrant, Weaviate, Milvus, pgvector).
Airflow — open-source платформа для оркестрации рабочих процессов (DAG) с мощными средствами мониторинга, алертов и повторных запусков.
Пропускная способность (throughput) — количество документов, обработанных за единицу времени. Для 1M/день ≈ 12 документов/секунду (в среднем), но пиковые нагрузки могут быть выше.
2. Архитектура высокого уровня
Типовой пайплайн состоит из четырёх основных слоёв:
| Слой | Компоненты | Функция |
|---|---|---|
| Ingestion (Extract) | Kafka Producer, API Gateway, Cloud Storage (S3/GCS) | Получение документов из внешних источников и публикация в Kafka |
| Transform | Streaming job (Faust, PySpark), Chunking service, Embedding service | Парсинг, очистка, разбиение на чанки, генерация эмбеддингов |
| Load | Vector DB bulk inserter, Indexing service | Пакетная запись чанков и векторов в векторную БД |
| Orchestration & Monitoring | Airflow, Prometheus, Grafana, DLQ (Dead Letter Queue) | Управление DAG, метрики, ретраи, обработка ошибок |
Поток данных:
- Источник (HTTP API, файловый экспорт, вебхук) → Kafka (топик
raw_documents) - Сервис ingestion читает из топика, парсит (PDF, HTML, Markdown), чистит, разбивает на чанки (например, по 512 токенов с перекрытием 20 токенов)
- Chunk → Embedding model (например, text-embedding-3-small, e5-mistral-7b-instruct или BAAI/bge-m3). Эмбеддинги вычисляются батчами (batch size 32–128)
- Чанки + эмбеддинги записываются в векторную БД (через bulk insert, 500–1000 записей за раз)
- Airflow контролирует каждый этап: если сервис упал, автоматический restart и повторная обработка из Kafka (за счёт offset)
3. Ingestion (Extract) — как получать 1M документов в день
Варианты источников
- HTTP API: каждый день партнёры или внутренние системы присылают документы через REST (POST /documents). Ставим балансировщик (Nginx) и несколько инстансов API.
- Облачное хранилище: S3 или GCS — документы загружаются раз в час или непрерывно. Используем Object Storage Notifications (например, S3 Event → Lambda → Kafka).
- Файловые дампы: CSV/JSON экспорты по расписанию — через Airflow загружаем в Kafka.
Почему Kafka
- Буферизация: сглаживает пиковые нагрузки (1000 документов в минуту вместо 12/с).
- Гарантия доставки (at-least-once) и возможность повторной обработки.
- Partitioning: топик
raw_documentsразбивается на партиции (например, по источнику документа), ingestion сервис запускается с несколькими консьюмерами — параллелизм.
Масштабирование: для 1M/день достаточно 3–4 партиций Kafka и 2–3 консьюмера.
4. Transform — парсинг и очистка
Парсинг — извлечение текста из форматов: PDF, Word, HTML, Markdown, сканы (OCR). Инструменты:
- PyMuPDF / pdfplumber — для PDF (таблицы, текст).
- BeautifulSoup / trafilatura — для HTML (очистка от тегов, навигации).
- python-docx — для Word.
- Tesseract + easyocr — для изображений (OCR), только если нужен текст со сканов.
Очистка: удаление лишних пробелов, нормализация Юникода, удаление битых символов. Важно не потерять смысл.
Chunking — разбиение документа на фрагменты. Подходы:
| Стратегия | Пример | Когда использовать |
|---|---|---|
| RecursiveCharacterTextSplitter | 512 tokens, overlap 20 tokens | Универсально, хорошо для прозы |
| Semantic chunking | Sentence splitting + embeddings | Когда нужны смысловые границы (техическая документация) |
| Agentic chunking (LLM-based) | LLM разбивает текст | Только если качество критично (дорого) |
Для 1M/день выбираем RecursiveCharacterTextSplitter из LangChain — быстрый, детерминированный.
Embedding — модель для преобразования текста в вектор. Выбор:
- OpenAI text-embedding-3-small — 1536 размерность, дешево, 5000 запросов в минуту (лимит).
- Локальные модели (e5-mistral, BGE) — работают на GPU, без лимитов, но требуют инфраструктуры.
- Батчизация: отправляем батчи по 32–128 чанков, чтобы достичь высокой пропускной способности.
Кэширование эмбеддингов: если один и тот же документ обрабатывается повторно (например, переиндексация), не вычисляем эмбеддинги заново — сохраняем хэш текста и результат.
5. Load — запись в векторную БД
Bulk insert — пакетная запись (batch size 500–1000) уменьшает количество запросов к БД. Векторные БД поддерживают:
- Pinecone:
upsert()с max 1000 векторов за один вызов. - Qdrant:
upsert_points()— батч до 1000. - Milvus:
insert()с батчом до 32768.
Индексация: после вставки запускается построение индекса (HNSW, IVF). Для real-time индексации используем HNSW с параметрами, оптимизированными под скорость записи (ef_construction = 100, M = 16). Для batch — можно запускать перестроение индекса раз в час.
Обработка дубликатов: используем document_id как уникальный ключ. Если документ уже существует (по content hash или id), обновляем его чанки (upsert).
Мониторинг записи: метрики скорости записи (docs/sec), ошибок, задержки.
6. Оркестрация и мониторинг (Airflow)
Airflow DAG — определяет последовательность шагов и их зависимости.
Пример DAG (псевдокод):
from airflow import DAG
from airflow.providers.apache.kafka.operators.kafka_consume import ConsumeFromTopicOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='document_ingestion_pipeline',
schedule_interval='@daily', # или timedelta(hours=1)
start_date=datetime(2025, 1, 1),
catchup=False,
default_args=default_args,
) as dag:
# 1. Извлечение документов из S3 и отправка в Kafka
extract = PythonOperator(
task_id='extract_from_s3_to_kafka',
python_callable=extract_docs_to_kafka,
retries=2,
)
# 2. Парсинг, chunking, embedding (выполняется в сервисе ingestion)
# Airflow может просто запускать проверку, что сервис жив
check_ingestion_health = PythonOperator(
task_id='check_ingestion_health',
python_callable=check_ingestion_service,
)
# 3. Мониторинг очереди DLQ (dead letter queue)
process_dlq = PythonOperator(
task_id='process_dlq',
python_callable=process_dlq_messages,
)
# 4. После обработки — обновление индексов в БД (если нужно)
trigger_index_build = PythonOperator(
task_id='trigger_index_build',
python_callable=trigger_index_build,
)
extract >> check_ingestion_health >> process_dlq >> trigger_index_build
Dead Letter Queue — топик Kafka, куда попадают документы, которые не удалось обработать после max_retries (например, битый PDF). Airflow периодически проверяет DLQ и сообщает об ошибках.
Метрики: Prometheus собирает latency, throughput, error rate. Grafana — дашборды.
Ретраи: каждый этап имеет политику retry (экспоненциальная задержка) и отдельный механизм для долгих ошибок.
7. Масштабирование и пропускная способность
Для 1M документов/день средняя скорость = 1M / 86400 ≈ 12 docs/sec. Но пиковые нагрузки (в 8 утра) могут быть в 10 раз выше → нужно закладывать запас до 120 docs/sec.
Как масштабировать
- Kafka: увеличить число партиций (например, 6–8) и количество consumer в группе — параллельная обработка.
- Ingestion service: запустить несколько реплик (Kubernetes HPA по CPU / backlog).
- Embedding model: использовать GPU (NVIDIA T4, A10) с батчированием. Один GPU на модели BGE-m3 может обрабатывать 200–500 чанков/с.
- Векторная БД: шардирование (например, Qdrant с multi-node, Pinecone pods).
Бюджет по latency:
- Real-time (chat) — желательно < 10 секунд от появления документа до его доступности в поиске (для этого подходит потоковая обработка).
- Batch (ежедневные дампы) — допустимо до 1–2 часов.
Для real-time лучше использовать Kafka Streams или Faust вместо Airflow для непосредственной обработки, а Airflow оставить для мониторинга и ретроспективного анализа.
8. Обработка ошибок и качество данных
Типичные ошибки
- Битый PDF → парсинг не удался → отправляем в DLQ.
- Embedding model вызов превышает лимиты (rate limit) → retry с экспоненциальной задержкой.
- Векторная БД перегружена (throttling) → батч уменьшить или retry.
Контроль качества
- Доля неудачных обработок (% success rate).
- Средний размер чанка (если слишком мал — много шума, слишком велик — теряется релевантность).
- Проверка, что документы после ETL корректно находятся в поиске (можно прогонять тестовые запросы после каждой загрузки).
Data validation на каждом этапе (парсинг, chunking, embedding) можно добавить служебные метаданные (заголовок, хэш, длину текста) и верифицировать их.
9. Связь с Agentic RAG
Для Agentic RAG (когда агенты самостоятельно решают, какие документы добавить/удалить) ETL пайплайн должен поддерживать динамическое обновление:
- Агент может запросить индексацию нового документа «на лету».
- Пайплайн должен быть чувствительным к командам из топика
agent_commands— например, отправить документ на переиндексацию или удалить устаревший. - Для этого добавляем механизм event sourcing: каждое действие агента — это событие в Kafka, ingestion service обрабатывает его и обновляет векторную БД.
10. Компромиссы и решения
| Параметр | Вариант A | Вариант B | Комментарий |
|---|---|---|---|
| Обработка | Streaming (Kafka Streams) | Batch (Airflow + Spark) | Streaming для real-time, batch если latency неважна |
| Embedding | Онлайн (вызывать при индексации) | Предвычисленные из кэша | Онлайн гибко, кэш экономит ресурсы |
| Векторная БД | SaaS (Pinecone, Weaviate) | Self-hosted (Qdrant, Milvus) | SaaS проще, но дороже; self-hosted безопаснее |
| Чанкер | Фиксированный размер (token) | Semantic chunker | Semantic качественнее, но медленнее |
Для 1M/день чаще выбирают streaming + batch (гибрид) для устойчивости.
Пет-проект для закрепления
Задача: спроектировать и реализовать эмуляцию ETL пайплайна для 100 000 документов (текстовые файлы) с последующей загрузкой в локальную Qdrant.
Инструменты: Docker Compose, Python, Apache Kafka (через confluent-kafka), Qdrant, LangChain (RecursiveCharacterTextSplitter), sentence-transformers (BAAI/bge-small-en), Airflow (локально или MiniAirflow).
Шаги:
- Развернуть Kafka (1 брокер) и Qdrant (1 узел) в Docker Compose.
- Написать Producer: генерирует 1000 текстовых «документов» (можно взять случайные абзацы из wiki) и отправляет в топик
raw_documents. - Написать Consumer-сервис на Python (Faust или простой
aiokafka): парсит (просто берём текст), разбивает на чанки (512 токенов, overlap 20), вычисляет векторы (model.encode(batch)). - Bulk insert в Qdrant (коллекция с HNSW).
- Airflow DAG: запускает Producer каждый час, мониторит количество обработанных документов через Consumer и Qdrant dashboard.
- Организовать DLQ: битые документы (вставить символ
\x00, чтобы симулировать ошибку) отправляются в отдельный топик. - Написать скрипт для поиска по Qdrant и проверить, что хотя бы 90% документов найдены по ключевым словам.
Ожидаемый результат: полностью работающий пайплайн, который обрабатывает 1000 документов за < 10 секунд (на CPU). Дашборды в Grafana показывают latency, throughput, ошибки.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | Проектирование RAG с различными форматами |
| 9 | Инкрементальное обновление (upsert, delete) |
| 3 | Выбор стратегии чанкования |
| 7 | Оптимизация задержки |
| 10 | Динамический retrieval с использованием агентов |
Навигация
- Предыдущий: 255
- Следующий: 257
- Индекс: 00. Индекс разборов