Как бы вы спроектировали систему для реального времени (real-time) обработки документов?

Краткий тезис

Проектирование real-time обработки документов — это задача инфраструктурного уровня, а не ML-моделирования. Ключевая идея — разделить online (поиск/retrieval) и offline (индексация/ingestion) контуры. Real-time достигается через событийную архитектуру (Kafka, RabbitMQ), асинхронную обработку (asyncio) и батчинг эмбеддингов. Важно понимать, что ingestion (загрузка и индексация) может быть eventually consistent (в конечном счёте согласована) — документ становится доступным для поиска через несколько секунд после загрузки, а не мгновенно.


1. Термин: Real-time обработка документов

Что это: Система, которая принимает новый документ (PDF, Word, текст), парсит его, разбивает на чанки, генерирует эмбеддинги и добавляет в векторную базу данных (БД) для поиска — всё это с минимальной задержкой (обычно < 5-10 секунд).

Ключевое различие

  • Real-time retrieval (поиск в реальном времени) — пользователь отправляет запрос и мгновенно получает ответ (latency < 1 сек). Это стандарт для RAG.
  • Real-time ingestion (загрузка в реальном времени) — новый документ становится доступным для поиска почти сразу после загрузки. Это сложнее, чем retrieval, так как требует обработки (парсинг, чанкинг, эмбеддинги).

Термин Event-driven architecture (событийная архитектура): Система, где компоненты общаются через события (сообщения). Когда происходит действие (загружен документ), генерируется событие, которое обрабатывается асинхронно.


2. Архитектура: Событийная шина + Микросервисы

Проектирование начинается с разделения на микросервисы, соединённые через message broker (брокер сообщений).

Компоненты

  1. API Gateway / Upload Service — принимает файл от пользователя, сохраняет его в blob storage (S3, MinIO), публикует событие document.uploaded в Kafka.
  2. Parser Service — подписан на document.uploaded, извлекает текст из PDF/Word/HTML.
  3. Chunker Service — разбивает текст на чанки (фиксированный размер, chunking|semantic chunking).
  4. Embedding Service — генерирует эмбеддинги для каждого чанка (batch processing).
  5. Indexer Service — вставляет эмбеддинги в векторную БД (Pinecone, Qdrant, Weaviate).

Поток данных

Пользователь → Upload Service → Kafka (topic: document.uploaded)
    → Parser Service → Kafka (topic: document.parsed)
        → Chunker Service → Kafka (topic: chunks.ready)
            → Embedding Service → Kafka (topic: embeddings.ready)
                → Indexer Service → Vector DB

Термин Kafka (Apache Kafka): Распределённая платформа для потоковой обработки данных. Обеспечивает надёжную доставку сообщений, масштабирование и fault tolerance.


3. Online vs Offline: Разделение контуров

Критическое решение — разделить online retrieval (поиск) и offline ingestion (индексацию).

ХарактеристикаOnline (Retrieval)Offline (Ingestion)
Latency< 1 секунда1-10 секунд (допустимо)
НагрузкаВысокая (много запросов)Низкая (редко, но большие файлы)
Требования к consistencyStrong (данные должны быть актуальны)Eventually consistent (допустима задержка)
ИнструментыВекторная БД, кэш (Redis)Kafka, batch processing

Почему это важно Если ingestion будет блокировать retrieval (например, при вставке большого документа), пользователи увидят задержки в поиске. Поэтому ingestion работает асинхронно, а retrieval использует уже проиндексированные данные.

Термин Eventually consistent (согласованность в конечном счёте): Система гарантирует, что через некоторое время после записи все читатели увидят актуальные данные. Нет гарантии мгновенной консистентности.


4. Обработка: Asyncio + Батчинг эмбеддингов

Для real-time обработки критична производительность. Два ключевых приёма:

4.1 Asyncio (асинхронное программирование)

Позволяет обрабатывать несколько задач конкурентно без создания потоков. Например, Parser Service может парсить 10 документов одновременно, не блокируя I/O (чтение с диска, запись в Kafka).

import asyncio
from kafka import KafkaProducer

async def process_document(file_path: str):
    # Асинхронное чтение файла
    text = await async_read_file(file_path)
    # Асинхронная отправка в Kafka
    await async_send_to_kafka('document.parsed', text)
    return text

async def main():
    tasks = [process_document(f'doc_{i}.pdf') for i in range(10)]
    await asyncio.gather(*tasks)

4.2 Батчинг эмбеддингов

Генерация эмбеддингов через LLM (например, OpenAI text-embedding-3-small) — дорогая операция. Вместо того чтобы отправлять каждый чанк по одному, собираем их в батч (batch).

def embed_chunks(chunks: list[str], batch_size: int = 100) -> list[list[float]]:
    embeddings = []
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        # Вызов API с батчем
        response = openai.Embedding.create(input=batch, model="text-embedding-3-small")
        embeddings.extend([d['embedding'] for d in response['data']])
    return embeddings

Термин Batch processing (пакетная обработка): Обработка данных группами (батчами) для повышения пропускной способности (throughput) и снижения затрат на API-вызовы.


5. Consistency: Eventually Consistent для индексации

Real-time ingestion не означает strong consistency (строгую согласованность). Документ не обязан быть доступен для поиска сразу после загрузки. Допустима задержка в 2-5 секунд.

Почему это нормально

  • Пользователь, загрузивший документ, может подождать несколько секунд, пока он проиндексируется.
  • Для других пользователей документ становится доступен только после индексации.

Как обеспечить eventually consistent

  • Использовать idempotent operations (идемпотентные операции) — повторная вставка того же чанка не создаёт дубликат.
  • Векторная БД должна поддерживать upsert (update + insert) — если чанк уже существует, обновить его эмбеддинг.
  • Мониторить lag (отставание) в Kafka — если consumer отстаёт, увеличивать количество партиций или workers.

Термин Idempotent (идемпотентный): Операция, которую можно выполнить несколько раз без изменения результата. Например, INSERT OR REPLACE в SQL.


6. Метрики: Время от загрузки до доступности

Для оценки real-time системы нужны метрики:

МетрикаОписаниеЦелевое значение
Ingestion latencyВремя от загрузки файла до появления в векторной БД< 5 секунд для 10 MB файла
End-to-end latencyВремя от загрузки до первого успешного поиска этого документа< 10 секунд
ThroughputКоличество документов, обработанных за минутуЗависит от нагрузки
Error rateДоля документов, не прошедших обработку< 1%
Kafka lagКоличество необработанных сообщений в топике< 100

Как измерять

  • Добавить timestamps (метки времени) на каждом этапе (upload → parsed → chunked → embedded → indexed).
  • Логировать в OpenTelemetry или Prometheus.
  • Строить p99 latency (99-й перцентиль задержки) — время, которое не превышает 99% запросов.

Термин p99 latency (99-й перцентиль задержки): Значение задержки, ниже которого находится 99% всех запросов. Например, p99 = 3 секунды означает, что 99% документов обрабатываются быстрее 3 секунд.


7. Инструменты и технологии

КомпонентИнструментыАльтернативы
Message brokerApache KafkaRabbitMQ, AWS SQS, Google Pub/Sub
Blob storageAWS S3, MinIOGoogle Cloud Storage, Azure Blob
ParserPyMuPDF (fitz), python-docx, BeautifulSoupUnstructured.io, LangChain document loaders
ChunkerLangChain text splitters, customSemantic chunking (NLP-based)
Embedding modelOpenAI text-embedding-3-small, BGESentence Transformers, Cohere
Vector DBPinecone, Qdrant, WeaviateMilvus, Chroma, FAISS
OrchestrationKubernetes, Docker ComposeAWS ECS, Google Cloud Run
MonitoringPrometheus + GrafanaDatadog, New Relic

8. Масштабирование и отказоустойчивость

8.1 Горизонтальное масштабирование

  • Kafka partitions — увеличить количество партиций для параллельной обработки.
  • Consumer groups — запустить несколько экземпляров Parser Service, каждый обрабатывает свою партицию.
  • Vector DB — использовать шардирование (sharding) для распределения данных по узлам.

8.2 Отказоустойчивость (Fault Tolerance)

  • Retry logic — если embedding API вернул ошибку, повторить через 1, 2, 4 секунды (exponential backoff).
  • Dead letter queue (DLQ) — если документ не удалось обработать после N попыток, отправить в DLQ для ручного анализа.
  • CheckpointingKafka consumer сохраняет offset (позицию) после обработки, чтобы при сбое продолжить с того же места.

Термин Dead letter queue (DLQ, очередь недоставленных сообщений): Отдельный топик в Kafka, куда попадают сообщения, которые не удалось обработать после всех попыток.


9. Пример реализации (упрощённый код)

# upload_service.py
from fastapi import FastAPI, UploadFile
from kafka import KafkaProducer
import uuid

app = FastAPI()
producer = KafkaProducer(bootstrap_servers='localhost:9092')

@app.post("/upload")
async def upload_document(file: UploadFile):
    doc_id = str(uuid.uuid4())
    # Сохраняем файл в S3
    s3_path = f"documents/{doc_id}/{file.filename}"
    await save_to_s3(file.file, s3_path)
    # Публикуем событие
    event = {"doc_id": doc_id, "s3_path": s3_path, "filename": file.filename}
    producer.send('document.uploaded', value=event)
    return {"doc_id": doc_id, "status": "processing"}
# parser_service.py
from kafka import KafkaConsumer, KafkaProducer
import asyncio

consumer = KafkaConsumer('document.uploaded', bootstrap_servers='localhost:9092')
producer = KafkaProducer(bootstrap_servers='localhost:9092')

async def parse_document(event):
    s3_path = event['s3_path']
    text = await extract_text_from_s3(s3_path)
    # Публикуем событие с текстом
    event['text'] = text
    producer.send('document.parsed', value=event)

async def main():
    for msg in consumer:
        event = msg.value
        await parse_document(event)

asyncio.run(main())

Пет-проект для закрепления

Задача Спроектировать и реализовать real-time систему для обработки PDF-документов (до 100 страниц) с задержкой ingestion < 10 секунд.

Инструменты

Шаги:

  1. Разверни Kafka и MinIO через Docker Compose.
  2. Напиши Upload Service (FastAPI) — принимает PDF, сохраняет в MinIO, публикует событие в Kafka.
  3. Напиши Parser Service — читает событие, парсит PDF, публикует текст.
  4. Напиши Chunker Service — разбивает текст на чанки по 500 токенов.
  5. Напиши Embedding Service — батчит чанки по 50, генерирует эмбеддинги.
  6. Напиши Indexer Service — вставляет эмбеддинги в ChromaDB.
  7. Добавь метрики: ingestion latency, Kafka lag, error rate.
  8. Протестируй: загрузи 10 PDF-файлов одновременно, измерь p99 latency.

Ожидаемый результат Система, которая обрабатывает 10 PDF-файлов (по 50 страниц) за < 15 секунд, с мониторингом в Grafana.


Связь с другими вопросами

ВопросТема
1Проектирование RAG-системы
7Оптимизация задержки
9Обновление документов
15Метрики RAG
20Streaming-обработка

Навигация