Как вы обновляете документы в существующей RAG-системе?

Краткий тезис

В production RAG-системах документы меняются: добавляются новые, обновляются существующие, удаляются устаревшие. Главная проблема — синхронизация между источником документов и векторной БД (индексом). Основные стратегии: full re-indexing (перестроить всё с нуля, дорого, но просто), incremental update (обновлять только изменившееся, дёшево, но сложно) и versioned подход (хранить историю версий). Ключевое правило: TTL для кэшей и event-driven обновления (при изменении документа → переиндексация).


1. Термин: Индекс (Index) в RAG

Что это Векторная БД (Qdrant, Pinecone, Weaviate) хранит эмбеддинги чанков и метаданные. По этому индексу происходит поиск.

Проблема Когда документ меняется, индекс должен обновиться, иначе retrieval найдёт старую версию документа.

Термин «Stale data» (устаревшие данные Индекс содержит старую версию документа, а актуальная версия уже другая.


2. Три подхода к обновлению документов

2.1 Full re-indexing (полная переиндексация)

Что это Удалить весь индекс и перестроить его заново со всеми документами.

Pipeline

1. Загрузить все актуальные документы из источника (S3, SharePoint, база данных)
2. Для каждого документа:
   - Распарсить
   - Разбить на чанки
   - Вычислить эмбеддинги
3. Удалить старый индекс в векторной БД
4. Вставить все новые чанки батчами
5. Переключить production на новый индекс
ПлюсыМинусы
Простота реализацииДорого (все эмбеддинги пересчитываются)
Гарантированная консистентностьДолго (часы или дни для миллионов документов)
Нет сложной логики обновленияДаунтайм во время переключения индексов

Термин «Консистентность» (Consistency Все данные в индексе согласованы с источником, нет рассинхрона.

Когда использовать

  • Документы обновляются редко (раз в неделю/месяц)
  • Количество документов небольшое (<100k)
  • Даунтайм допустим (ночное окно)

Цифра Full re-indexing 1 млн документов (768d) на A10G GPU занимает 3-5 часов.


2.2 Incremental update (инкрементальное обновление)

Что это Обновляются только те документы, которые изменились с последней синхронизации.

Pipeline

1. Отследить изменения в источнике документов
   - Файловая система: mtime (время модификации)
   - S3: etag + last_modified
   - База данных: updated_at timestamp
   - SharePoint: version number

2. Для каждого изменённого документа:
   - Удалить все его старые чанки из векторной БД (по source_id)
   - Заново распарсить, нарезать на чанки, вычислить эмбеддинги
   - Вставить новые чанки

3. Для удалённых документов: удалить все чанки по source_id

Термин «Хеш» (Hash Отпечаток содержимого файла. Если хеш не изменился, документ не менялся (даже если mtime изменился).

import hashlib

def get_document_hash(content: str) -> str:
    return hashlib.sha256(content.encode()).hexdigest()

# Храним в метаданных: {"source_id": "doc123", "content_hash": "a3f5c1..."}
# При синхронизации: сравниваем текущий хеш с сохранённым
ПлюсыМинусы
Эффективно (обновляется только changed)Сложнее реализовать
Нет даунтайма (можно на лету)Нужен механизм отслеживания изменений
Подходит для частых обновленийРиск пропустить изменения (баги в трекере)

Термин «CDC» (Change Data Capture Технология отслеживания изменений в базе данных (Debezium, Kafka Connect). Позволяет получать события "документ обновлён" в реальном времени.

Когда использовать

  • Документы обновляются часто (раз в час/день)
  • Большой объём данных (>1 млн)
  • Нулевой даунтайм обязателен

Цифра При 1% ежедневных изменений, incremental update в 100 раз дешевле full re-indexing.


2.3 Versioned подход (версионирование)

Что это Храним все версии документов в индексе, при retrieval учитываем актуальность.

Pipeline

Метаданные каждого чанка:
{
    "text": "...",
    "source_id": "doc123",
    "version": 3,
    "timestamp": "2025-01-01T12:00:00",
    "is_current": true  # последняя версия
}

# При обновлении документа:
# - Ставим is_current=false у всех старых чанков
# - Добавляем новые чанки с is_current=true

При retrieval

# Фильтр только по текущим версиям
results = client.search(
    collection_name="documents",
    query_vector=embedding,
    query_filter={
        "must": [{"key": "is_current", "match": {"value": True}}]
    }
)
ПлюсыМинусы
Можно откатиться к старой версииРастёт объём хранилища (все версии)
Аудит изменений (история)Усложнение фильтрации при поиске
Нет даунтаймаТребует управления устаревшими версиями

Термин «Аудит» (Audit Возможность посмотреть, как документ менялся со временем.

Когда использовать

  • Юридические, медицинские, финансовые документы (требуют аудита)
  • Необходим откат к предыдущей версии
  • Объём изменений не очень большой

Цифра При ежедневных обновлениях 10% документов, через месяц версионирование увеличит размер индекса в 2-3 раза.


3. Сравнение подходов

ХарактеристикаFull re-indexingIncremental updateVersioned
Сложность реализацииНизкаяВысокаяСредняя
Стоимость (компьютингВысокаяНизкаяСредняя
ДаунтаймЕсть (при переключении)НетНет
КонсистентностьГарантированаЕсли трекер надёженГарантирована
Аудит историиНетНетДа
Рост объёма данныхНетНетДа (версии)
Когда использоватьРедкие обновленияЧастые обновленияАудит, откаты

4. Проблема stale data (устаревшие данные)

Что это LLM получает старую информацию, потому что кэш или индекс не обновились после изменения документа.

Источники stale data в RAG

ИсточникПроблемаРешение
Кэш (RedisВ кэше сохранён ответ на основе старого документаTTL на кэш + инвалидация при обновлении документа
Векторная БДВ индексе ещё старая версия чанкаIncremental update или versioned подход
LLM context windowДокумент изменился, но LLM уже держит старый контекстОбновлять контекст при каждом запросе (не хранить долго)

Термин «Инвалидация кэша» (Cache invalidation Процесс удаления устаревших записей из кэша.

Решение для кэша

# При обновлении документа doc123:
def on_document_updated(source_id: str):
    # 1. Удаляем все записи кэша, связанные с этим документом
    redis.delete(f"rag_answer:*{source_id}*")
    
    # 2. Или используем versioned ключи
    current_version = get_doc_version(source_id)
    cache_key = f"rag_answer:{query_hash}:{source_id}:{current_version}"

Термин «TTL (Time To Live)» Время жизни записи в кэше. Например, TTL = 1 час → через час запись удалится, даже если её не инвалидировали вручную.


5. Event-driven обновления (паттерн событий)

Что это Архитектура, где при изменении документа генерируется событие, которое запускает переиндексацию.

Архитектура

[Источник документа] → [Kafka событие] → [Consumer (обработчик)] → [Векторная БД]

Пример:
1. Пользователь загружает новый документ в S3 → S3 Event → Kafka topic "document.uploaded"
2. Consumer слушает topic → парсит документ → обновляет индекс
3. При обновлении документа → событие "document.updated" → Consumer
4. При удалении документа → событие "document.deleted" → Consumer (удаляет чанки)

Код для обработки события

from kafka import KafkaConsumer

consumer = KafkaConsumer('document.changed', bootstrap_servers='localhost:9092')

for message in consumer:
    event = json.loads(message.value)
    source_id = event['source_id']
    action = event['action']  # 'upload', 'update', 'delete'
    
    if action in ('upload', 'update'):
        # Удаляем старые чанки (если были)
        delete_chunks_by_source(source_id)
        # Обрабатываем новый документ
        chunks = process_document(event['document_url'])
        # Вставляем новые чанки
        insert_chunks(chunks)
        
    elif action == 'delete':
        delete_chunks_by_source(source_id)

Термин «Event-driven architecture» Система реагирует на события (изменения), а не работает по расписанию (cron).

Когда использовать Реальное время (real-time), нулевая задержка между изменением документа и доступностью в поиске.

Цифра Event-driven + incremental update дают latency обновления 1-5 секунд (от загрузки документа до индексации).


6. Полный пайплайн обновления (рекомендация)

Гибридный подход для production

class DocumentUpdateManager:
    def __init__(self, vector_db, cache, kafka_consumer):
        self.db = vector_db
        self.cache = cache
        self.consumer = kafka_consumer
    
    def sync_all_documents(self):
        """Полная синхронизация (раз в день/неделю)"""
        all_docs = fetch_all_documents_from_source()
        
        # Создаём новый индекс в фоне
        new_index_id = create_temp_index()
        for doc in all_docs:
            chunks = process_document(doc)
            insert_into_index(new_index_id, chunks)
        
        # Переключаем production
        swap_indexes(new_index_id)
        # Инвалидируем весь кэш
        self.cache.flush_all()
    
    def incremental_update(self, source_id):
        """Инкрементальное обновление по событию"""
        # 1. Удаляем старые чанки
        self.db.delete(filter={"source_id": source_id})
        
        # 2. Инвалидируем кэш для этого документа
        self.cache.delete_by_pattern(f"*{source_id}*")
        
        # 3. Индексируем актуальную версию
        doc = fetch_document(source_id)
        chunks = process_document(doc)
        self.db.insert(chunks)
    
    def run_listener(self):
        for event in self.consumer:
            self.incremental_update(event.source_id)
    
    def schedule_full_sync(self):
        """Запуск полной синхронизации раз в сутки (для надёжности)"""
        schedule.every().day.at("03:00").do(self.sync_all_documents)

7. Пет-проект для закрепления

Задача Реализовать систему обновления документов в RAG с incremental update.

Инструменты Python, Qdrant, SQLite (для отслеживания версий), Kafka (опционально)

Шаги

  1. Развернуть Qdrant и создать коллекцию с метаданными (source_id, version, content_hash)
  2. Загрузить 100 документов, сохранить их хеши в SQLite
  3. Реализовать функцию sync_changes():
    • Пройти по всем документам, вычислить текущие хеши
    • Сравнить с сохранёнными в SQLite
    • Для изменённых документов: удалить старые чанки, добавить новые
  4. Протестировать:
    • Изменить 10 документов (добавить текст, удалить текст)
    • Добавить 5 новых документов
    • Удалить 3 документа
    • Запустить синхронизацию, проверить, что индекс обновился
  5. Замерить время инкрементального обновления против full re-indexing
  6. Добавить TTL для кэша (Redis)

Ожидаемый результат Система, которая обновляет только изменившиеся документы, время синхронизации пропорционально количеству изменений, а не общему объёму.


Связь с другими вопросами

ВопросТема
1RAG архитектура (индексация документов)
4Векторная БД (метаданные для source_id)
7Кэширование и TTL
13Эффективная загрузка 1000+ документов (батчинг)
69CI/CD для RAG-пайплайна (версионирование индексов)
79Обновление embedding модели без переиндексации (dual index)
231Инкрементальные вставки в ANN индекс

9. Как вы обновляете документы в существующей RAG-системе|9. Как вы обновляете документы в существующей RAG-системе|9. Как вы обновляете документы в существующей RAG-системе|9 полностью разобран. Переходим к вопросу 10, когда будете готовы|Вопрос 9 полностью разобран. Переходим к вопросу 10, когда будете готовы]]


Навигация