Как вы обрабатываете streaming данные для real-time RAG?

Краткий тезис

Обработка streaming данных для real-time RAG требует пайплайна с минимальной задержкой и возможностью инкрементального обновления векторного индекса. Основной подход — использование стриминговых фреймворков (Flink|Apache Flink, Spark Spark Streaming), оконной обработки, генерации эмбеддингов на лету (in-flight) и вставки в векторную БД с поддержкой операций upsert. Целевая latency менее 1 секунды от поступления события до появления документа в поиске.

1. Терминология: streaming данные и real-time RAG

Streaming данные — непрерывный поток записей, поступающих в реальном времени (логи, события IoT, курсы валют). В отличие от batch-обработки, данные обрабатываются по мере поступления.

Real-time RAG — Retrieval-Augmented Generation, в котором документы актуализируются практически сразу после появления (латентность от секунд до минут). Это критично в сценариях, где информация быстро устаревает (новости, торговые данные).

Ключевые требования: низкая задержка индексации]], идемпотентность, отказоустойчивость, возможность обрабатывать out-of-order события.

2. Архитектура пайплайна streaming RAG

Типовой пайплайн состоит из слоёв:

СлойКомпонентПример
ИсточникБрокер сообщенийApache Kafka, RabbitMQ, Amazon Kinesis
Stream processorОконная обработка, эмбеддингиApache Flink, Spark Structured Streaming, Faust (Python)
Векторный индексХранилище эмбеддинговMilvus, Qdrant, Weaviate, Pinecone
Инференс LLMГенерация ответаOpenAI API, vLLM, Llama.cpp

Поток:
Kafka topic "documents" -> Flink job (sliding window + embedding) -> Qdrant collection (upsert) -> FastAPI endpoint для вопросов

3. Оконная обработка (windowing)

Окно — способ группировки событий по времени или количеству. Для real-time RAG чаще всего используется sliding window (скользящее окно), позволяющее обновлять индекс каждые N секунд с перекрытием.

  • Tumbling window: фиксированный интервал без перекрытия (например, каждые 1 минуту). Проще, но может пропустить граничные события.
  • Sliding window: окно с шагом меньше длины (например, длина 2 минуты, шаг 30 секунд). Обеспечивает более частые обновления, но генерирует больше запросов к БД.
  • Session window: объединяет события внутри периодов активности. Полезно для пользовательских сессий.

Выбор окна зависит от требуемой freshness (свежести) данных. Если нужна latency < 1 сек, окно должно быть минимальным (sliding с шагом 100–500 мс) — но это увеличивает нагрузку.

4. Генерация эмбеддингов in-flight

Эмбеддинги вычисляются прямо в потоке без остановки. Подходы:

  • Single-event embedding: каждый документ эмбеддится отдельно (прост, но медлен при большом трафике).
  • Batch embedding: накопление пачки документов за малый интервал (например, 50 мс) и отправка одной батч-инференцией. Выгоднее по throughput, немного увеличивает latency.
  • Кэширование эмбеддингов: если документы часто повторяются (например, одинаковые заголовки), можно хранить кэш в Redis.

Пример на Faust (Python):

import faust
from sentence_transformers import SentenceTransformer

app = faust.App('embedder_app', broker='kafka://localhost:9092')
model = SentenceTransformer('all-MiniLM-L6-v2')

class Document(faust.Record):
    text: str
    timestamp: float

topic = app.topic('raw_docs', value_type=Document)

@app.agent(topic)
async def embed_docs(stream):
    batch = []
    async for doc in stream.take(10, within=0.1):  # batch 10 или 100 мс
        batch.append(doc.text)
    if batch:
        embeddings = model.encode(batch)
        for text, vec in zip(batch, embeddings):
            # upsert в векторную БД
            qdrant_client.upsert(collection, [PointStruct(id=..., vector=vec.tolist(), payload={"text": text})])

5. Векторные БД для real-time

Не все векторные БД одинаково подходят для streaming. Критичны:

  • Incremental indexing: возможность добавлять точки без полной перестройки индекса (HNSW позволяет, IVF неэффективен при частых добавлениях).
  • Upsert/delete: обновление существующих документов (например, если текст исправлен).
  • TTL (time-to-live): автоматическое удаление устаревших записей (например, новости старше 24 часов).
  • Filtering: поддержка фильтрации по временным меткам (например, event_time > now - 1 hour).
БДIncremental indexingUpsertTTLNotes
Qdrantдадада (через payload)Лёгкая настройка, подходит для small/medium
Milvusда (с HNSW)дадаЛучше для крупных кластеров
Weaviateдаданет (встроенный TTL отсутствует)Хорошая интеграция с Kafka
Pineconeдадада (managed)Проприетарная, платная

6. Обработка обновлений и удалений

Streaming данные могут приходить с задержкой или в неправильном порядке. Механизмы:

  • Upsert (update/insert): если документ имеет уникальный ID, повторная вставка с тем же ID перезаписывает вектор и payload.
  • Tombstone records: специальное событие (например, __deleted__: true) для удаления из индекса.
  • Watermark & event time: использование водяных знаков (watermarks) для обработки out-of-order событий. В Flink — assignTimestampsAndWatermarks с периодом maxOutOfOrderness.

Пример Flink SQL:

CREATE TABLE documents (
  id STRING,
  text STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', 'topic' = 'docs', 'format' = 'json');

7. Сравнение Batch vs Streaming RAG

ХарактеристикаBatch RAGStreaming RAG
Задержка индексацииМинуты–часыСекунды–минуты
Freshness данныхНизкаяВысокая
Вычислительная нагрузкаПиковая (регулярные batch-задачи)Постоянная, равномерная
Сложность инфраструктурыНизкая (cron + скрипты)Высокая (Flink, Kafka, отказоустойчивость)
ПрименимостьСтатические корпуса, блогиФинансовые данные, мониторинг, новости

8. Пример реализации: Kafka + Flink + Qdrant + LLM

Шаги:

  1. Установить Kafka, Qdrant, Flink cluster.
  2. Создать Flink job на Python (PyFlink) или Java, который читает из топика raw_docs.
  3. Применить оконную агрегацию (sliding window 2s, step 500ms) для группировки документов.
  4. Внутри окна вызвать модель эмбеддинга (через gRPC или local inference). Для PyFlink можно использовать MapFunction с моделью.
  5. Выполнить upsert в Qdrant через REST/gRPC с time_partition для фильтрации по событию.
  6. Для поиска — отдельный микросервис FastAPI, который при запросе пользователя делает поиск в Qdrant, получает топ-k документов, передаёт в LLM.

Псевдокод операции вставки

# PyFlink UDF
class EmbedAndInsert(MapFunction):
    def map(self, doc):
        vector = model.encode(doc.text)
        qdrant_client.upsert(
            collection_name="stream_collection",
            points=[PointStruct(id=doc.id, vector=vector, payload={
                "text": doc.text,
                "event_time": doc.event_time,
                "source": doc.source
            })],
            wait=False
        )
        return doc.id

9. Проблемы и решения

  • Дубликаты: могут возникнуть из-за повторной отправки (at-least-once semantics). Решение — идемпотентность по ID (upsert перезаписывает, а не создаёт новый).
  • Out-of-order события: событие с более ранним временем приходит после более позднего. Решение — водяные знаки и буферизация; в векторной БД можно использовать временные метки и фильтр на стороне поиска.
  • Высокая частота эмбеддингов: создаёт нагрузку на GPU/CPU. Решение — батчинг и асинхронная инференция (например, через Triton Inference Server).
  • Масштабирование: при росте топиков увеличивают количество партиций Kafka и параллелизм Flink. Векторную БД масштабируют шардированием.

10. Мониторинг пайплайна

Необходимые метрики (через Prometheus + Grafana):

Пороговые значения: при превышении 1 секунды задержки — алерт.

11. Связь с Agentic RAG

Агенты часто полагаются на самую свежую информацию (погода, новости, статусы систем). Streaming RAG — основа для action-oriented агентов: они могут подписываться на темы, обрабатывать события, запускать ретрив по последним данным. Например, агент по мониторингу инфраструктуры получает stream логов, индексирует ошибки и отвечает на вопросы «Какие сервера упали за последние 5 минут?». Без streaming пайплайна такая система была бы неактуальной.

12. Best practices

  • Используйте idempotent writes в векторную БД (уникальный ID для каждой записи).
  • Настройте TTL для автоматической очистки устаревших данных, чтобы не переполнять индекс.
  • Для уменьшения latency эмбеддингов размещайте модель на GPU и используйте ONNX или TensorRT.
  • Тестируйте пайплайн под нагрузкой с помощью chaos engineering (потеря сообщений, задержки Kafka).
  • Для high throughput используйте batch upsert (одним запросом вставлять 100–1000 векторов).

Пет-проект для закрепления

Задача: построить real-time RAG для новостного RSS-фида.

Инструменты: Apache Kafka (через Docker), Faust (Python), Sentence-Transformers, Qdrant, OpenAI API.

Шаги:

  1. Запустить Kafka, Qdrant локально (docker-compose).
  2. Написать Faust-приложение, которое читает RSS ленту и отправляет заголовки в Kafka.
  3. Второй Faust-агент (из примера выше) батчит эмбеддинги и upsert в Qdrant.
  4. Создать FastAPI эндпоинт /ask с LLM, который ищет по Qdrant последние 10 документов (фильтр по event_time > now - 1h).
  5. Имитировать поток: каждые 5 секунд новое событие.

Ожидаемый результат: API отвечает на вопросы о свежих новостях с задержкой менее 2 секунд.

Связь с другими вопросами

ВопросТема
1Архитектура RAG системы
5Оценка качества retrieval
7Уменьшение latency
9Обновление документов в RAG
10Self-RAG
15Инкрементальные эмбеддинги

Навигация