English translation is not available yet. Showing Russian content.

Как обрабатывать schema drift в данных для RAG?

Краткий тезис

Schema drift — это изменение структуры документов (добавление/удаление полей, изменение типов данных), которое может нарушить работу RAG-пайплайна: сломаться индексация, эмбеддинги перестанут соответствовать, retrieval вернёт некорректные чанки. Обработка сводится к трём этапам: обнаружение изменений (Schema Registry, валидация), применение стратегии совместимости (forward/backward/full) и автоматическая миграция (переиндексация, versioning документов). В контексте агентного RAG schema drift требует адаптивного поведения — агент может перезапускать индексацию или динамически переключать схемы.


1. Термин: Schema drift и его угрозы для RAG

Schema drift (дрейф схемы) — это любое изменение формата, структуры или типов данных в исходных документах, которые система использует для построения индекса RAG. Причины: обновление API, миграция на новую версию БД, изменение бизнес-требований (например, добавлено поле region).

Почему это критично для RAG

  • Индексация: если код ожидает поля title и content, а в новом документе поле content переименовано в text, то индексатор запишет пустые значения в векторную БД.
  • Эмбеддинги: модель эмбеддинга обучена на определённом наборе полей; добавление нового поля («region», «timestamp») может либо игнорироваться (потеря информации), либо приводить к смещению векторов.
  • Retrieval: поиск по метаданным (фильтрация) сломается, если поле category стало списком вместо строки.
  • Генерация: LLM получит контекст с пропущенными или неверно типизированными данными, что ухудшит качество ответа.

Пример сценария Изначальный документ:

{
  "title": "RAG tutorial",
  "content": "Text...",
  "date": "2024-01-01"
}

После обновления API:

{
  "title": "RAG tutorial",
  "content": "Text...",
  "region": "EU",
  "date": "2024-01-01T12:00:00Z"
}

Поле region новое, а date изменило тип со строки на ISO datetime — это и есть schema drift.


2. Обнаружение schema drift

2.1 Schema Registry (Avro/Protobuf/JSON Schema)

Центральное хранилище определений схемы (например, Confluent Schema Registry для Kafka, или собственный registry на основе JSON Schema). Каждый документ при индексации проверяется на соответствие зарегистрированной схеме.

Процесс

  1. Разработчик регистрирует схему v1 с полями title (string), content (string), date (string).
  2. При поступлении документа проверяется его совпадение со схемой.
  3. Если документ имеет новое поле region — регистрируется v2 схемы с оценкой совместимости.

2.2 Инструменты валидации

  • Great Expectations: библиотека для data quality. Можно определить ожидания (expectations) на структуру документа, например expect_column_to_exist("region").
  • Pydantic: валидация схем в Python при загрузке документа.
  • Apache Avro/Parquet: форматы со встроенными схемами, позволяют проверять типы.

2.3 Мониторинг при индексации

Логирование ошибок валидации. Например, если в логах появляются KeyError: 'content' или TypeError: expected str, got int, это сигнал schema drift.

2.4 Проактивное обнаружение

  • Сравнение гистограмм типов: система периодически сканирует новые документы и сравнивает доли полей с эталоном.
  • Схема из образца: если ожидается, что поле region появится в 20% документов, можно настроить алерт при превышении порога.

3. Стратегии совместимости (Compatibility modes)

Когда schema drift обнаружен, нужно решить, могут ли старые readers (код обработки) работать с новыми документами, и наоборот. Основные режимы:

РежимОписаниеПример
Forward compatibilityНовые документы добавляют поля, но старый читатель может их игнорировать (поля optional).Добавлено region; старый парсер читает только title и content — работает.
Backward compatibilityСтарые документы всё ещё могут быть прочитаны новым читателем (удалённые поля имеют default).Убрали date; новый парсер использует default "unknown" — работает.
Full compatibilityОдновременно forward и backward: и старые читатели работают с новыми документами, и новые со старыми.Поля только добавляются как optional, никогда не удаляются, типы не меняются.

Какой выбрать для RAG Обычно forward compatibility — самый безопасный: новые поля появляются как optional, старый индексатор их игнорирует, а новый может решить, стоит ли их включать в эмбеддинг. Full compatibility — идеал, но трудно достижим, если типы меняются.


4. Действия после обнаружения: миграция индекса

4.1 Версионирование документов

Хранить в метаданных номер версии схемы (например, _schema_version: 1). При изменении схемы новые документы получают _schema_version: 2. Векторная БД может содержать документы разных версий — это нормально, если поля совместимы.

4.2 Переиндексация

Если drift несовместим (например, поле content разделилось на abstract и body), необходимо:

  1. Написать код миграции (пример ниже).
  2. Пересоздать эмбеддинги для всех затронутых чанков.
  3. Заменить старые записи в векторной БД (через upsert по ID).
# Пример миграции для несовместимого schema drift
def migrate_document(old_doc: dict) -> dict:
    if old_doc.get("_schema_version") == 1:
        return {
            "_schema_version": 2,
            "title": old_doc["title"],
            "abstract": old_doc["content"][:100],  # грубое разделение
            "body": old_doc["content"]
        }
    return old_doc

4.3 Автоматизация через pipeline

В агентном RAG можно поручить агенту-индексатору отслеживать schema registry и при обнаружении новой версии запускать переиндексацию. Агент также может временно переключить retrieval на старую схему, пока миграция не завершена.


5. Инструменты и инфраструктура

ИнструментРоль в обработке schema drift
Confluent Schema RegistryЦентрализованное хранение схем, проверка эволюции (forward/backward).
Great ExpectationsData quality validation; можно настроить "expectations" на схему прямо перед индексацией.
Apache Avro / ProtobufСериализация с явной схемой; поддерживают эволюцию.
dlt (data load tool)Библиотека для ETL, автоматически определяет schema drift и предлагает варианты разрешения.
Custom Python (Pydantic)Валидация на этапе загрузки документа.
Weaviate / QdrantНекоторые векторные БД поддерживают мульти-тенантность с разными схемами.

6. Schema drift в контексте Agentic RAG

В агентном RAG (когда система использует планировщиков, инструменты и самокоррекцию) обработка schema drift становится более гибкой:

  • Агент-индексатор регулярно проверяет schema registry и, если зарегистрирована новая версия, запускает процесс миграции.
  • Агент-ретривер может получать метаданные о версии схемы документа и фильтровать только те версии, которые совместимы с текущим запросом.
  • Агент-генератор может запросить у пользователя уточнение, если поле, которое он ожидает, отсутствует.
  • Self-healing pipeline: если во время индексации происходит ошибка из-за drift, агент переключается на fallback-схему (например, игнорирует новое поле) и логирует проблему.

Пример: документ с новым полем language. Агент решает, стоит ли включать это поле в эмбеддинг (если система мультиязычная) или игнорировать. Решение может зависеть от контекста: если запрос на русском, а поле language содержит "en", можно понизить вес документа.


7. Практические рекомендации

  1. Закладывайте optional-поля с самого начала. Даже если сейчас кажется, что поле не нужно, сделайте его как Optional[str].
  2. Используйте версионирование как для документов, так и для эмбеддингов (например, добавляйте хеш версии в ID чанка).
  3. Настройте мониторинг: если во время индексации растёт число ошибок валидации, это первый признак schema drift.
  4. Тестируйте совместимость на тестовой коллекции перед деплоем новой схемы.
  5. Документируйте схему в Schema Registry — это поможет и разработчикам, и агентам.
  6. Рассмотрите асинхронную миграцию: пока новое поле индексируется, старый поиск продолжает работать со старыми данными.

8. Схема принятия решений при schema drift

graph TD
    A[Обнаружение schema drift] --> B{Совместимость forward?}
    B -- Да --> C[Игнорировать новое поле или добавить в эмбеддинг по желанию]
    B -- Нет --> D{Совместимость backward?}
    D -- Да --> E[Добавить default для отсутствующих полей, обновить код читателя]
    D -- Нет --> F[Необходима миграция]
    F --> G[Написать трансформацию, переиндексировать затронутые документы]
    F --> H[Зарегистрировать новую схему v2 в Schema Registry]
    G --> I[Обновить индексы в векторной БД (upsert)]
    I --> J[Проверить, что retrieval работает корректно]

Пет-проект для закрепления

Задача: Разработать симулятор schema drift для маленького RAG-пайплайна (10 документов). Пусть документы приходят в формате JSON, и один раз в 10 секунд система "получает" документ с изменённой структурой (новое поле или изменён тип).

Инструменты: Python, LangChain (для RAG), ChromaDB (векторная БД), Great Expectations (вариант: простая валидация с Pydantic).

Шаги:

  1. Реализуйте класс DocumentSchemaValidator, который принимает список ожидаемых полей с типами.
  2. При поступлении документа проверяйте его через валидатор. Если поле отсутствует или тип неверный — логируйте SchemaDriftWarning.
  3. В случае drift вызывайте функцию migrate_document, которая добавляет default-значения или переименовывает поля.
  4. После миграции пересоздавайте эмбеддинг и делайте upsert в ChromaDB.
  5. Напишите простой тест: отправьте запрос, который должен найти документ с новым полем. Убедитесь, что retrieval работает.

Ожидаемый результат: консольное приложение, которое выводит логи обнаружения drift и сообщения об успешной миграции. Можно добавить метрику: % документов, прошедших валидацию без изменений.


Связь с другими вопросами

ВопросТема
1Проектирование RAG-системы для разнородных данных
3Стратегии chunking и влияние на индексацию
5Оценка качества retrieval (влияние schema drift на метрики)
9Обновление документов в существующей RAG-системе
10Self-RAG и адаптивные пайплайны

Навигация