中文翻译暂不可用,显示俄语原文。
Как вы обновляете документы в существующей RAG-системе?
Краткий тезис
В production RAG-системах документы меняются: добавляются новые, обновляются существующие, удаляются устаревшие. Главная проблема — синхронизация между источником документов и векторной БД (индексом). Основные стратегии: full re-indexing (перестроить всё с нуля, дорого, но просто), incremental update (обновлять только изменившееся, дёшево, но сложно) и versioned подход (хранить историю версий). Ключевое правило: TTL для кэшей и event-driven обновления (при изменении документа → переиндексация).
1. Термин: Индекс (Index) в RAG
Что это Векторная БД (Qdrant, Pinecone, Weaviate) хранит эмбеддинги чанков и метаданные. По этому индексу происходит поиск.
Проблема Когда документ меняется, индекс должен обновиться, иначе retrieval найдёт старую версию документа.
Термин «Stale data» (устаревшие данные Индекс содержит старую версию документа, а актуальная версия уже другая.
2. Три подхода к обновлению документов
2.1 Full re-indexing (полная переиндексация)
Что это Удалить весь индекс и перестроить его заново со всеми документами.
Pipeline
1. Загрузить все актуальные документы из источника (S3, SharePoint, база данных)
2. Для каждого документа:
- Распарсить
- Разбить на чанки
- Вычислить эмбеддинги
3. Удалить старый индекс в векторной БД
4. Вставить все новые чанки батчами
5. Переключить production на новый индекс
| Плюсы | Минусы |
|---|---|
| Простота реализации | Дорого (все эмбеддинги пересчитываются) |
| Гарантированная консистентность | Долго (часы или дни для миллионов документов) |
| Нет сложной логики обновления | Даунтайм во время переключения индексов |
Термин «Консистентность» (Consistency Все данные в индексе согласованы с источником, нет рассинхрона.
Когда использовать
- Документы обновляются редко (раз в неделю/месяц)
- Количество документов небольшое (<100k)
- Даунтайм допустим (ночное окно)
Цифра Full re-indexing 1 млн документов (768d) на A10G GPU занимает 3-5 часов.
2.2 Incremental update (инкрементальное обновление)
Что это Обновляются только те документы, которые изменились с последней синхронизации.
Pipeline
1. Отследить изменения в источнике документов
- Файловая система: mtime (время модификации)
- S3: etag + last_modified
- База данных: updated_at timestamp
- SharePoint: version number
2. Для каждого изменённого документа:
- Удалить все его старые чанки из векторной БД (по source_id)
- Заново распарсить, нарезать на чанки, вычислить эмбеддинги
- Вставить новые чанки
3. Для удалённых документов: удалить все чанки по source_id
Термин «Хеш» (Hash Отпечаток содержимого файла. Если хеш не изменился, документ не менялся (даже если mtime изменился).
import hashlib
def get_document_hash(content: str) -> str:
return hashlib.sha256(content.encode()).hexdigest()
# Храним в метаданных: {"source_id": "doc123", "content_hash": "a3f5c1..."}
# При синхронизации: сравниваем текущий хеш с сохранённым
| Плюсы | Минусы |
|---|---|
| Эффективно (обновляется только changed) | Сложнее реализовать |
| Нет даунтайма (можно на лету) | Нужен механизм отслеживания изменений |
| Подходит для частых обновлений | Риск пропустить изменения (баги в трекере) |
Термин «CDC» (Change Data Capture Технология отслеживания изменений в базе данных (Debezium, Kafka Connect). Позволяет получать события "документ обновлён" в реальном времени.
Когда использовать
- Документы обновляются часто (раз в час/день)
- Большой объём данных (>1 млн)
- Нулевой даунтайм обязателен
Цифра При 1% ежедневных изменений, incremental update в 100 раз дешевле full re-indexing.
2.3 Versioned подход (версионирование)
Что это Храним все версии документов в индексе, при retrieval учитываем актуальность.
Pipeline
Метаданные каждого чанка:
{
"text": "...",
"source_id": "doc123",
"version": 3,
"timestamp": "2025-01-01T12:00:00",
"is_current": true # последняя версия
}
# При обновлении документа:
# - Ставим is_current=false у всех старых чанков
# - Добавляем новые чанки с is_current=true
При retrieval
# Фильтр только по текущим версиям
results = client.search(
collection_name="documents",
query_vector=embedding,
query_filter={
"must": [{"key": "is_current", "match": {"value": True}}]
}
)
| Плюсы | Минусы |
|---|---|
| Можно откатиться к старой версии | Растёт объём хранилища (все версии) |
| Аудит изменений (история) | Усложнение фильтрации при поиске |
| Нет даунтайма | Требует управления устаревшими версиями |
Термин «Аудит» (Audit Возможность посмотреть, как документ менялся со временем.
Когда использовать
- Юридические, медицинские, финансовые документы (требуют аудита)
- Необходим откат к предыдущей версии
- Объём изменений не очень большой
Цифра При ежедневных обновлениях 10% документов, через месяц версионирование увеличит размер индекса в 2-3 раза.
3. Сравнение подходов
| Характеристика | Full re-indexing | Incremental update | Versioned |
|---|---|---|---|
| Сложность реализации | Низкая | Высокая | Средняя |
| Стоимость (компьютинг | Высокая | Низкая | Средняя |
| Даунтайм | Есть (при переключении) | Нет | Нет |
| Консистентность | Гарантирована | Если трекер надёжен | Гарантирована |
| Аудит истории | Нет | Нет | Да |
| Рост объёма данных | Нет | Нет | Да (версии) |
| Когда использовать | Редкие обновления | Частые обновления | Аудит, откаты |
4. Проблема stale data (устаревшие данные)
Что это LLM получает старую информацию, потому что кэш или индекс не обновились после изменения документа.
Источники stale data в RAG
| Источник | Проблема | Решение |
|---|---|---|
| Кэш (Redis | В кэше сохранён ответ на основе старого документа | TTL на кэш + инвалидация при обновлении документа |
| Векторная БД | В индексе ещё старая версия чанка | Incremental update или versioned подход |
| LLM context window | Документ изменился, но LLM уже держит старый контекст | Обновлять контекст при каждом запросе (не хранить долго) |
Термин «Инвалидация кэша» (Cache invalidation Процесс удаления устаревших записей из кэша.
Решение для кэша
# При обновлении документа doc123:
def on_document_updated(source_id: str):
# 1. Удаляем все записи кэша, связанные с этим документом
redis.delete(f"rag_answer:*{source_id}*")
# 2. Или используем versioned ключи
current_version = get_doc_version(source_id)
cache_key = f"rag_answer:{query_hash}:{source_id}:{current_version}"
Термин «TTL (Time To Live)» Время жизни записи в кэше. Например, TTL = 1 час → через час запись удалится, даже если её не инвалидировали вручную.
5. Event-driven обновления (паттерн событий)
Что это Архитектура, где при изменении документа генерируется событие, которое запускает переиндексацию.
Архитектура
[Источник документа] → [Kafka событие] → [Consumer (обработчик)] → [Векторная БД]
Пример:
1. Пользователь загружает новый документ в S3 → S3 Event → Kafka topic "document.uploaded"
2. Consumer слушает topic → парсит документ → обновляет индекс
3. При обновлении документа → событие "document.updated" → Consumer
4. При удалении документа → событие "document.deleted" → Consumer (удаляет чанки)
Код для обработки события
from kafka import KafkaConsumer
consumer = KafkaConsumer('document.changed', bootstrap_servers='localhost:9092')
for message in consumer:
event = json.loads(message.value)
source_id = event['source_id']
action = event['action'] # 'upload', 'update', 'delete'
if action in ('upload', 'update'):
# Удаляем старые чанки (если были)
delete_chunks_by_source(source_id)
# Обрабатываем новый документ
chunks = process_document(event['document_url'])
# Вставляем новые чанки
insert_chunks(chunks)
elif action == 'delete':
delete_chunks_by_source(source_id)
Термин «Event-driven architecture» Система реагирует на события (изменения), а не работает по расписанию (cron).
Когда использовать Реальное время (real-time), нулевая задержка между изменением документа и доступностью в поиске.
Цифра Event-driven + incremental update дают latency обновления 1-5 секунд (от загрузки документа до индексации).
6. Полный пайплайн обновления (рекомендация)
Гибридный подход для production
class DocumentUpdateManager:
def __init__(self, vector_db, cache, kafka_consumer):
self.db = vector_db
self.cache = cache
self.consumer = kafka_consumer
def sync_all_documents(self):
"""Полная синхронизация (раз в день/неделю)"""
all_docs = fetch_all_documents_from_source()
# Создаём новый индекс в фоне
new_index_id = create_temp_index()
for doc in all_docs:
chunks = process_document(doc)
insert_into_index(new_index_id, chunks)
# Переключаем production
swap_indexes(new_index_id)
# Инвалидируем весь кэш
self.cache.flush_all()
def incremental_update(self, source_id):
"""Инкрементальное обновление по событию"""
# 1. Удаляем старые чанки
self.db.delete(filter={"source_id": source_id})
# 2. Инвалидируем кэш для этого документа
self.cache.delete_by_pattern(f"*{source_id}*")
# 3. Индексируем актуальную версию
doc = fetch_document(source_id)
chunks = process_document(doc)
self.db.insert(chunks)
def run_listener(self):
for event in self.consumer:
self.incremental_update(event.source_id)
def schedule_full_sync(self):
"""Запуск полной синхронизации раз в сутки (для надёжности)"""
schedule.every().day.at("03:00").do(self.sync_all_documents)
7. Пет-проект для закрепления
Задача Реализовать систему обновления документов в RAG с incremental update.
Инструменты Python, Qdrant, SQLite (для отслеживания версий), Kafka (опционально)
Шаги
- Развернуть Qdrant и создать коллекцию с метаданными (source_id, version, content_hash)
- Загрузить 100 документов, сохранить их хеши в SQLite
- Реализовать функцию
sync_changes():- Пройти по всем документам, вычислить текущие хеши
- Сравнить с сохранёнными в SQLite
- Для изменённых документов: удалить старые чанки, добавить новые
- Протестировать:
- Изменить 10 документов (добавить текст, удалить текст)
- Добавить 5 новых документов
- Удалить 3 документа
- Запустить синхронизацию, проверить, что индекс обновился
- Замерить время инкрементального обновления против full re-indexing
- Добавить TTL для кэша (Redis)
Ожидаемый результат Система, которая обновляет только изменившиеся документы, время синхронизации пропорционально количеству изменений, а не общему объёму.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | RAG архитектура (индексация документов) |
| 4 | Векторная БД (метаданные для source_id) |
| 7 | Кэширование и TTL |
| 13 | Эффективная загрузка 1000+ документов (батчинг) |
| 69 | CI/CD для RAG-пайплайна (версионирование индексов) |
| 79 | Обновление embedding модели без переиндексации (dual index) |
| 231 | Инкрементальные вставки в ANN индекс |
9. Как вы обновляете документы в существующей RAG-системе|9. Как вы обновляете документы в существующей RAG-системе|9. Как вы обновляете документы в существующей RAG-системе|9 полностью разобран. Переходим к вопросу 10, когда будете готовы|Вопрос 9 полностью разобран. Переходим к вопросу 10, когда будете готовы]]
Навигация
- Предыдущий: 8
- Следующий: 10
- Индекс: 00. Индекс разборов