中文翻译暂不可用,显示俄语原文。

Как вы проектируете ETL пайплайн для 1M документов/день в RAG систему?

Краткий тезис

ETL пайплайн для RAG с нагрузкой 1M документов в день строится на event-driven архитектуре с брокером сообщений (Kafka), асинхронными consumer'ами для парсинга/чанкинга и эмбеддинга, оркестрацией через Airflow и Dead Letter Queue для отказоустойчивости. Ключевые метрики — latency индексации (SLO < 5 минут) и ingestion rate (документов/сек). Такой пайплайн обеспечивает горизонтальное масштабирование и обработку пиковых нагрузок без потери данных.


1. Термин: ETL пайплайн для RAG

ETL (Extract, Transform, Load) — процесс извлечения данных из источников, их преобразования (парсинг, чанкинг, эмбеддинг) и загрузки в целевую систему (векторная БД). В контексте RAG ETL пайплайн отвечает за непрерывную индексацию документов, чтобы retrieval мог находить актуальную информацию.

1M документов/день — это ~11.6 документов в секунду в среднем, но пики могут быть выше. Пайплайн должен быть асинхронным, отказоустойчивым и масштабируемым.


2. Архитектура: Event-driven с Kafka

Event-driven архитектура — система, где компоненты реагируют на события (например, появление нового документа). Apache Kafka — распределённый брокер сообщений, который выступает буфером между источниками и обработчиками.

Поток данных

  1. Документы поступают из разных источников (S3, API, FTP) → публикуются в Kafka topic docs.raw.
  2. Topic разбит на партиции по source_id для параллельной обработки.
  3. Consumer'ы читают сообщения из партиций и выполняют этапы ETL.

Почему Kafka

  • Высокая пропускная способность (миллионы сообщений/сек)
  • Гарантия доставки (at-least-once)
  • Возможность replay сообщений при сбоях
  • Горизонтальное масштабирование через увеличение партиций

3. Consumer 1: Парсинг и чанкинг

Первый consumer читает из docs.raw и выполняет:

  • Парсинг — извлечение текста из PDF, HTML, DOCX, Markdown и т.д. Используются библиотеки: PyMuPDF, BeautifulSoup, python-docx.
  • Чанкинг — разбиение текста на фрагменты (чанки) фиксированной длины (например, 512 токенов) с перекрытием (overlap) для сохранения контекста. Стратегии: recursive character splitter, semantic chunking.

Результат — сообщение с метаданными (source_id, chunk_id, текст) отправляется в Kafka topic docs.chunks.

Пример конфигурации consumer (Python):

from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer(
    'docs.raw',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    doc = message.value
    chunks = chunk_text(doc['text'], chunk_size=512, overlap=50)
    for i, chunk in enumerate(chunks):
        producer.send('docs.chunks', {
            'source_id': doc['source_id'],
            'chunk_id': f"{doc['id']}_{i}",
            'text': chunk,
            'metadata': doc.get('metadata', {})
        })

4. Consumer 2: Эмбеддинг и индексация

Второй consumer читает из docs.chunks и:

  • Батчинг — группирует чанки по 100–200 для эффективного вызова модели эмбеддингов (batch inference).
  • Эмбеддинг — преобразует текст в вектор с помощью модели (например, text-embedding-ada-002, all-MiniLM-L6-v2).
  • Загрузка — записывает векторы и метаданные в векторную БД (Pinecone, Weaviate, Qdrant, Milvus).

Почему батчинг модели эмбеддингов работают быстрее при обработке батчами (GPU утилизация выше), снижается latency на один чанк.

Пример батчинга

batch_size = 150
batch = []
for message in consumer:
    batch.append(message.value)
    if len(batch) >= batch_size:
        embeddings = embedding_model.encode([c['text'] for c in batch])
        vector_db.upsert(vectors=embeddings, metadata=batch)
        batch = []

5. Оркестрация: Airflow

Apache Airflow — платформа для оркестрации рабочих процессов. Используется для:

  • Мониторинга здоровья consumer'ов (если consumer упал — Airflow перезапускает)
  • Retry при временных ошибках (например, таймаут векторной БД)
  • Алертов при превышении порогов latency
  • Периодической переиндексации (если нужно обновить эмбеддинги)

DAG (Directed Acyclic Graph) — описание пайплайна в коде Python:

from airflow import DAG
from airflow.operators.python import PythonOperator

def check_consumer_health():
    # проверка, что consumer живы
    pass

with DAG('etl_pipeline', schedule_interval='@hourly') as dag:
    health_check = PythonOperator(task_id='health_check', python_callable=check_consumer_health)

6. Обработка ошибок: Dead Letter Queue

Dead Letter Queue (DLQ) — отдельный Kafka topic (docs.dlq), куда попадают сообщения, которые не удалось обработать после нескольких retry. Причины: corrupted PDF, таймаут эмбеддинга, неверный формат.

Процесс

  1. Consumer пытается обработать сообщение (max 3 retry с exponential backoff).
  2. Если все попытки неудачны → сообщение отправляется в DLQ с указанием причины ошибки.
  3. Отдельный процесс (или человек) анализирует DLQ и принимает решение: повторная обработка, ручное исправление, игнорирование.

Пример реализации

max_retries = 3
for attempt in range(max_retries):
    try:
        process_message(msg)
        break
    except Exception as e:
        if attempt == max_retries - 1:
            producer.send('docs.dlq', {'error': str(e), 'message': msg})
        else:
            time.sleep(2 ** attempt)

7. Метрики и SLO

SLO (Service Level Objective) — целевой уровень качества сервиса. Для ETL пайплайна:

  • Latency от загрузки до индексации — время от публикации документа в docs.raw до его появления в векторной БД. SLO < 5 минут.
  • Ingestion rate — количество документов, обработанных за секунду. Цель: > 12 док/сек (с запасом на пики).

Метрики для мониторинга

МетрикаОписаниеИнструмент
kafka_consumer_lagОтставание consumer'а от последнего сообщенияPrometheus + Grafana
processing_timeВремя обработки одного документаAirflow logs
error_rateДоля сообщений, ушедших в DLQKafka metrics
throughputДокументов/секCustom counter

Пример алерта если kafka_consumer_lag > 10000 сообщений в течение 5 минут → PagerDuty.


8. Масштабирование и оптимизация

Горизонтальное масштабирование

  • Увеличить количество партиций Kafka (например, с 3 до 10).
  • Запустить несколько экземпляров consumer'ов в одной consumer group (Kafka автоматически распределяет партиции).
  • Добавить реплики векторной БД.

Оптимизация чанкинга

  • Использовать semantic chunking (разбиение по смысловым границам) вместо фиксированной длины — улучшает качество retrieval.
  • Кэшировать результаты парсинга для повторяющихся документов.

Оптимизация эмбеддинга

  • Использовать GPU для batch inference.
  • Выбрать модель с подходящим trade-off скорость/качество (например, intfloat/e5-small-v2 быстрее, чем text-embedding-3-large).

9. Выбор инструментов: сравнение

КомпонентВариантыПлюсыМинусы
Брокер сообщенийKafka, RabbitMQ, AWS SQSKafka: высокая пропускная способность, replayKafka: сложнее в настройке
Векторная БДPinecone, Weaviate, Qdrant, MilvusPinecone: managed, простота; Milvus: open-source, гибкостьPinecone: дорого; Milvus: требует администрирования
ОркестрацияAirflow, Prefect, DagsterAirflow: зрелость, большое сообществоAirflow: тяжеловесен для простых задач
Эмбеддинг модельOpenAI, Cohere, Sentence-TransformersOpenAI: качество; Sentence-Transformers: бесплатноOpenAI: плата за API; локальные модели: ресурсы

Рекомендация для 1M док/день Kafka + Qdrant (self-hosted) + Airflow + Sentence-Transformers на GPU.


10. Пример кода: полный consumer для эмбеддинга

import json
import time
from kafka import KafkaConsumer, KafkaProducer
from sentence_transformers import SentenceTransformer
import qdrant_client

# Инициализация
model = SentenceTransformer('all-MiniLM-L6-v2')
client = qdrant_client.QdrantClient(host='localhost', port=6333)
consumer = KafkaConsumer('docs.chunks', bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

BATCH_SIZE = 150
batch = []

for msg in consumer:
    chunk = msg.value
    batch.append(chunk)
    if len(batch) >= BATCH_SIZE:
        texts = [c['text'] for c in batch]
        try:
            embeddings = model.encode(texts, show_progress_bar=False)
            points = [
                qdrant_client.models.PointStruct(
                    id=hash(c['chunk_id']),
                    vector=emb.tolist(),
                    payload={'source_id': c['source_id'], 'text': c['text']}
                )
                for c, emb in zip(batch, embeddings)
            ]
            client.upsert(collection_name='docs', points=points)
        except Exception as e:
            # Отправка в DLQ
            for c in batch:
                producer.send('docs.dlq', {'error': str(e), 'chunk': c})
        finally:
            batch = []

Пет-проект для закрепления

Задача Разработать ETL пайплайн для индексации 10 000 новостных статей (RSS-ленты) в RAG-систему.

Инструменты Docker, Kafka (через wurstmeister/kafka), Python, Sentence-Transformers, Qdrant (in-memory для теста), Airflow (опционально).

Шаги:

  1. Настройте Kafka локально (1 топик news.raw, 3 партиции).
  2. Напишите producer, который читает RSS и публикует статьи в news.raw.
  3. Consumer 1: парсинг HTML (BeautifulSoup) → чанкинг (RecursiveCharacterTextSplitter из LangChain) → публикация в news.chunks.
  4. Consumer 2: батчинг по 50 → эмбеддинг → загрузка в Qdrant.
  5. Добавьте DLQ: при ошибке парсинга отправляйте в news.dlq.
  6. Напишите скрипт мониторинга: выводите consumer lag и throughput.

Ожидаемый результат Работающий пайплайн, который индексирует статьи с latency < 10 секунд. Вы сможете задавать вопросы к новостям через простой RAG-интерфейс (например, Streamlit + LangChain).


Связь с другими вопросами

ВопросТема
1Проектирование RAG-системы для 10 000 документов
2Проблема lost in the middle
3Стратегии chunking
4Обновление документов в RAG
5Оценка качества retrieval
6Hybrid search
7Уменьшение latency RAG
8Обработка запросов без ответа
9Обновление документов (повтор)
10Self-RAG

Навигация