Как бы вы спроектировали систему для реального времени (real-time) обработки документов?
Краткий тезис
Проектирование real-time обработки документов — это задача инфраструктурного уровня, а не ML-моделирования. Ключевая идея — разделить online (поиск/retrieval) и offline (индексация/ingestion) контуры. Real-time достигается через событийную архитектуру (Kafka, RabbitMQ), асинхронную обработку (asyncio) и батчинг эмбеддингов. Важно понимать, что ingestion (загрузка и индексация) может быть eventually consistent (в конечном счёте согласована) — документ становится доступным для поиска через несколько секунд после загрузки, а не мгновенно.
1. Термин: Real-time обработка документов
Что это: Система, которая принимает новый документ (PDF, Word, текст), парсит его, разбивает на чанки, генерирует эмбеддинги и добавляет в векторную базу данных (БД) для поиска — всё это с минимальной задержкой (обычно < 5-10 секунд).
Ключевое различие
- Real-time retrieval (поиск в реальном времени) — пользователь отправляет запрос и мгновенно получает ответ (latency < 1 сек). Это стандарт для RAG.
- Real-time ingestion (загрузка в реальном времени) — новый документ становится доступным для поиска почти сразу после загрузки. Это сложнее, чем retrieval, так как требует обработки (парсинг, чанкинг, эмбеддинги).
Термин Event-driven architecture (событийная архитектура): Система, где компоненты общаются через события (сообщения). Когда происходит действие (загружен документ), генерируется событие, которое обрабатывается асинхронно.
2. Архитектура: Событийная шина + Микросервисы
Проектирование начинается с разделения на микросервисы, соединённые через message broker (брокер сообщений).
Компоненты
- API Gateway / Upload Service — принимает файл от пользователя, сохраняет его в blob storage (S3, MinIO), публикует событие
document.uploadedв Kafka. - Parser Service — подписан на
document.uploaded, извлекает текст из PDF/Word/HTML. - Chunker Service — разбивает текст на чанки (фиксированный размер, chunking|semantic chunking).
- Embedding Service — генерирует эмбеддинги для каждого чанка (batch processing).
- Indexer Service — вставляет эмбеддинги в векторную БД (Pinecone, Qdrant, Weaviate).
Поток данных
Пользователь → Upload Service → Kafka (topic: document.uploaded)
→ Parser Service → Kafka (topic: document.parsed)
→ Chunker Service → Kafka (topic: chunks.ready)
→ Embedding Service → Kafka (topic: embeddings.ready)
→ Indexer Service → Vector DB
Термин Kafka (Apache Kafka): Распределённая платформа для потоковой обработки данных. Обеспечивает надёжную доставку сообщений, масштабирование и fault tolerance.
3. Online vs Offline: Разделение контуров
Критическое решение — разделить online retrieval (поиск) и offline ingestion (индексацию).
| Характеристика | Online (Retrieval) | Offline (Ingestion) |
|---|---|---|
| Latency | < 1 секунда | 1-10 секунд (допустимо) |
| Нагрузка | Высокая (много запросов) | Низкая (редко, но большие файлы) |
| Требования к consistency | Strong (данные должны быть актуальны) | Eventually consistent (допустима задержка) |
| Инструменты | Векторная БД, кэш (Redis) | Kafka, batch processing |
Почему это важно Если ingestion будет блокировать retrieval (например, при вставке большого документа), пользователи увидят задержки в поиске. Поэтому ingestion работает асинхронно, а retrieval использует уже проиндексированные данные.
Термин Eventually consistent (согласованность в конечном счёте): Система гарантирует, что через некоторое время после записи все читатели увидят актуальные данные. Нет гарантии мгновенной консистентности.
4. Обработка: Asyncio + Батчинг эмбеддингов
Для real-time обработки критична производительность. Два ключевых приёма:
4.1 Asyncio (асинхронное программирование)
Позволяет обрабатывать несколько задач конкурентно без создания потоков. Например, Parser Service может парсить 10 документов одновременно, не блокируя I/O (чтение с диска, запись в Kafka).
import asyncio
from kafka import KafkaProducer
async def process_document(file_path: str):
# Асинхронное чтение файла
text = await async_read_file(file_path)
# Асинхронная отправка в Kafka
await async_send_to_kafka('document.parsed', text)
return text
async def main():
tasks = [process_document(f'doc_{i}.pdf') for i in range(10)]
await asyncio.gather(*tasks)
4.2 Батчинг эмбеддингов
Генерация эмбеддингов через LLM (например, OpenAI text-embedding-3-small) — дорогая операция. Вместо того чтобы отправлять каждый чанк по одному, собираем их в батч (batch).
def embed_chunks(chunks: list[str], batch_size: int = 100) -> list[list[float]]:
embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i+batch_size]
# Вызов API с батчем
response = openai.Embedding.create(input=batch, model="text-embedding-3-small")
embeddings.extend([d['embedding'] for d in response['data']])
return embeddings
Термин Batch processing (пакетная обработка): Обработка данных группами (батчами) для повышения пропускной способности (throughput) и снижения затрат на API-вызовы.
5. Consistency: Eventually Consistent для индексации
Real-time ingestion не означает strong consistency (строгую согласованность). Документ не обязан быть доступен для поиска сразу после загрузки. Допустима задержка в 2-5 секунд.
Почему это нормально
- Пользователь, загрузивший документ, может подождать несколько секунд, пока он проиндексируется.
- Для других пользователей документ становится доступен только после индексации.
Как обеспечить eventually consistent
- Использовать idempotent operations (идемпотентные операции) — повторная вставка того же чанка не создаёт дубликат.
- Векторная БД должна поддерживать upsert (update + insert) — если чанк уже существует, обновить его эмбеддинг.
- Мониторить lag (отставание) в Kafka — если consumer отстаёт, увеличивать количество партиций или workers.
Термин Idempotent (идемпотентный): Операция, которую можно выполнить несколько раз без изменения результата. Например, INSERT OR REPLACE в SQL.
6. Метрики: Время от загрузки до доступности
Для оценки real-time системы нужны метрики:
| Метрика | Описание | Целевое значение |
|---|---|---|
| Ingestion latency | Время от загрузки файла до появления в векторной БД | < 5 секунд для 10 MB файла |
| End-to-end latency | Время от загрузки до первого успешного поиска этого документа | < 10 секунд |
| Throughput | Количество документов, обработанных за минуту | Зависит от нагрузки |
| Error rate | Доля документов, не прошедших обработку | < 1% |
| Kafka lag | Количество необработанных сообщений в топике | < 100 |
Как измерять
- Добавить timestamps (метки времени) на каждом этапе (upload → parsed → chunked → embedded → indexed).
- Логировать в OpenTelemetry или Prometheus.
- Строить p99 latency (99-й перцентиль задержки) — время, которое не превышает 99% запросов.
Термин p99 latency (99-й перцентиль задержки): Значение задержки, ниже которого находится 99% всех запросов. Например, p99 = 3 секунды означает, что 99% документов обрабатываются быстрее 3 секунд.
7. Инструменты и технологии
| Компонент | Инструменты | Альтернативы |
|---|---|---|
| Message broker | Apache Kafka | RabbitMQ, AWS SQS, Google Pub/Sub |
| Blob storage | AWS S3, MinIO | Google Cloud Storage, Azure Blob |
| Parser | PyMuPDF (fitz), python-docx, BeautifulSoup | Unstructured.io, LangChain document loaders |
| Chunker | LangChain text splitters, custom | Semantic chunking (NLP-based) |
| Embedding model | OpenAI text-embedding-3-small, BGE | Sentence Transformers, Cohere |
| Vector DB | Pinecone, Qdrant, Weaviate | Milvus, Chroma, FAISS |
| Orchestration | Kubernetes, Docker Compose | AWS ECS, Google Cloud Run |
| Monitoring | Prometheus + Grafana | Datadog, New Relic |
8. Масштабирование и отказоустойчивость
8.1 Горизонтальное масштабирование
- Kafka partitions — увеличить количество партиций для параллельной обработки.
- Consumer groups — запустить несколько экземпляров Parser Service, каждый обрабатывает свою партицию.
- Vector DB — использовать шардирование (sharding) для распределения данных по узлам.
8.2 Отказоустойчивость (Fault Tolerance)
- Retry logic — если embedding API вернул ошибку, повторить через 1, 2, 4 секунды (exponential backoff).
- Dead letter queue (DLQ) — если документ не удалось обработать после N попыток, отправить в DLQ для ручного анализа.
- Checkpointing — Kafka consumer сохраняет offset (позицию) после обработки, чтобы при сбое продолжить с того же места.
Термин Dead letter queue (DLQ, очередь недоставленных сообщений): Отдельный топик в Kafka, куда попадают сообщения, которые не удалось обработать после всех попыток.
9. Пример реализации (упрощённый код)
# upload_service.py
from fastapi import FastAPI, UploadFile
from kafka import KafkaProducer
import uuid
app = FastAPI()
producer = KafkaProducer(bootstrap_servers='localhost:9092')
@app.post("/upload")
async def upload_document(file: UploadFile):
doc_id = str(uuid.uuid4())
# Сохраняем файл в S3
s3_path = f"documents/{doc_id}/{file.filename}"
await save_to_s3(file.file, s3_path)
# Публикуем событие
event = {"doc_id": doc_id, "s3_path": s3_path, "filename": file.filename}
producer.send('document.uploaded', value=event)
return {"doc_id": doc_id, "status": "processing"}
# parser_service.py
from kafka import KafkaConsumer, KafkaProducer
import asyncio
consumer = KafkaConsumer('document.uploaded', bootstrap_servers='localhost:9092')
producer = KafkaProducer(bootstrap_servers='localhost:9092')
async def parse_document(event):
s3_path = event['s3_path']
text = await extract_text_from_s3(s3_path)
# Публикуем событие с текстом
event['text'] = text
producer.send('document.parsed', value=event)
async def main():
for msg in consumer:
event = msg.value
await parse_document(event)
asyncio.run(main())
Пет-проект для закрепления
Задача Спроектировать и реализовать real-time систему для обработки PDF-документов (до 100 страниц) с задержкой ingestion < 10 секунд.
Инструменты
- Python (FastAPI, asyncio)
- Kafka (через Docker Compose)
- MinIO (S3-совместимое хранилище)
- PyMuPDF (парсинг PDF)
- OpenAI API (эмбеддинги)
- ChromaDB (векторная БД, локально)
- Prometheus + Grafana (мониторинг)
Шаги:
- Разверни Kafka и MinIO через Docker Compose.
- Напиши Upload Service (FastAPI) — принимает PDF, сохраняет в MinIO, публикует событие в Kafka.
- Напиши Parser Service — читает событие, парсит PDF, публикует текст.
- Напиши Chunker Service — разбивает текст на чанки по 500 токенов.
- Напиши Embedding Service — батчит чанки по 50, генерирует эмбеддинги.
- Напиши Indexer Service — вставляет эмбеддинги в ChromaDB.
- Добавь метрики: ingestion latency, Kafka lag, error rate.
- Протестируй: загрузи 10 PDF-файлов одновременно, измерь p99 latency.
Ожидаемый результат Система, которая обрабатывает 10 PDF-файлов (по 50 страниц) за < 15 секунд, с мониторингом в Grafana.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | Проектирование RAG-системы |
| 7 | Оптимизация задержки |
| 9 | Обновление документов |
| 15 | Метрики RAG |
| 20 | Streaming-обработка |
Навигация
- Предыдущий: 81
- Следующий: 83
- Индекс: 00. Индекс разборов