English translation is not available yet. Showing Russian content.
Как строить streaming RAG pipeline (real-time ingestion)?
Краткий тезис
Streaming RAG pipeline (конвейер потоковой RAG) — это архитектура для непрерывной индексации документов в реальном времени без даунтайма. Основа — Change Data Capture (CDC) из источника (например, PostgreSQL), передача изменений через Apache Kafka, потоковая обработка для дедупликации и трансформации, и финальная запись в векторную базу данных (Qdrant). Ключевые вызовы: порядок событий (ordering), обработка опоздавших данных (late-arriving data) и гарантия exactly-once. Целевая задержка от изменения документа до индексации — менее 1 секунды.
1. Терминология: Streaming RAG pipeline и real-time ingestion
Streaming RAG pipeline — конвейер, который обрабатывает изменения в документах по мере их появления, без batch-загрузок. Real-time ingestion — процесс непрерывного приёма новых и обновлённых документов, их разбиения (chunking), генерации эмбеддингов и помещения в векторное хранилище.
Отличие от классической RAG: в batch-режиме индексация запускается по расписанию (раз в сутки), а streaming обеспечивает актуальность данных с задержкой в секунды.
Термин Change Data Capture (CDC) — техника захвата изменений в базе данных (INSERT, UPDATE, DELETE) на уровне транзакционного лога, чтобы транслировать их в стрим событий.
2. Компоненты streaming RAG pipeline
Архитектура состоит из 5 ключевых компонентов:
| Компонент | Пример технологии | Функция |
|---|---|---|
| Источник CDC | Debezium (Kafka Connect) | Читает WAL PostgreSQL и публикует события в Kafka |
| Message broker | Apache Kafka | Гарантирует упорядоченность и надёжную доставку событий |
| Stream processor | Kafka Streams или Apache Flink | Дедупликация, фильтрация, обогащение событий |
| Ingestion consumer | Python-приложение (или Faust) | Парсинг документа, чанкинг, генерация эмбеддингов |
| Векторная БД | Qdrant (поддерживает streaming insertion) | Хранит и индексирует эмбеддинги с payload |
Дополнительно: мониторинг (Prometheus + Grafana), dead letter queue для ошибочных событий.
3. CDC из PostgreSQL через Debezium
Debezium — распределённый платформер для CDC; встраивается в Kafka Connect как Source Connector.
Принцип работы:
- Debezium коннектор подключается к WAL (Write-Ahead Log) PostgreSQL и читает изменения.
- Каждое изменение преобразуется в событие с ключом (первичный ключ таблицы) и значением (старое и новое состояние).
- События публикуются в топик Kafka вида server.schema.table.
Пример события Debezium (сокращённо):
{
"payload": {
"op": "u", // "c" = create, "u" = update, "d" = delete
"before": { "id": 42, "content": "Old document" },
"after": { "id": 42, "content": "Updated document" },
"ts_ms": 1712345678000
}
}
Важно: Debezium поддерживает exactly-once для PostgreSQL благодаря LSN (Log Sequence Number). Для сохранения порядка необходимо использовать ключ партиции — например, ID документа.
4. Kafka topic: партиционирование и ordering
Kafka гарантирует упорядоченность сообщений только внутри одной партиции при одинаковом ключе.
Для streaming RAG:
- Ключ сообщения: document_id (ID документа).
- Количество партиций: равно ожидаемому параллелизму ingestion consumer'ов (обычно 4–16).
- Retention: настраивается так, чтобы события хранились не дольше времени обработки опоздавших данных (например, 24 часа).
Проблема: если один и тот же документ изменяется несколько раз подряд, все события должны попасть в одну партицию. Иначе ingestion consumer увидит их в неправильном порядке.
5. Потоковая обработка (stream processor)
Stream processor выполняет:
- Дедупликацию — если одно и то же изменение пришло дважды (из-за ребаланса), пропускаем повтор.
- Фильтрацию — отбрасываем события на нерелевантные таблицы или типы операций (например,
"d"(delete) уходит другому consumer). - Трансформацию — добавляет метаданные (например, tenant ID).
Пример на Kafka Streams (Java-подобный псевдокод):
KStream<String, ChangeEvent> stream = builder.stream("documents.cdc");
stream
.filter((key, value) -> value.op == "c" || value.op == "u")
.transformValues(/* дедупликация по eventId */)
.to("documents.validated");
Для Apache Flink — SQL на unbounded stream:
INSERT INTO documents_validated
SELECT *
FROM documents_cdc
WHERE op IN ('c', 'u')
Флинк даёт встроенную поддержку event-time и watermarks для обработки late-arriving data.
6. Ingestion consumer: от Kafka до векторной БД
Ingestion consumer — приложение, которое читает из очищенного топика (documents.validated) и выполняет:
- Парсинг документа — извлечение текста из форматов (PDF, HTML, Markdown).
- Chunking — разбиение на фрагменты фиксированного или семантического размера (например, по 512 токенов с перекрытием 128).
- Генерация эмбеддингов — вызов embedding model (например,
text-embedding-3-smallчерез API, или локально через ONNX). - Запись в Qdrant — операция upsert с точкой, содержащей:
- id = chunk hash
- vector = эмбеддинг
- payload = { document_id, position, raw_text }
- Удаление старых чанков для обновлённых документов (запрос DELETE по
document_id).
Код на Python (схематично):
consumer = KafkaConsumer("documents.validated", ...)
qdrant_client = QdrantClient(host="...", prefer_grpc=True)
for msg in consumer:
event = json.loads(msg.value)
if event["op"] == "d":
qdrant_client.delete(points_selector=Filter(
must=[FieldCondition(key="document_id", match=MatchValue(value=event["before"]["id"]))]
))
continue
doc = parse_document(event["after"]["content"])
chunks = chunk_document(doc)
embeddings = embed_model.encode(chunks)
points = [PointStruct(id=hash(chunk), vector=emb.tolist(), payload={"doc_id": event["after"]["id"], "chunk": chunk}) for chunk, emb in zip(chunks, embeddings)]
qdrant_client.upsert(points=points)
7. Векторная БД Qdrant: поддержка streaming insertion
Qdrant — одна из немногих векторных БД, спроектированных для высоконагруженных streaming-записей:
- Grpc-протокол — низкая задержка, пакетная вставка.
- Snapshot isolation — запись не блокирует чтение.
- Payload-индексы — быстрые точечные удаления по
document_id.
Альтернативы: Pinecone (управляемый, дорогой), Weaviate (есть streaming, но медленнее), Milvus (тяжёлый, требует настройки).
Qdrant поддерживает idempotent upsert — если точка с таким же id уже существует, она перезаписывается. Это упрощает exactly-once.
8. Сложности: ordering, late-arriving data, exactly-once
8.1 Ordering (упорядоченность)
Изменения одного документа должны приходить в правильном порядке. Решение:
- Использовать партицию по
document_id(как ключ сообщения Kafka). - На стороне consumer: обрабатывать события в порядке offset внутри партиции.
8.2 Late-arriving data (опоздавшие данные)
События могут прийти с задержкой (из-за сетевых проблем или ребаланса). Стратегии:
- Ignore late events — если событие старше последнего обработанного, пропустить.
- Watermark — во Flink: задать allowed lateness (например, 1 мин), события старше водяного знака отбрасываются.
- Reindexing — при обнаружении старого события переиндексировать весь документ заново (costly).
8.3 Exactly-once (гарантия ровно одного сообщения)
Нужна, чтобы избежать дублирования чанков при сбоях.
Комбинация:
- Kafka — транзакционный продюсер (производитель использует
enable.idempotence=trueиtransactional.id). - Consumer — ручное управление offset: коммит после успешной записи в Qdrant (не автоматический).
- Qdrant — идемпотентный upsert (если повторно записать тот же point с тем же id, данные не дублируются).
Для отказоустойчивости: ingestion consumer должен быть stateful с хранением обработанных eventId (например, в Redis) — это защита от повторной отправки после рестарта.
9. Latency budget: от изменения до индексации < 1 сек
Целевая задержка:
| Этап | Типичное время | Оптимизация |
|---|---|---|
| CDC (Debezium → Kafka) | 5–50 ms | Размещение Debezium на той же ноде, что и БД |
| Kafka transfer | 1–10 ms | Минимальная задержка при acks=1 |
| Stream processing | 10–50 ms | In-memory дедупликация, без внешних хранилищ |
| Chunking + embedding | 100–400 ms | ONNX-runtime (int8) для локальной модели, batch по 8–16 чанков |
| Qdrant upsert | 5–20 ms | Пакетная вставка, использование gRPC, отключение синхронного репликации |
Итого: < 1 с достижимо при правильной настройке. Мониторинг: метрики latency pipeline (Debezium lag, Kafka consumer lag, Qdrant write latency) через JMX или Prometheus.
10. Деплой и масштабирование
Типовой облачный деплой:
- Kubernetes с Helm-чартами: Strimzi (Kafka Operator), Debezium (KafkaConnect), Flink (Flink Kubernetes Operator), Qdrant.
- Каждый компонент масштабируется независимо:
Альтернатива Managed Services: Confluent Cloud (Kafka), Aiven for PostgreSQL + Debezium, Qdrant Cloud.
Пет-проект для закрепления
Задача: реализовать streaming RAG pipeline для небольшого корпуса новостей (PostgreSQL -> Kafka -> consumer -> Qdrant).
Инструменты:
- Docker Compose: postgres (с включённым WAL-логированием), debezium/kafka-connect, kafka (confluent image), python-образ.
- Язык: Python (confluent_kafka, qdrant-client, sentence-transformers).
Шаги:
- Поднять инфраструктуру через
docker-compose.yml. - Создать таблицу
news(id, title, content, updated_at). - Настроить Debezium коннектор на таблицу
news, публикацию в топикnews.cdc. - Написать ingestion consumer, который:
- Читает топик,
- Извлекает
contentизafter, - Дробит на чанки по 200 токенов,
- Генерирует эмбеддинги (all-MiniLM-L6-v2),
- Записывает в Qdrant коллекцию
news_embeddings.
- Вставить/обновить строку в PostgreSQL и проверить, что чанки появились в Qdrant (через UI или API поиска).
Ожидаемый результат: при изменении документа latency < 2 секунды (включая время чанкинга). Pipeline выдерживает 10 изменений/сек.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 850 | Определение streaming RAG и сравнение с batch |
| 852 | Обработка late-arriving data в RAG |
| 853 | Exactly-once семантика в потоковой обработке |
| 854 | Выбор векторной БД для streaming ingestion |
| 855 | Мониторинг real-time pipeline |
Навигация
- Предыдущий: 850
- Следующий: 852
- Индекс: 00. Индекс разборов