English translation is not available yet. Showing Russian content.
Как вы проектируете ETL пайплайн для 1M документов/день в RAG систему?
Краткий тезис
ETL пайплайн для RAG с нагрузкой 1M документов в день строится на event-driven архитектуре с брокером сообщений (Kafka), асинхронными consumer'ами для парсинга/чанкинга и эмбеддинга, оркестрацией через Airflow и Dead Letter Queue для отказоустойчивости. Ключевые метрики — latency индексации (SLO < 5 минут) и ingestion rate (документов/сек). Такой пайплайн обеспечивает горизонтальное масштабирование и обработку пиковых нагрузок без потери данных.
1. Термин: ETL пайплайн для RAG
ETL (Extract, Transform, Load) — процесс извлечения данных из источников, их преобразования (парсинг, чанкинг, эмбеддинг) и загрузки в целевую систему (векторная БД). В контексте RAG ETL пайплайн отвечает за непрерывную индексацию документов, чтобы retrieval мог находить актуальную информацию.
1M документов/день — это ~11.6 документов в секунду в среднем, но пики могут быть выше. Пайплайн должен быть асинхронным, отказоустойчивым и масштабируемым.
2. Архитектура: Event-driven с Kafka
Event-driven архитектура — система, где компоненты реагируют на события (например, появление нового документа). Apache Kafka — распределённый брокер сообщений, который выступает буфером между источниками и обработчиками.
Поток данных
- Документы поступают из разных источников (S3, API, FTP) → публикуются в Kafka topic
docs.raw. - Topic разбит на партиции по
source_idдля параллельной обработки. - Consumer'ы читают сообщения из партиций и выполняют этапы ETL.
Почему Kafka
- Высокая пропускная способность (миллионы сообщений/сек)
- Гарантия доставки (at-least-once)
- Возможность replay сообщений при сбоях
- Горизонтальное масштабирование через увеличение партиций
3. Consumer 1: Парсинг и чанкинг
Первый consumer читает из docs.raw и выполняет:
- Парсинг — извлечение текста из PDF, HTML, DOCX, Markdown и т.д. Используются библиотеки:
PyMuPDF,BeautifulSoup,python-docx. - Чанкинг — разбиение текста на фрагменты (чанки) фиксированной длины (например, 512 токенов) с перекрытием (overlap) для сохранения контекста. Стратегии: recursive character splitter, semantic chunking.
Результат — сообщение с метаданными (source_id, chunk_id, текст) отправляется в Kafka topic docs.chunks.
Пример конфигурации consumer (Python):
from kafka import KafkaConsumer, KafkaProducer
import json
consumer = KafkaConsumer(
'docs.raw',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for message in consumer:
doc = message.value
chunks = chunk_text(doc['text'], chunk_size=512, overlap=50)
for i, chunk in enumerate(chunks):
producer.send('docs.chunks', {
'source_id': doc['source_id'],
'chunk_id': f"{doc['id']}_{i}",
'text': chunk,
'metadata': doc.get('metadata', {})
})
4. Consumer 2: Эмбеддинг и индексация
Второй consumer читает из docs.chunks и:
- Батчинг — группирует чанки по 100–200 для эффективного вызова модели эмбеддингов (batch inference).
- Эмбеддинг — преобразует текст в вектор с помощью модели (например,
text-embedding-ada-002,all-MiniLM-L6-v2). - Загрузка — записывает векторы и метаданные в векторную БД (Pinecone, Weaviate, Qdrant, Milvus).
Почему батчинг модели эмбеддингов работают быстрее при обработке батчами (GPU утилизация выше), снижается latency на один чанк.
Пример батчинга
batch_size = 150
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= batch_size:
embeddings = embedding_model.encode([c['text'] for c in batch])
vector_db.upsert(vectors=embeddings, metadata=batch)
batch = []
5. Оркестрация: Airflow
Apache Airflow — платформа для оркестрации рабочих процессов. Используется для:
- Мониторинга здоровья consumer'ов (если consumer упал — Airflow перезапускает)
- Retry при временных ошибках (например, таймаут векторной БД)
- Алертов при превышении порогов latency
- Периодической переиндексации (если нужно обновить эмбеддинги)
DAG (Directed Acyclic Graph) — описание пайплайна в коде Python:
from airflow import DAG
from airflow.operators.python import PythonOperator
def check_consumer_health():
# проверка, что consumer живы
pass
with DAG('etl_pipeline', schedule_interval='@hourly') as dag:
health_check = PythonOperator(task_id='health_check', python_callable=check_consumer_health)
6. Обработка ошибок: Dead Letter Queue
Dead Letter Queue (DLQ) — отдельный Kafka topic (docs.dlq), куда попадают сообщения, которые не удалось обработать после нескольких retry. Причины: corrupted PDF, таймаут эмбеддинга, неверный формат.
Процесс
- Consumer пытается обработать сообщение (max 3 retry с exponential backoff).
- Если все попытки неудачны → сообщение отправляется в DLQ с указанием причины ошибки.
- Отдельный процесс (или человек) анализирует DLQ и принимает решение: повторная обработка, ручное исправление, игнорирование.
Пример реализации
max_retries = 3
for attempt in range(max_retries):
try:
process_message(msg)
break
except Exception as e:
if attempt == max_retries - 1:
producer.send('docs.dlq', {'error': str(e), 'message': msg})
else:
time.sleep(2 ** attempt)
7. Метрики и SLO
SLO (Service Level Objective) — целевой уровень качества сервиса. Для ETL пайплайна:
- Latency от загрузки до индексации — время от публикации документа в
docs.rawдо его появления в векторной БД. SLO < 5 минут. - Ingestion rate — количество документов, обработанных за секунду. Цель: > 12 док/сек (с запасом на пики).
Метрики для мониторинга
| Метрика | Описание | Инструмент |
|---|---|---|
kafka_consumer_lag | Отставание consumer'а от последнего сообщения | Prometheus + Grafana |
processing_time | Время обработки одного документа | Airflow logs |
error_rate | Доля сообщений, ушедших в DLQ | Kafka metrics |
throughput | Документов/сек | Custom counter |
Пример алерта если kafka_consumer_lag > 10000 сообщений в течение 5 минут → PagerDuty.
8. Масштабирование и оптимизация
Горизонтальное масштабирование
- Увеличить количество партиций Kafka (например, с 3 до 10).
- Запустить несколько экземпляров consumer'ов в одной consumer group (Kafka автоматически распределяет партиции).
- Добавить реплики векторной БД.
Оптимизация чанкинга
- Использовать semantic chunking (разбиение по смысловым границам) вместо фиксированной длины — улучшает качество retrieval.
- Кэшировать результаты парсинга для повторяющихся документов.
Оптимизация эмбеддинга
- Использовать GPU для batch inference.
- Выбрать модель с подходящим trade-off скорость/качество (например,
intfloat/e5-small-v2быстрее, чемtext-embedding-3-large).
9. Выбор инструментов: сравнение
| Компонент | Варианты | Плюсы | Минусы |
|---|---|---|---|
| Брокер сообщений | Kafka, RabbitMQ, AWS SQS | Kafka: высокая пропускная способность, replay | Kafka: сложнее в настройке |
| Векторная БД | Pinecone, Weaviate, Qdrant, Milvus | Pinecone: managed, простота; Milvus: open-source, гибкость | Pinecone: дорого; Milvus: требует администрирования |
| Оркестрация | Airflow, Prefect, Dagster | Airflow: зрелость, большое сообщество | Airflow: тяжеловесен для простых задач |
| Эмбеддинг модель | OpenAI, Cohere, Sentence-Transformers | OpenAI: качество; Sentence-Transformers: бесплатно | OpenAI: плата за API; локальные модели: ресурсы |
Рекомендация для 1M док/день Kafka + Qdrant (self-hosted) + Airflow + Sentence-Transformers на GPU.
10. Пример кода: полный consumer для эмбеддинга
import json
import time
from kafka import KafkaConsumer, KafkaProducer
from sentence_transformers import SentenceTransformer
import qdrant_client
# Инициализация
model = SentenceTransformer('all-MiniLM-L6-v2')
client = qdrant_client.QdrantClient(host='localhost', port=6333)
consumer = KafkaConsumer('docs.chunks', bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
BATCH_SIZE = 150
batch = []
for msg in consumer:
chunk = msg.value
batch.append(chunk)
if len(batch) >= BATCH_SIZE:
texts = [c['text'] for c in batch]
try:
embeddings = model.encode(texts, show_progress_bar=False)
points = [
qdrant_client.models.PointStruct(
id=hash(c['chunk_id']),
vector=emb.tolist(),
payload={'source_id': c['source_id'], 'text': c['text']}
)
for c, emb in zip(batch, embeddings)
]
client.upsert(collection_name='docs', points=points)
except Exception as e:
# Отправка в DLQ
for c in batch:
producer.send('docs.dlq', {'error': str(e), 'chunk': c})
finally:
batch = []
Пет-проект для закрепления
Задача Разработать ETL пайплайн для индексации 10 000 новостных статей (RSS-ленты) в RAG-систему.
Инструменты Docker, Kafka (через wurstmeister/kafka), Python, Sentence-Transformers, Qdrant (in-memory для теста), Airflow (опционально).
Шаги:
- Настройте Kafka локально (1 топик
news.raw, 3 партиции). - Напишите producer, который читает RSS и публикует статьи в
news.raw. - Consumer 1: парсинг HTML (BeautifulSoup) → чанкинг (RecursiveCharacterTextSplitter из LangChain) → публикация в
news.chunks. - Consumer 2: батчинг по 50 → эмбеддинг → загрузка в Qdrant.
- Добавьте DLQ: при ошибке парсинга отправляйте в
news.dlq. - Напишите скрипт мониторинга: выводите consumer lag и throughput.
Ожидаемый результат Работающий пайплайн, который индексирует статьи с latency < 10 секунд. Вы сможете задавать вопросы к новостям через простой RAG-интерфейс (например, Streamlit + LangChain).
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | Проектирование RAG-системы для 10 000 документов |
| 2 | Проблема lost in the middle |
| 3 | Стратегии chunking'а |
| 4 | Обновление документов в RAG |
| 5 | Оценка качества retrieval |
| 6 | Hybrid search |
| 7 | Уменьшение latency RAG |
| 8 | Обработка запросов без ответа |
| 9 | Обновление документов (повтор) |
| 10 | Self-RAG |
Навигация
- Предыдущий: 510
- Следующий: 512
- Индекс: 00. Индекс разборов