English translation is not available yet. Showing Russian content.

Как вы загружаете 1000 документов в RAG максимально эффективно?

Краткий тезис

Загрузка 1000 документов в RAG — это ETL-пайплайн (Extract, Transform, Load), а не просто цикл по файлам. Максимальная эффективность достигается за счёт асинхронности, батчинга, параллелизации и очередей. Ключевые принципы: не ждать каждый документ по отдельности, обрабатывать пачками, разделять этапы через очереди, логировать ошибки и иметь возможность перезапустить упавшие чанки без перезагрузки всех 1000 документов.

Цифра Наивный цикл (последовательно: парсинг → эмбеддинг → вставка) загружает 1000 документов за 30-60 минут. Оптимизированный пайплайн — за 2-5 минут.


1. Термин: ETL для RAG (Extract, Transform, Load)

Что это Процесс извлечения документов из источника (Extract), преобразования в чанки и эмбеддинги (Transform), загрузки в векторную БД (Load).

Этапы ETL для RAG

ЭтапЧто делаетсяЧто может быть узким местом
ExtractЧтение документов (PDF, Word, S3, база данных)Сетевые вызовы, I/O диск
Transform (парсинг + chunkingИзвлечение текста, разбиение на чанкиCPU (парсинг PDF)
Transform (embeddingПревращение чанков в векторыGPU (если локально) или сеть (API)
LoadВставка векторов в векторную БДСеть, БД (индексация)

Термин «Узкое место» (Bottleneck Этап, который ограничивает общую скорость. Идеальный пайплайн не имеет узких мест.


2. Наивный подход (что НЕ надо делать)

# ❌ ПЛОХО: синхронный цикл, всё последовательно
documents = ["doc1.pdf", "doc2.pdf", ..., "doc1000.pdf"]

for doc in documents:
    # 1. Парсинг (0.5 сек)
    text = parse_document(doc)
    
    # 2. Chunking (0.1 сек)
    chunks = split_text(text)
    
    for chunk in chunks:
        # 3. Embedding (0.2 сек на чанк) - API вызов!
        embedding = embedding_model.encode(chunk)
        
        # 4. Вставка в БД (0.05 сек)
        vector_db.insert(embedding, chunk)

Проблемы

  • Всё синхронно (ждём каждый этап)
  • Embedding по одному чанку (1000 документов × 10 чанков = 10 000 API вызовов!)
  • БД вставляет по одному вектору
  • При ошибке на 500-м документе — всё останавливается

Цифра 10 000 чанков × 0.2 сек = 2000 секунд ≈ 33 минуты только на эмбеддинги.


3. Оптимизация 1: Асинхронная загрузка (asyncio + aiohttp)

Что это Параллельное выполнение I/O-операций (сетевые вызовы, чтение файлов), пока CPU работает над другими задачами.

Термин «I/O-bound vs CPU-bound»

  • I/O-bound Операция ждёт сеть или диск (API вызовы]], чтение файлов). Можно параллелить.
  • CPU-bound Операция загружает процессор (парсинг PDF, эмбеддинги локально). Параллелить сложнее (GIL в Python).

Асинхронная загрузка документов

import asyncio
import aiohttp
from aiofile import async_open

async def load_document(session, doc_url):
    async with session.get(doc_url) as response:
        content = await response.read()
        return parse_document(content)  # CPU-bound

async def load_all_documents(doc_urls):
    async with aiohttp.ClientSession() as session:
        tasks = [load_document(session, url) for url in doc_urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # results — список текстов документов

Плюсы Сокращает время ожидания сети в N раз (где N — параллельных запросов). Цифра Синхронная загрузка 1000 документов по 0.5 сек = 500 сек. Асинхронная (50 параллельных) = 10 сек.


4. Оптимизация 2: Батчинг эмбеддингов

Что это Отправлять не один чанк в embedding-модель, а пачку (batch) из 100-200 чанков за раз.

Термин «Batch» (пачка Группа данных, обрабатываемая вместе. Для нейросетей (включая embedding-модели) батчевая обработка эффективнее побаточной, потому что операции матричного умножения векторизуются.

До (pечной режим

# ❌ Плохо: по одному чанку
for chunk in all_chunks:
    embedding = model.encode(chunk)  # 10 000 вызовов

После (батчинг

# ✅ Хорошо: пачками по 100
batch_size = 100
for i in range(0, len(all_chunks), batch_size):
    batch = all_chunks[i:i+batch_size]
    embeddings = model.encode(batch)  # 1 вызов на 100 чанков

Цифра

  • По одному: 10 000 вызовов × 0.2 сек = 2000 сек (33 мин)
  • Батчами по 100: 100 вызовов × 0.3 сек = 30 сек
  • Ускорение в 66 раз

Почему батч не 1000

  • Ограничение памяти GPU/модели (у некоторых embedding-моделей максимум 512 токенов на чанк × 1000 = 512k токенов — много)
  • API-провайдеры лимитируют размер батча (OpenAI: максимум 2048 векторов)
  • Риск отказа при большом батче (упал один — упали все)

5. Оптимизация 3: Параллельная вставка (bulk insert)

Что это Вставка нескольких векторов в векторную БД одной операцией, а не по одному.

Термин «Bulk insert» Массовая вставка. В обычных БД (PostgreSQL) это VALUES (1),(2),(3)... В векторных БД аналогично.

До (побаточная вставка

# ❌ Плохо: по одному вектору
for embedding, chunk in zip(embeddings, chunks):
    vector_db.insert(embedding, payload={"text": chunk})

После (bulk insert

# ✅ Хорошо: пачками
vector_db.insert_batch(
    vectors=embeddings,
    payloads=[{"text": chunk} for chunk in chunks]
)

Цифра

  • По одному: 10 000 операций × 0.05 сек = 500 сек (8 мин)
  • Bulk (100 векторов): 100 операций × 0.1 сек = 10 сек
  • Ускорение в 50 раз

6. Оптимизация 4: Пайплайн с очередями (RabbitMQ/Kafka)

Что это Разделение ETL на независимые стадии, соединённые очередями. Каждая стадия работает параллельно и в своём темпе.

Термин «Consumer-producer» Производитель создаёт задачи, потребитель их обрабатывает. Очередь их развязывает.

Архитектура

[Источник] → [Queue 1] → [Парсинг] → [Queue 2] → [Chunking] → [Queue 3] → [Embedding] → [Queue 4] → [Insert DB]
                 ↑              ↑              ↑               ↑
            параллельно    параллельно    параллельно     параллельно

Реализация с Celery (очередь задач

from celery import Celery

app = Celery('rag_etl', broker='redis://localhost:6379')

@app.task
def parse_document(doc_url):
    text = extract_text(doc_url)
    return text

@app.task
def chunk_document(text):
    chunks = split_text(text)
    return chunks

@app.task
def embed_chunk(chunk):
    embedding = embedding_model.encode(chunk)
    return embedding

@app.task
def insert_chunk(embedding, chunk):
    vector_db.insert(embedding, payload={"text": chunk})

# Пайплайн
def etl_pipeline(doc_url):
    text = parse_document.delay(doc_url).get()
    chunks = chunk_document.delay(text).get()
    for chunk in chunks:
        embedding = embed_chunk.delay(chunk).get()
        insert_chunk.delay(embedding, chunk)

Плюсы

  • Каждая стадия масштабируется независимо
  • При падении одной стадии, задачи остаются в очереди (не теряются)
  • Легко добавить retry, dead letter queue

Минусы Сложнее в настройке.


7. Полный оптимизированный пайплайн (код)

import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from sentence_transformers import SentenceTransformer

# Настройки
BATCH_SIZE_EMBED = 100   # embedding batch
BATCH_SIZE_DB = 500       # database bulk insert
MAX_CONCURRENT_DOCS = 50  # параллельных загрузок документов
CHUNK_SIZE = 1000         # размер чанка в токенах

# Инициализация (один раз)
embedder = SentenceTransformer("BAAI/bge-m3")
executor = ThreadPoolExecutor(max_workers=4)  # для CPU-парсинга

async def process_document(session, doc_url, doc_id):
    """Обработка одного документа от начала до конца"""
    try:
        # 1. Extract (асинхронно)
        async with session.get(doc_url) as response:
            content = await response.read()
        
        # 2. Parse (CPU-bound, в отдельном потоке)
        text = await loop.run_in_executor(executor, parse_pdf, content)
        
        # 3. Chunking (CPU-bound)
        chunks = await loop.run_in_executor(executor, split_text, text)
        
        # Возвращаем чанки для дальнейшего батчевого embedding
        return [(doc_id, i, chunk) for i, chunk in enumerate(chunks)]
    
    except Exception as e:
        log_error(doc_url, e)
        return []

async def batch_embed_and_insert(all_chunks_with_ids):
    """Батчевый embedding и bulk insert"""
    all_embeddings = []
    all_payloads = []
    
    # Проходим по чанкам пачками
    for i in range(0, len(all_chunks_with_ids), BATCH_SIZE_EMBED):
        batch = all_chunks_with_ids[i:i+BATCH_SIZE_EMBED]
        batch_texts = [chunk[2] for chunk in batch]
        
        # Embedding пачкой
        embeddings = embedder.encode(batch_texts, batch_size=BATCH_SIZE_EMBED)
        
        # Накопляем для bulk insert
        for (doc_id, chunk_idx, text), emb in zip(batch, embeddings):
            all_embeddings.append(emb)
            all_payloads.append({
                "doc_id": doc_id,
                "chunk_idx": chunk_idx,
                "text": text
            })
        
        # Вставляем в БД пачками (каждые BATCH_SIZE_DB)
        if len(all_embeddings) >= BATCH_SIZE_DB:
            vector_db.insert_batch(all_embeddings[:BATCH_SIZE_DB], all_payloads[:BATCH_SIZE_DB])
            all_embeddings = all_embeddings[BATCH_SIZE_DB:]
            all_payloads = all_payloads[BATCH_SIZE_DB:]
    
    # Оставшиеся
    if all_embeddings:
        vector_db.insert_batch(all_embeddings, all_payloads)

async def main(doc_urls):
    all_chunks = []
    
    # Step 1: Асинхронная загрузка и парсинг всех документов
    async with aiohttp.ClientSession() as session:
        tasks = [process_document(session, url, i) for i, url in enumerate(doc_urls)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for res in results:
            if isinstance(res, list):
                all_chunks.extend(res)
            else:
                log_error("Task failed", res)
    
    # Step 2: Батчевый embedding + bulk insert
    await batch_embed_and_insert(all_chunks)
    
    print(f"Обработано {len(doc_urls)} документов, {len(all_chunks)} чанков")

# Запуск
asyncio.run(main(doc_urls))

8. Мониторинг и обработка ошибок

Что нужно логировать

СобытиеЧто логируемЗачем
Начало обработки документаdoc_id, timestampОценка прогресса
Успешная обработкаdoc_id, num_chunks, durationСтатистика
Ошибка парсингаdoc_id, error_type, stacktraceОтладка
Ошибка embeddingchunk_id, errorПовторная обработка
Ошибка вставки в БДchunk_id, errorПовтор

Dead Letter Queue (DLQ

# При ошибке, кладём задачу в DLQ вместо потери
@app.task(bind=True, max_retries=3)
def process_document_with_retry(self, doc_url):
    try:
        return parse_document(doc_url)
    except Exception as e:
        if self.request.retries < 3:
            raise self.retry(exc=e, countdown=60)  # повторить через 60 сек
        else:
            # Всё, 3 попытки не помогли — в DLQ
            dead_letter_queue.send(doc_url, str(e))

Термин «Dead Letter Queue (DLQ)» Очередь для сообщений, которые не удалось обработать после нескольких попыток. Потом их можно проанализировать вручную.


9. Сравнение подходов (цифры для 1000 документов)

ПодходВремяПлюсыМинусы
Наивный (синхронный, всё последовательно30-60 минПростой кодОчень медленно
Асинхронная загрузка20-30 минУже лучшеEmbedding всё ещё медленный
+ Батчинг эмбеддингов5-10 минУскорение 3-6x
+ Bulk insert в БД3-5 минЕщё 2x ускорение
+ Очереди (Kafka/Celery2-3 минМасштабируется до миллионовСложная инфраструктура

Золотая середина для 1000 документов Асинхронность + батчинг + bulk insert без очередей (хватит).


10. Пет-проект для закрепления

Задача Сравнить производительность разных подходов загрузки 1000 документов.

Инструменты Python, Qdrant, sentence-transformers (BGE-m3), aiohttp

Шаги

  1. Сгенерировать 1000 тестовых документов (или взять реальные)
  2. Реализовать 4 подхода:
    • Baseline синхронный цикл, чанки по одному, вставка по одному
    • Асинхронный asyncio для загрузки документов, но embedding всё ещё по одному
      • Батчинг embedding пачками по 100
  3. Замерить время каждого подхода
  4. Посчитать ускорение (speedup)
  5. Построить график: время vs количество документов (100, 500, 1000)
  6. Выявить узкое место в каждом подходе

Ожидаемый результат Ускорение в 10-30 раз от наивного подхода к оптимизированному.


Связь с другими вопросами

ВопросТема
1RAG архитектура (этап индексации)
3Chunking (подготовка чанков)
4Векторная БД (bulk insert в Qdrant)
7Оптимизация latency (асинхронность, батчинг)
9Обновление документов (incremental update)
256-275Data engineering for AI (ETL пайплайны)

13. Как вы загружаете 1000 документов в RAG максимально эффективно|13. Как вы загружаете 1000 документов в RAG максимально эффективно|13. Как вы загружаете 1000 документов в RAG максимально эффективно|13 полностью разобран. Переходим к вопросу 14, когда будете готовы|Вопрос 13 полностью разобран. Переходим к вопросу 14, когда будете готовы]]


Навигация