Как строить streaming RAG pipeline (real-time ingestion)?

Краткий тезис

Streaming RAG pipeline (конвейер потоковой RAG) — это архитектура для непрерывной индексации документов в реальном времени без даунтайма. Основа — Change Data Capture (CDC) из источника (например, PostgreSQL), передача изменений через Apache Kafka, потоковая обработка для дедупликации и трансформации, и финальная запись в векторную базу данных (Qdrant). Ключевые вызовы: порядок событий (ordering), обработка опоздавших данных (late-arriving data) и гарантия exactly-once. Целевая задержка от изменения документа до индексации — менее 1 секунды.


1. Терминология: Streaming RAG pipeline и real-time ingestion

Streaming RAG pipeline — конвейер, который обрабатывает изменения в документах по мере их появления, без batch-загрузок. Real-time ingestion — процесс непрерывного приёма новых и обновлённых документов, их разбиения (chunking), генерации эмбеддингов и помещения в векторное хранилище.

Отличие от классической RAG: в batch-режиме индексация запускается по расписанию (раз в сутки), а streaming обеспечивает актуальность данных с задержкой в секунды.

Термин Change Data Capture (CDC) — техника захвата изменений в базе данных (INSERT, UPDATE, DELETE) на уровне транзакционного лога, чтобы транслировать их в стрим событий.


2. Компоненты streaming RAG pipeline

Архитектура состоит из 5 ключевых компонентов:

КомпонентПример технологииФункция
Источник CDCDebezium (Kafka Connect)Читает WAL PostgreSQL и публикует события в Kafka
Message brokerApache KafkaГарантирует упорядоченность и надёжную доставку событий
Stream processorKafka Streams или Apache FlinkДедупликация, фильтрация, обогащение событий
Ingestion consumerPython-приложение (или Faust)Парсинг документа, чанкинг, генерация эмбеддингов
Векторная БДQdrant (поддерживает streaming insertion)Хранит и индексирует эмбеддинги с payload

Дополнительно: мониторинг (Prometheus + Grafana), dead letter queue для ошибочных событий.


3. CDC из PostgreSQL через Debezium

Debezium — распределённый платформер для CDC; встраивается в Kafka Connect как Source Connector.

Принцип работы:

  1. Debezium коннектор подключается к WAL (Write-Ahead Log) PostgreSQL и читает изменения.
  2. Каждое изменение преобразуется в событие с ключом (первичный ключ таблицы) и значением (старое и новое состояние).
  3. События публикуются в топик Kafka вида server.schema.table.

Пример события Debezium (сокращённо):

{
  "payload": {
    "op": "u",  // "c" = create, "u" = update, "d" = delete
    "before": { "id": 42, "content": "Old document" },
    "after":  { "id": 42, "content": "Updated document" },
    "ts_ms": 1712345678000
  }
}

Важно: Debezium поддерживает exactly-once для PostgreSQL благодаря LSN (Log Sequence Number). Для сохранения порядка необходимо использовать ключ партиции — например, ID документа.


4. Kafka topic: партиционирование и ordering

Kafka гарантирует упорядоченность сообщений только внутри одной партиции при одинаковом ключе.

Для streaming RAG:

  • Ключ сообщения: document_id (ID документа).
  • Количество партиций: равно ожидаемому параллелизму ingestion consumer'ов (обычно 4–16).
  • Retention: настраивается так, чтобы события хранились не дольше времени обработки опоздавших данных (например, 24 часа).

Проблема: если один и тот же документ изменяется несколько раз подряд, все события должны попасть в одну партицию. Иначе ingestion consumer увидит их в неправильном порядке.


5. Потоковая обработка (stream processor)

Stream processor выполняет:

  • Дедупликацию — если одно и то же изменение пришло дважды (из-за ребаланса), пропускаем повтор.
  • Фильтрацию — отбрасываем события на нерелевантные таблицы или типы операций (например, "d" (delete) уходит другому consumer).
  • Трансформацию — добавляет метаданные (например, tenant ID).

Пример на Kafka Streams (Java-подобный псевдокод):

KStream<String, ChangeEvent> stream = builder.stream("documents.cdc");
stream
  .filter((key, value) -> value.op == "c" || value.op == "u")
  .transformValues(/* дедупликация по eventId */)
  .to("documents.validated");

Для Apache FlinkSQL на unbounded stream:

INSERT INTO documents_validated
SELECT *
FROM documents_cdc
WHERE op IN ('c', 'u')

Флинк даёт встроенную поддержку event-time и watermarks для обработки late-arriving data.


6. Ingestion consumer: от Kafka до векторной БД

Ingestion consumer — приложение, которое читает из очищенного топика (documents.validated) и выполняет:

  1. Парсинг документа — извлечение текста из форматов (PDF, HTML, Markdown).
  2. Chunking — разбиение на фрагменты фиксированного или семантического размера (например, по 512 токенов с перекрытием 128).
  3. Генерация эмбеддингов — вызов embedding model (например, text-embedding-3-small через API, или локально через ONNX).
  4. Запись в Qdrant — операция upsert с точкой, содержащей:
  5. Удаление старых чанков для обновлённых документов (запрос DELETE по document_id).

Код на Python (схематично):

consumer = KafkaConsumer("documents.validated", ...)
qdrant_client = QdrantClient(host="...", prefer_grpc=True)

for msg in consumer:
    event = json.loads(msg.value)
    if event["op"] == "d":
        qdrant_client.delete(points_selector=Filter(
            must=[FieldCondition(key="document_id", match=MatchValue(value=event["before"]["id"]))]
        ))
        continue
    doc = parse_document(event["after"]["content"])
    chunks = chunk_document(doc)
    embeddings = embed_model.encode(chunks)
    points = [PointStruct(id=hash(chunk), vector=emb.tolist(), payload={"doc_id": event["after"]["id"], "chunk": chunk}) for chunk, emb in zip(chunks, embeddings)]
    qdrant_client.upsert(points=points)

7. Векторная БД Qdrant: поддержка streaming insertion

Qdrant — одна из немногих векторных БД, спроектированных для высоконагруженных streaming-записей:

  • Grpc-протокол — низкая задержка, пакетная вставка.
  • Snapshot isolation — запись не блокирует чтение.
  • Payload-индексы — быстрые точечные удаления по document_id.

Альтернативы: Pinecone (управляемый, дорогой), Weaviate (есть streaming, но медленнее), Milvus (тяжёлый, требует настройки).

Qdrant поддерживает idempotent upsert — если точка с таким же id уже существует, она перезаписывается. Это упрощает exactly-once.


8. Сложности: ordering, late-arriving data, exactly-once

8.1 Ordering (упорядоченность)

Изменения одного документа должны приходить в правильном порядке. Решение:

  • Использовать партицию по document_id (как ключ сообщения Kafka).
  • На стороне consumer: обрабатывать события в порядке offset внутри партиции.

8.2 Late-arriving data (опоздавшие данные)

События могут прийти с задержкой (из-за сетевых проблем или ребаланса). Стратегии:

  • Ignore late events — если событие старше последнего обработанного, пропустить.
  • Watermark — во Flink: задать allowed lateness (например, 1 мин), события старше водяного знака отбрасываются.
  • Reindexing — при обнаружении старого события переиндексировать весь документ заново (costly).

8.3 Exactly-once (гарантия ровно одного сообщения)

Нужна, чтобы избежать дублирования чанков при сбоях.

Комбинация:

  • Kafka — транзакционный продюсер (производитель использует enable.idempotence=true и transactional.id).
  • Consumer — ручное управление offset: коммит после успешной записи в Qdrant (не автоматический).
  • Qdrant — идемпотентный upsert (если повторно записать тот же point с тем же id, данные не дублируются).

Для отказоустойчивости: ingestion consumer должен быть stateful с хранением обработанных eventId (например, в Redis) — это защита от повторной отправки после рестарта.


9. Latency budget: от изменения до индексации < 1 сек

Целевая задержка:

ЭтапТипичное времяОптимизация
CDC (DebeziumKafka)5–50 msРазмещение Debezium на той же ноде, что и БД
Kafka transfer1–10 msМинимальная задержка при acks=1
Stream processing10–50 msIn-memory дедупликация, без внешних хранилищ
Chunking + embedding100–400 msONNX-runtime (int8) для локальной модели, batch по 8–16 чанков
Qdrant upsert5–20 msПакетная вставка, использование gRPC, отключение синхронного репликации

Итого: < 1 с достижимо при правильной настройке. Мониторинг: метрики latency pipeline (Debezium lag, Kafka consumer lag, Qdrant write latency) через JMX или Prometheus.


10. Деплой и масштабирование

Типовой облачный деплой:

  • Kubernetes с Helm-чартами: Strimzi (Kafka Operator), Debezium (KafkaConnect), Flink (Flink Kubernetes Operator), Qdrant.
  • Каждый компонент масштабируется независимо:
    • Kafka: увеличение партиций (но осторожно — изменение партиций ломает ordering).
    • Ingestion consumer: горизонтальное масштабирование через consumer group (каждая партиция — одному consumer).
    • Qdrant: sharding (коллекция разбивается на n шардов, каждый на отдельной ноде).

Альтернатива Managed Services: Confluent Cloud (Kafka), Aiven for PostgreSQL + Debezium, Qdrant Cloud.


Пет-проект для закрепления

Задача: реализовать streaming RAG pipeline для небольшого корпуса новостей (PostgreSQL -> Kafka -> consumer -> Qdrant).

Инструменты:

Шаги:

  1. Поднять инфраструктуру через docker-compose.yml.
  2. Создать таблицу news(id, title, content, updated_at).
  3. Настроить Debezium коннектор на таблицу news, публикацию в топик news.cdc.
  4. Написать ingestion consumer, который:
    • Читает топик,
    • Извлекает content из after,
    • Дробит на чанки по 200 токенов,
    • Генерирует эмбеддинги (all-MiniLM-L6-v2),
    • Записывает в Qdrant коллекцию news_embeddings.
  5. Вставить/обновить строку в PostgreSQL и проверить, что чанки появились в Qdrant (через UI или API поиска).

Ожидаемый результат: при изменении документа latency < 2 секунды (включая время чанкинга). Pipeline выдерживает 10 изменений/сек.


Связь с другими вопросами

ВопросТема
850Определение streaming RAG и сравнение с batch
852Обработка late-arriving data в RAG
853Exactly-once семантика в потоковой обработке
854Выбор векторной БД для streaming ingestion
855Мониторинг real-time pipeline

Навигация