Что такое data contract между сервисами в RAG пайплайне?

Краткий тезис

Data contract — это формальное соглашение между producer (сервис индексации/ingestion) и consumer (сервис retrieval/поиска) о структуре, качестве и сроках доставки данных. В RAG-пайплайне такой контракт фиксирует схему чанков, метаданных, гарантии заполнения полей, SLO на свежесть и латентность, а также правила версионирования. Реализуется через schema registry (например, Schema Registry) и CI-валидацию, что предотвращает «тихие» поломки при изменениях в пайплайне.


1. Термин: Data contract

Data contract — это документ (часто в коде), который описывает:

  • Формат данных (схема, типы полей, обязательность);
  • Гарантии (поля всегда заполнены, допустимые диапазоны);
  • SLO (Service Level Objectives) — целевые показатели производительности (latency, freshness, throughput);
  • Версионирование — как обрабатываются breaking changes.

В контексте RAG пайплайн состоит из двух ключевых сервисов:

  • Ingestion (producer) — разбивает документы на чанки, вычисляет эмбеддинги, сохраняет в векторную БД.
  • Retrieval (consumer) — принимает запрос пользователя, ищет релевантные чанки, возвращает их LLM.

Без data contract изменения в ingestion (например, новое поле в метаданных) могут незаметно сломать retrieval, вызвав падение качества ответов или ошибки парсинга.


2. Проблемы, которые решает data contract

ПроблемаОписаниеРешение через data contract
Несогласованность схемIngestion добавляет поле source_url, retrieval ожидает urlФиксация точных имён и типов
Ломающие измененияУдаление поля authorretrieval падает с KeyErrorВерсионирование, запрет breaking changes без новой версии
Отсутствие SLOIngestion обновляет данные раз в час, retrieval ожидает свежесть <5 минЯвные SLO в контракте
«Тихие» ошибкиПоле chunk_text стало None для части документов → LLM получает пустой контекстГарантии (not null, non-empty)
Сложность отладкиПроблема проявляется только в productionCI-валидация контракта на этапе тестирования

3. Компоненты data contract

3.1 Схема (Schema)

Определяет структуру данных, передаваемых между сервисами. Обычно используется JSON Schema, Apache Avro или Protocol Buffers (Protobuf).

Пример схемы чанка на JSON Schema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "chunk_id": {"type": "string", "format": "uuid"},
    "document_id": {"type": "string"},
    "chunk_text": {"type": "string", "minLength": 1},
    "embedding": {"type": "array", "items": {"type": "number"}, "minItems": 384},
    "metadata": {
      "type": "object",
      "properties": {
        "source_url": {"type": "string", "format": "uri"},
        "page_number": {"type": "integer", "minimum": 1},
        "created_at": {"type": "string", "format": "date-time"}
      },
      "required": ["source_url"]
    }
  },
  "required": ["chunk_id", "document_id", "chunk_text", "embedding"]
}

3.2 Гарантии (Guarantees)

Дополнительные условия, не выражаемые схемой:

  • Поле chunk_text не пустое (длина > 0).
  • embedding — массив фиксированной длины (например, 384 для all-MiniLM-L6-v2).
  • metadata.page_number присутствует только для PDF-документов, но если указан — должен быть положительным.

Гарантии можно проверять через Great Expectations или Deequ (для Spark).

3.3 SLO (Service Level Objectives)

Ключевые показатели для RAG-пайплайна:

МетрикаПример SLOПочему важно
Freshness (свежесть)Новые документы доступны для поиска не позднее 5 минут после загрузкиПользователь ожидает актуальные ответы
Latency (задержка)P99 времени индексации одного документа < 2 секундНе влияет на throughput ingestion
Throughput (пропускная способность)Ingestion обрабатывает не менее 100 документов/секСоответствует пиковой нагрузке
Availability (доступность)Retrieval возвращает результат в 99.9% запросовОтказ retrieval = отказ всего RAG

SLO фиксируются в контракте и мониторятся через SLI (фактические измерения). Нарушение SLO — триггер для алерта.

3.4 Версионирование

Контракт версионируется (semver: major.minor.patch). Правила:

  • Breaking change (major): удаление поля, изменение типа, добавление нового required поля. Требует новой версии контракта и координации с consumer.
  • Non-breaking change (minor): добавление optional поля, расширение enum.
  • Patch: исправление описания, комментариев.

Consumer должен поддерживать backward compatibility: старый consumer может читать данные новой версии, если изменения non-breaking.


4. Форматы схем: сравнение

ХарактеристикаJSON SchemaApache AvroProtocol Buffers
ТипизацияДинамическая (runtime)Статическая (compile-time)Статическая (compile-time)
Размер сериализованных данныхБольшой (JSON)Компактный (бинарный)Компактный (бинарный)
Поддержка эволюции схемыХорошая (optional поля)Отличная (full backward/forward)Хорошая (backward через reserved)
Schema RegistryНе требуется (можно валидировать на лету)Confluent Schema RegistryConfluent, Apicurio
ЧитаемостьЧеловекочитаемыйТребует десериализацииТребует десериализации
Применение в RAGREST API между сервисамиKafka / streaming пайплайныgRPC микросервисы

Для RAG-пайплайна часто выбирают Avro (если используется Kafka для передачи чанков) или JSON Schema (если простой REST).


5. Версионирование и breaking changes

5.1 Типы совместимости

  • Backward compatible: новый producer может писать данные, старый consumer — читать (добавление optional полей).
  • Forward compatible: старый producer пишет, новый consumer читает (игнорирование неизвестных полей).
  • Full compatible: и backward, и forward.

5.2 Стратегии управления breaking changes

  1. Двойная запись (dual write): producer пишет и старую, и новую версию, consumer постепенно мигрирует.
  2. Feature toggle: consumer переключается на новую версию по флагу.
  3. Parallel run: запуск двух версий ingestion, сравнение результатов retrieval.

Пример breaking change: удаление поля chunk_text и замена на chunk_content (текст + таблицы). Без контракта retrieval продолжит искать chunk_text и получит None.


6. Реализация: Schema Registry и CI

6.1 Schema Registry

Центральное хранилище схем с проверкой совместимости. Популярные решения:

Producer регистрирует новую версию схемы, Registry проверяет совместимость с предыдущей (по настроенной политике). Если проверка не пройдена — producer отклоняется.

6.2 CI-валидация контракта

В CI/CD pipeline ingestion-сервиса добавляется шаг:

  1. Загрузить текущую схему из Registry.
  2. Сгенерировать тестовые данные по новой схеме.
  3. Проверить, что consumer может их прочитать (юнит-тест с mock retrieval).
  4. Если тест падает — сборка фейлится.

Пример на Python с fastavro и pytest:

import fastavro
import requests

def test_data_contract():
    # Получаем схему из Registry
    schema = requests.get("http://schema-registry:8081/subjects/chunk-value/versions/latest").json()
    parsed_schema = fastavro.parse_schema(schema["schema"])
    
    # Генерируем тестовые данные
    record = {
        "chunk_id": "test-uuid",
        "document_id": "doc-123",
        "chunk_text": "Sample text",
        "embedding": [0.1] * 384,
        "metadata": {"source_url": "http://example.com"}
    }
    
    # Сериализуем и десериализуем
    bytes_ = fastavro.writer(io.BytesIO(), parsed_schema, [record])
    reader = fastavro.reader(io.BytesIO(bytes_.read()))
    assert next(reader) == record

7. Пример data contract для RAG пайплайна

Допустим, ingestion публикует чанки в топик Kafka rag-chunks. Data contract (Avro) может выглядеть так:

{
  "type": "record",
  "name": "Chunk",
  "namespace": "com.example.rag",
  "fields": [
    {"name": "chunk_id", "type": "string"},
    {"name": "document_id", "type": "string"},
    {"name": "chunk_text", "type": "string"},
    {"name": "embedding", "type": {"type": "array", "items": "float"}},
    {"name": "metadata", "type": {
      "type": "record",
      "name": "ChunkMetadata",
      "fields": [
        {"name": "source_url", "type": "string"},
        {"name": "page_number", "type": ["null", "int"], "default": null},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      ]
    }},
    {"name": "version", "type": "int", "default": 1}
  ]
}

Гарантии:

  • chunk_text не пустой (проверяется в producer).
  • embedding имеет длину 384 (валидация в CI).
  • metadata.source_url — валидный URI.

SLO:

  • Freshness: P99 latency от загрузки документа до появления в retrieval < 5 мин.
  • Throughput: не менее 1000 чанков/сек.

8. Тестирование контрактов

  • Unit-тесты: проверка сериализации/десериализации с разными вариантами данных.
  • Integration-тесты: запуск ingestionKafkaretrieval в тестовом окружении, проверка, что retrieval корректно парсит чанки.
  • Contract testing (Pact, Spring Cloud Contract): consumer-driven контракты, где retrieval (consumer) описывает ожидаемые данные, а ingestion (producer) должен их удовлетворять.

Пет-проект для закрепления

Задача: Реализовать data contract между ingestion и retrieval сервисами с использованием Confluent Schema Registry и CI-валидацией.

Инструменты:

Шаги:

  1. Разверните Kafka и Schema Registry через Docker Compose.
  2. Определите Avro-схему чанка (как в примере выше).
  3. Напишите producer (ingestion), который читает PDF, разбивает на чанки, сериализует в Avro и отправляет в топик.
  4. Напишите consumer (retrieval), который читает из топика, десериализует и сохраняет в векторную БД (FAISS).
  5. В CI добавьте шаг: при изменении схемы запускается тест, который проверяет, что consumer может прочитать данные по новой схеме.
  6. Смоделируйте breaking change (удалите поле chunk_text) — убедитесь, что CI падает.

Ожидаемый результат: Работающий пайплайн, где любое изменение схемы, ломающее consumer, блокируется на этапе CI. Вы получите практическое понимание, как data contract предотвращает «тихие» поломки в RAG.


Связь с другими вопросами

ВопросТема
530Как организован пайплайн индексации в RAG?
531Что такое schema registry и зачем он нужен?
533Как обеспечить consistency данных между ingestion и retrieval?
534Какие метрики мониторинга RAG пайплайна вы знаете?
540Как вы версионируете эмбеддинги и модели?

Навигация