Как вы загружаете 1000 документов в RAG максимально эффективно?
Краткий тезис
Загрузка 1000 документов в RAG — это ETL-пайплайн (Extract, Transform, Load), а не просто цикл по файлам. Максимальная эффективность достигается за счёт асинхронности, батчинга, параллелизации и очередей. Ключевые принципы: не ждать каждый документ по отдельности, обрабатывать пачками, разделять этапы через очереди, логировать ошибки и иметь возможность перезапустить упавшие чанки без перезагрузки всех 1000 документов.
Цифра Наивный цикл (последовательно: парсинг → эмбеддинг → вставка) загружает 1000 документов за 30-60 минут. Оптимизированный пайплайн — за 2-5 минут.
1. Термин: ETL для RAG (Extract, Transform, Load)
Что это Процесс извлечения документов из источника (Extract), преобразования в чанки и эмбеддинги (Transform), загрузки в векторную БД (Load).
Этапы ETL для RAG
| Этап | Что делается | Что может быть узким местом |
|---|---|---|
| Extract | Чтение документов (PDF, Word, S3, база данных) | Сетевые вызовы, I/O диск |
| Transform (парсинг + chunking | Извлечение текста, разбиение на чанки | CPU (парсинг PDF) |
| Transform (embedding | Превращение чанков в векторы | GPU (если локально) или сеть (API) |
| Load | Вставка векторов в векторную БД | Сеть, БД (индексация) |
Термин «Узкое место» (Bottleneck Этап, который ограничивает общую скорость. Идеальный пайплайн не имеет узких мест.
2. Наивный подход (что НЕ надо делать)
# ❌ ПЛОХО: синхронный цикл, всё последовательно
documents = ["doc1.pdf", "doc2.pdf", ..., "doc1000.pdf"]
for doc in documents:
# 1. Парсинг (0.5 сек)
text = parse_document(doc)
# 2. Chunking (0.1 сек)
chunks = split_text(text)
for chunk in chunks:
# 3. Embedding (0.2 сек на чанк) - API вызов!
embedding = embedding_model.encode(chunk)
# 4. Вставка в БД (0.05 сек)
vector_db.insert(embedding, chunk)
Проблемы
- Всё синхронно (ждём каждый этап)
- Embedding по одному чанку (1000 документов × 10 чанков = 10 000 API вызовов!)
- БД вставляет по одному вектору
- При ошибке на 500-м документе — всё останавливается
Цифра 10 000 чанков × 0.2 сек = 2000 секунд ≈ 33 минуты только на эмбеддинги.
3. Оптимизация 1: Асинхронная загрузка (asyncio + aiohttp)
Что это Параллельное выполнение I/O-операций (сетевые вызовы, чтение файлов), пока CPU работает над другими задачами.
Термин «I/O-bound vs CPU-bound»
- I/O-bound Операция ждёт сеть или диск (API вызовы]], чтение файлов). Можно параллелить.
- CPU-bound Операция загружает процессор (парсинг PDF, эмбеддинги локально). Параллелить сложнее (GIL в Python).
Асинхронная загрузка документов
import asyncio
import aiohttp
from aiofile import async_open
async def load_document(session, doc_url):
async with session.get(doc_url) as response:
content = await response.read()
return parse_document(content) # CPU-bound
async def load_all_documents(doc_urls):
async with aiohttp.ClientSession() as session:
tasks = [load_document(session, url) for url in doc_urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# results — список текстов документов
Плюсы Сокращает время ожидания сети в N раз (где N — параллельных запросов). Цифра Синхронная загрузка 1000 документов по 0.5 сек = 500 сек. Асинхронная (50 параллельных) = 10 сек.
4. Оптимизация 2: Батчинг эмбеддингов
Что это Отправлять не один чанк в embedding-модель, а пачку (batch) из 100-200 чанков за раз.
Термин «Batch» (пачка Группа данных, обрабатываемая вместе. Для нейросетей (включая embedding-модели) батчевая обработка эффективнее побаточной, потому что операции матричного умножения векторизуются.
До (pечной режим
# ❌ Плохо: по одному чанку
for chunk in all_chunks:
embedding = model.encode(chunk) # 10 000 вызовов
После (батчинг
# ✅ Хорошо: пачками по 100
batch_size = 100
for i in range(0, len(all_chunks), batch_size):
batch = all_chunks[i:i+batch_size]
embeddings = model.encode(batch) # 1 вызов на 100 чанков
Цифра
- По одному: 10 000 вызовов × 0.2 сек = 2000 сек (33 мин)
- Батчами по 100: 100 вызовов × 0.3 сек = 30 сек
- Ускорение в 66 раз
Почему батч не 1000
- Ограничение памяти GPU/модели (у некоторых embedding-моделей максимум 512 токенов на чанк × 1000 = 512k токенов — много)
- API-провайдеры лимитируют размер батча (OpenAI: максимум 2048 векторов)
- Риск отказа при большом батче (упал один — упали все)
5. Оптимизация 3: Параллельная вставка (bulk insert)
Что это Вставка нескольких векторов в векторную БД одной операцией, а не по одному.
Термин «Bulk insert» Массовая вставка. В обычных БД (PostgreSQL) это VALUES (1),(2),(3)... В векторных БД аналогично.
До (побаточная вставка
# ❌ Плохо: по одному вектору
for embedding, chunk in zip(embeddings, chunks):
vector_db.insert(embedding, payload={"text": chunk})
После (bulk insert
# ✅ Хорошо: пачками
vector_db.insert_batch(
vectors=embeddings,
payloads=[{"text": chunk} for chunk in chunks]
)
Цифра
- По одному: 10 000 операций × 0.05 сек = 500 сек (8 мин)
- Bulk (100 векторов): 100 операций × 0.1 сек = 10 сек
- Ускорение в 50 раз
6. Оптимизация 4: Пайплайн с очередями (RabbitMQ/Kafka)
Что это Разделение ETL на независимые стадии, соединённые очередями. Каждая стадия работает параллельно и в своём темпе.
Термин «Consumer-producer» Производитель создаёт задачи, потребитель их обрабатывает. Очередь их развязывает.
Архитектура
[Источник] → [Queue 1] → [Парсинг] → [Queue 2] → [Chunking] → [Queue 3] → [Embedding] → [Queue 4] → [Insert DB]
↑ ↑ ↑ ↑
параллельно параллельно параллельно параллельно
Реализация с Celery (очередь задач
from celery import Celery
app = Celery('rag_etl', broker='redis://localhost:6379')
@app.task
def parse_document(doc_url):
text = extract_text(doc_url)
return text
@app.task
def chunk_document(text):
chunks = split_text(text)
return chunks
@app.task
def embed_chunk(chunk):
embedding = embedding_model.encode(chunk)
return embedding
@app.task
def insert_chunk(embedding, chunk):
vector_db.insert(embedding, payload={"text": chunk})
# Пайплайн
def etl_pipeline(doc_url):
text = parse_document.delay(doc_url).get()
chunks = chunk_document.delay(text).get()
for chunk in chunks:
embedding = embed_chunk.delay(chunk).get()
insert_chunk.delay(embedding, chunk)
Плюсы
- Каждая стадия масштабируется независимо
- При падении одной стадии, задачи остаются в очереди (не теряются)
- Легко добавить retry, dead letter queue
Минусы Сложнее в настройке.
7. Полный оптимизированный пайплайн (код)
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from sentence_transformers import SentenceTransformer
# Настройки
BATCH_SIZE_EMBED = 100 # embedding batch
BATCH_SIZE_DB = 500 # database bulk insert
MAX_CONCURRENT_DOCS = 50 # параллельных загрузок документов
CHUNK_SIZE = 1000 # размер чанка в токенах
# Инициализация (один раз)
embedder = SentenceTransformer("BAAI/bge-m3")
executor = ThreadPoolExecutor(max_workers=4) # для CPU-парсинга
async def process_document(session, doc_url, doc_id):
"""Обработка одного документа от начала до конца"""
try:
# 1. Extract (асинхронно)
async with session.get(doc_url) as response:
content = await response.read()
# 2. Parse (CPU-bound, в отдельном потоке)
text = await loop.run_in_executor(executor, parse_pdf, content)
# 3. Chunking (CPU-bound)
chunks = await loop.run_in_executor(executor, split_text, text)
# Возвращаем чанки для дальнейшего батчевого embedding
return [(doc_id, i, chunk) for i, chunk in enumerate(chunks)]
except Exception as e:
log_error(doc_url, e)
return []
async def batch_embed_and_insert(all_chunks_with_ids):
"""Батчевый embedding и bulk insert"""
all_embeddings = []
all_payloads = []
# Проходим по чанкам пачками
for i in range(0, len(all_chunks_with_ids), BATCH_SIZE_EMBED):
batch = all_chunks_with_ids[i:i+BATCH_SIZE_EMBED]
batch_texts = [chunk[2] for chunk in batch]
# Embedding пачкой
embeddings = embedder.encode(batch_texts, batch_size=BATCH_SIZE_EMBED)
# Накопляем для bulk insert
for (doc_id, chunk_idx, text), emb in zip(batch, embeddings):
all_embeddings.append(emb)
all_payloads.append({
"doc_id": doc_id,
"chunk_idx": chunk_idx,
"text": text
})
# Вставляем в БД пачками (каждые BATCH_SIZE_DB)
if len(all_embeddings) >= BATCH_SIZE_DB:
vector_db.insert_batch(all_embeddings[:BATCH_SIZE_DB], all_payloads[:BATCH_SIZE_DB])
all_embeddings = all_embeddings[BATCH_SIZE_DB:]
all_payloads = all_payloads[BATCH_SIZE_DB:]
# Оставшиеся
if all_embeddings:
vector_db.insert_batch(all_embeddings, all_payloads)
async def main(doc_urls):
all_chunks = []
# Step 1: Асинхронная загрузка и парсинг всех документов
async with aiohttp.ClientSession() as session:
tasks = [process_document(session, url, i) for i, url in enumerate(doc_urls)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for res in results:
if isinstance(res, list):
all_chunks.extend(res)
else:
log_error("Task failed", res)
# Step 2: Батчевый embedding + bulk insert
await batch_embed_and_insert(all_chunks)
print(f"Обработано {len(doc_urls)} документов, {len(all_chunks)} чанков")
# Запуск
asyncio.run(main(doc_urls))
8. Мониторинг и обработка ошибок
Что нужно логировать
| Событие | Что логируем | Зачем |
|---|---|---|
| Начало обработки документа | doc_id, timestamp | Оценка прогресса |
| Успешная обработка | doc_id, num_chunks, duration | Статистика |
| Ошибка парсинга | doc_id, error_type, stacktrace | Отладка |
| Ошибка embedding | chunk_id, error | Повторная обработка |
| Ошибка вставки в БД | chunk_id, error | Повтор |
Dead Letter Queue (DLQ
# При ошибке, кладём задачу в DLQ вместо потери
@app.task(bind=True, max_retries=3)
def process_document_with_retry(self, doc_url):
try:
return parse_document(doc_url)
except Exception as e:
if self.request.retries < 3:
raise self.retry(exc=e, countdown=60) # повторить через 60 сек
else:
# Всё, 3 попытки не помогли — в DLQ
dead_letter_queue.send(doc_url, str(e))
Термин «Dead Letter Queue (DLQ)» Очередь для сообщений, которые не удалось обработать после нескольких попыток. Потом их можно проанализировать вручную.
9. Сравнение подходов (цифры для 1000 документов)
| Подход | Время | Плюсы | Минусы |
|---|---|---|---|
| Наивный (синхронный, всё последовательно | 30-60 мин | Простой код | Очень медленно |
| Асинхронная загрузка | 20-30 мин | Уже лучше | Embedding всё ещё медленный |
| + Батчинг эмбеддингов | 5-10 мин | Ускорение 3-6x | — |
| + Bulk insert в БД | 3-5 мин | Ещё 2x ускорение | — |
| + Очереди (Kafka/Celery | 2-3 мин | Масштабируется до миллионов | Сложная инфраструктура |
Золотая середина для 1000 документов Асинхронность + батчинг + bulk insert без очередей (хватит).
10. Пет-проект для закрепления
Задача Сравнить производительность разных подходов загрузки 1000 документов.
Инструменты Python, Qdrant, sentence-transformers (BGE-m3), aiohttp
Шаги
- Сгенерировать 1000 тестовых документов (или взять реальные)
- Реализовать 4 подхода:
- Baseline синхронный цикл, чанки по одному, вставка по одному
- Асинхронный asyncio для загрузки документов, но embedding всё ещё по одному
-
- Батчинг embedding пачками по 100
-
- Bulk insert вставка в БД пачками
- Замерить время каждого подхода
- Посчитать ускорение (speedup)
- Построить график: время vs количество документов (100, 500, 1000)
- Выявить узкое место в каждом подходе
Ожидаемый результат Ускорение в 10-30 раз от наивного подхода к оптимизированному.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | RAG архитектура (этап индексации) |
| 3 | Chunking (подготовка чанков) |
| 4 | Векторная БД (bulk insert в Qdrant) |
| 7 | Оптимизация latency (асинхронность, батчинг) |
| 9 | Обновление документов (incremental update) |
| 256-275 | Data engineering for AI (ETL пайплайны) |
13. Как вы загружаете 1000 документов в RAG максимально эффективно|13. Как вы загружаете 1000 документов в RAG максимально эффективно|13. Как вы загружаете 1000 документов в RAG максимально эффективно|13 полностью разобран. Переходим к вопросу 14, когда будете готовы|Вопрос 13 полностью разобран. Переходим к вопросу 14, когда будете готовы]]
Навигация
- Предыдущий: 12
- Следующий: 14
- Индекс: 00. Индекс разборов