中文翻译暂不可用,显示俄语原文。

Как вы делаете incremental ingestion для часто меняющихся документов?

Краткий тезис

ingestion|Incremental ingestion (ingestion|инкрементальная загрузка) для часто меняющихся документов — это процесс обновления векторной базы данных только изменёнными частями, без переиндексации всего корпуса. Ключевой паттерн — Change Data Capture (CDC) из источника (например, базы данных или файлового хранилища), передача изменений через Apache Kafka (или аналогичный message broker) и consumer, который атомарно удаляет старые чанки по source_id и вставляет новые. Это критически важно для поддержания актуальности RAG-системы в реальном времени.

1. Термин: Incremental Ingestion (инкрементальная загрузка)

Что это Стратегия обновления данных, при которой в векторную БД добавляются, изменяются или удаляются только те документы (или их чанки), которые изменились с момента последней синхронизации.

Противопоставление

  • Full re-index (полная переиндексация): Каждый раз пересчитываем эмбеддинги для всех документов и перезаписываем всю векторную БД. Это дорого и медленно при больших объёмах.
  • Incremental ingestion Обрабатываем только дельту (изменения). Быстро, эффективно, позволяет работать в реальном времени.

Термин «Дельта» (Delta) — набор изменений (вставки, обновления, удаления), произошедших за определённый промежуток времени.

2. Зачем нужен incremental ingestion для часто меняющихся документов?

Проблема В RAG-системе ответ LLM основан на контексте из векторной БД. Если документы устарели (например, изменились цены, условия договора, новости), ответы будут неактуальными или неверными.

Решение Инкрементальная загрузка позволяет:

  • Поддерживать актуальность Система всегда видит последнюю версию документа.
  • Снижать latency (задержку) обновления Изменения отражаются за секунды/минуты, а не за часы.
  • Экономить ресурсы Не нужно пересчитывать эмбеддинги для миллионов неизменённых документов.
  • Избегать даунтайма Обновление происходит без остановки сервиса.

3. Основной паттерн: CDC → KafkaConsumer → Vector DB

Это классическая архитектура для инкрементальной загрузки в реальном времени.

Компоненты

КомпонентРольПримеры
Источник данныхХранилище, где меняются документы (SQL БД, NoSQL, файловая система, S3)PostgreSQL, MongoDB, Google Drive
CDC (Change Data Capture)Механизм, который отслеживает изменения в источнике (insert, update, delete) и публикует их как событияDebezium (для PostgreSQL, MySQL), AWS DMS, Kafka Connect
Message Broker (брокер сообщений)Промежуточное звено, которое буферизирует и распределяет события измененийApache Kafka, Amazon Kinesis, RabbitMQ
Consumer (потребитель)Сервис, который читает события из брокера и выполняет действия с векторной БДPython-скрипт, Spark Streaming, Lambda-функция
Vector DB (векторная база данных)Хранилище эмбеддингов и метаданных документовPinecone, Weaviate, Qdrant, Milvus

Поток данных

  1. Источник Пользователь обновляет документ в PostgreSQL.
  2. CDC Debezium ловит UPDATE и формирует событие с новыми данными и source_id документа.
  3. Kafka Событие попадает в топик document_changes.
  4. Consumer Python-сервис читает событие.
  5. Consumer Извлекает текст документа, чанкует его, вычисляет эмбеддинги.
  6. Consumer Выполняет атомарную операцию в векторной БД:
    • Удаление delete_by_filter(source_id=<id_документа>) — удаляет все старые чанки этого документа.
    • Вставка insert(new_chunks) — вставляет новые чанки с эмбеддингами.
  7. Готово Теперь retrieval видит только актуальные чанки.

4. Детали реализации: обработка разных типов изменений

Consumer должен обрабатывать три типа событий CDC:

Тип событияДействие в векторной БДПример кода (псевдокод)
INSERT (новый документ)Просто вставить новые чанкиvector_db.insert(chunks)
UPDATE (изменение документа)Удалить старые чанки по source_id, затем вставить новыеvector_db.delete(filter={"source_id": doc_id}); vector_db.insert(new_chunks)
DELETE (удаление документа)Удалить все чанки по source_idvector_db.delete(filter={"source_id": doc_id})

Важно Операция UPDATE должна быть атомарной (или идемпотентной), чтобы избежать состояния гонки, когда retrieval видит частично обновлённый документ (часть старых чанков, часть новых). В некоторых векторных БД есть поддержка транзакций, в других — нужно реализовать логику через версионирование.

5. Термин: Source ID (идентификатор источника)

Что это Уникальный идентификатор документа в исходной системе. Он должен быть стабильным (не меняться при обновлении документа).

Зачем нужен

  • Для связи чанков с исходным документом.
  • Для массового удаления всех чанков одного документа при обновлении или удалении.
  • Для дедупликации (чтобы не вставить один и тот же документ дважды).

Пример метаданных чанка

{
  "source_id": "doc_12345",
  "chunk_index": 0,
  "text": "Текст первого чанка...",
  "embedding": [0.1, 0.2, ...]
}

6. Альтернативные подходы (без CDC)

Если CDC невозможен (например, источник — статический файл на диске), можно использовать:

  • Polling (опрос) по времени Раз в N минут проверять last_modified документа. Если изменился — переиндексировать.
  • Webhook'и Источник сам уведомляет систему об изменениях (например, через HTTP callback).
  • Event-driven архитектура Приложение, которое меняет документ, само публикует событие в Kafka.

Сравнение

ПодходПлюсыМинусы
CDC (Debezium)Реальное время, не требует изменений в источникеСложность настройки, нагрузка на источник
PollingПростота реализацииЗадержка (до N минут), лишние запросы к источнику
WebhookБыстро, легкоТребует модификации источника

7. Проблемы и их решения

ПроблемаОписаниеРешение
Дублирование чанковИз-за сбоя consumer может вставить чанки повторноИспользовать идемпотентность: проверять source_id + chunk_index перед вставкой
Состояние гонкиДва события UPDATE приходят почти одновременноИспользовать версионирование (например, version в метаданных) или блокировки
Большой документОдин UPDATE может породить много событийЧанковать документ в consumer, а не в CDC
Удаление из источникаCDC может не поймать DELETE, если настроен неправильноНастроить Debezium на захват DELETE-событий
СогласованностьВекторная БД может временно содержать неконсистентные данныеИспользовать транзакции (если поддерживаются) или двухфазный коммит

8. Пример реализации на Python (схематично)

import json
from kafka import KafkaConsumer
from vector_db_client import VectorDBClient
from chunker import chunk_document
from embedder import compute_embeddings

consumer = KafkaConsumer(
    'document_changes',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

vector_db = VectorDBClient()

for message in consumer:
    event = message.value
    source_id = event['source_id']
    operation = event['op']  # 'c' (create), 'u' (update), 'd' (delete)
    document_text = event.get('after', {}).get('text', '')

    if operation == 'd':  # DELETE
        vector_db.delete(filter={"source_id": source_id})
        print(f"Deleted chunks for document {source_id}")
        continue

    # Для INSERT и UPDATE
    # 1. Удаляем старые чанки (для UPDATE, для INSERT это no-op)
    vector_db.delete(filter={"source_id": source_id})

    # 2. Чанкуем и эмбеддим
    chunks = chunk_document(document_text)
    embeddings = compute_embeddings(chunks)

    # 3. Вставляем новые чанки
    records = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        records.append({
            "source_id": source_id,
            "chunk_index": i,
            "text": chunk,
            "embedding": embedding
        })
    vector_db.insert(records)
    print(f"Inserted {len(records)} chunks for document {source_id}")

Пет-проект для закрепления

Задача Реализовать инкрементальную загрузку для коллекции статей на Wikipedia (или новостей), которые обновляются раз в день.

Инструменты

Шаги:

  1. Настройка PostgreSQL Создать таблицу articles и включить логическую репликацию (wal_level = logical).
  2. Запуск Debezium Настроить коннектор для PostgreSQL, который будет отправлять изменения в топик articles_changes.
  3. Запуск Kafka Поднять брокер и создать топик.
  4. Написание consumer
    • Читать события из топика.
    • Для каждого события (INSERT/UPDATE/DELETE) выполнять соответствующее действие в ChromaDB.
    • Использовать source_id (id статьи) для удаления старых чанков.
  5. Тестирование
    • Вставить новую статью в PostgreSQL → проверить, что она появилась в векторной БД.
    • Обновить текст статьи → проверить, что старые чанки удалены, новые вставлены.
    • Удалить статью → проверить, что чанки удалены.

Ожидаемый результат Работающая система, где изменения в PostgreSQL автоматически и с минимальной задержкой отражаются в векторной БД, без необходимости полной переиндексации.

Связь с другими вопросами

ВопросТема
9Как вы обновляете документы в существующей RAG-системе? (общий подход)
10Что такое Self-RAG и когда его использовать? (альтернатива для динамических данных)
11Как вы обрабатываете запросы, на которые нет ответа в документах? (связано с актуальностью данных)
12Как вы оцениваете качество retrieval'а в RAG-системе? (метрики для проверки актуальности)
13Что такое RAPTOR и как он улучшает retrieval? (иерархическая индексация, может сочетаться с incremental)

Навигация