中文翻译暂不可用,显示俄语原文。
Что такое data contract между сервисами в RAG пайплайне?
Краткий тезис
Data contract — это формальное соглашение между producer (сервис индексации/ingestion) и consumer (сервис retrieval/поиска) о структуре, качестве и сроках доставки данных. В RAG-пайплайне такой контракт фиксирует схему чанков, метаданных, гарантии заполнения полей, SLO на свежесть и латентность, а также правила версионирования. Реализуется через schema registry (например, Schema Registry) и CI-валидацию, что предотвращает «тихие» поломки при изменениях в пайплайне.
1. Термин: Data contract
Data contract — это документ (часто в коде), который описывает:
- Формат данных (схема, типы полей, обязательность);
- Гарантии (поля всегда заполнены, допустимые диапазоны);
- SLO (Service Level Objectives) — целевые показатели производительности (latency, freshness, throughput);
- Версионирование — как обрабатываются breaking changes.
В контексте RAG пайплайн состоит из двух ключевых сервисов:
- Ingestion (producer) — разбивает документы на чанки, вычисляет эмбеддинги, сохраняет в векторную БД.
- Retrieval (consumer) — принимает запрос пользователя, ищет релевантные чанки, возвращает их LLM.
Без data contract изменения в ingestion (например, новое поле в метаданных) могут незаметно сломать retrieval, вызвав падение качества ответов или ошибки парсинга.
2. Проблемы, которые решает data contract
| Проблема | Описание | Решение через data contract |
|---|---|---|
| Несогласованность схем | Ingestion добавляет поле source_url, retrieval ожидает url | Фиксация точных имён и типов |
| Ломающие изменения | Удаление поля author → retrieval падает с KeyError | Версионирование, запрет breaking changes без новой версии |
| Отсутствие SLO | Ingestion обновляет данные раз в час, retrieval ожидает свежесть <5 мин | Явные SLO в контракте |
| «Тихие» ошибки | Поле chunk_text стало None для части документов → LLM получает пустой контекст | Гарантии (not null, non-empty) |
| Сложность отладки | Проблема проявляется только в production | CI-валидация контракта на этапе тестирования |
3. Компоненты data contract
3.1 Схема (Schema)
Определяет структуру данных, передаваемых между сервисами. Обычно используется JSON Schema, Apache Avro или Protocol Buffers (Protobuf).
Пример схемы чанка на JSON Schema:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"chunk_id": {"type": "string", "format": "uuid"},
"document_id": {"type": "string"},
"chunk_text": {"type": "string", "minLength": 1},
"embedding": {"type": "array", "items": {"type": "number"}, "minItems": 384},
"metadata": {
"type": "object",
"properties": {
"source_url": {"type": "string", "format": "uri"},
"page_number": {"type": "integer", "minimum": 1},
"created_at": {"type": "string", "format": "date-time"}
},
"required": ["source_url"]
}
},
"required": ["chunk_id", "document_id", "chunk_text", "embedding"]
}
3.2 Гарантии (Guarantees)
Дополнительные условия, не выражаемые схемой:
- Поле
chunk_textне пустое (длина > 0). embedding— массив фиксированной длины (например, 384 для all-MiniLM-L6-v2).- metadata.page_number присутствует только для PDF-документов, но если указан — должен быть положительным.
Гарантии можно проверять через Great Expectations или Deequ (для Spark).
3.3 SLO (Service Level Objectives)
Ключевые показатели для RAG-пайплайна:
| Метрика | Пример SLO | Почему важно |
|---|---|---|
| Freshness (свежесть) | Новые документы доступны для поиска не позднее 5 минут после загрузки | Пользователь ожидает актуальные ответы |
| Latency (задержка) | P99 времени индексации одного документа < 2 секунд | Не влияет на throughput ingestion |
| Throughput (пропускная способность) | Ingestion обрабатывает не менее 100 документов/сек | Соответствует пиковой нагрузке |
| Availability (доступность) | Retrieval возвращает результат в 99.9% запросов | Отказ retrieval = отказ всего RAG |
SLO фиксируются в контракте и мониторятся через SLI (фактические измерения). Нарушение SLO — триггер для алерта.
3.4 Версионирование
Контракт версионируется (semver: major.minor.patch). Правила:
- Breaking change (major): удаление поля, изменение типа, добавление нового required поля. Требует новой версии контракта и координации с consumer.
- Non-breaking change (minor): добавление optional поля, расширение enum.
- Patch: исправление описания, комментариев.
Consumer должен поддерживать backward compatibility: старый consumer может читать данные новой версии, если изменения non-breaking.
4. Форматы схем: сравнение
| Характеристика | JSON Schema | Apache Avro | Protocol Buffers |
|---|---|---|---|
| Типизация | Динамическая (runtime) | Статическая (compile-time) | Статическая (compile-time) |
| Размер сериализованных данных | Большой (JSON) | Компактный (бинарный) | Компактный (бинарный) |
| Поддержка эволюции схемы | Хорошая (optional поля) | Отличная (full backward/forward) | Хорошая (backward через reserved) |
| Schema Registry | Не требуется (можно валидировать на лету) | Confluent Schema Registry | Confluent, Apicurio |
| Читаемость | Человекочитаемый | Требует десериализации | Требует десериализации |
| Применение в RAG | REST API между сервисами | Kafka / streaming пайплайны | gRPC микросервисы |
Для RAG-пайплайна часто выбирают Avro (если используется Kafka для передачи чанков) или JSON Schema (если простой REST).
5. Версионирование и breaking changes
5.1 Типы совместимости
- Backward compatible: новый producer может писать данные, старый consumer — читать (добавление optional полей).
- Forward compatible: старый producer пишет, новый consumer читает (игнорирование неизвестных полей).
- Full compatible: и backward, и forward.
5.2 Стратегии управления breaking changes
- Двойная запись (dual write): producer пишет и старую, и новую версию, consumer постепенно мигрирует.
- Feature toggle: consumer переключается на новую версию по флагу.
- Parallel run: запуск двух версий ingestion, сравнение результатов retrieval.
Пример breaking change: удаление поля chunk_text и замена на chunk_content (текст + таблицы). Без контракта retrieval продолжит искать chunk_text и получит None.
6. Реализация: Schema Registry и CI
6.1 Schema Registry
Центральное хранилище схем с проверкой совместимости. Популярные решения:
- Confluent Schema Registry (для Avro, Protobuf, JSON Schema).
- Apicurio Registry (open-source, поддерживает多种 форматы).
- AWS Glue Schema Registry.
Producer регистрирует новую версию схемы, Registry проверяет совместимость с предыдущей (по настроенной политике). Если проверка не пройдена — producer отклоняется.
6.2 CI-валидация контракта
В CI/CD pipeline ingestion-сервиса добавляется шаг:
- Загрузить текущую схему из Registry.
- Сгенерировать тестовые данные по новой схеме.
- Проверить, что consumer может их прочитать (юнит-тест с mock retrieval).
- Если тест падает — сборка фейлится.
Пример на Python с fastavro и pytest:
import fastavro
import requests
def test_data_contract():
# Получаем схему из Registry
schema = requests.get("http://schema-registry:8081/subjects/chunk-value/versions/latest").json()
parsed_schema = fastavro.parse_schema(schema["schema"])
# Генерируем тестовые данные
record = {
"chunk_id": "test-uuid",
"document_id": "doc-123",
"chunk_text": "Sample text",
"embedding": [0.1] * 384,
"metadata": {"source_url": "http://example.com"}
}
# Сериализуем и десериализуем
bytes_ = fastavro.writer(io.BytesIO(), parsed_schema, [record])
reader = fastavro.reader(io.BytesIO(bytes_.read()))
assert next(reader) == record
7. Пример data contract для RAG пайплайна
Допустим, ingestion публикует чанки в топик Kafka rag-chunks. Data contract (Avro) может выглядеть так:
{
"type": "record",
"name": "Chunk",
"namespace": "com.example.rag",
"fields": [
{"name": "chunk_id", "type": "string"},
{"name": "document_id", "type": "string"},
{"name": "chunk_text", "type": "string"},
{"name": "embedding", "type": {"type": "array", "items": "float"}},
{"name": "metadata", "type": {
"type": "record",
"name": "ChunkMetadata",
"fields": [
{"name": "source_url", "type": "string"},
{"name": "page_number", "type": ["null", "int"], "default": null},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}},
{"name": "version", "type": "int", "default": 1}
]
}
Гарантии:
chunk_textне пустой (проверяется в producer).embeddingимеет длину 384 (валидация в CI).metadata.source_url— валидный URI.
SLO:
- Freshness: P99 latency от загрузки документа до появления в retrieval < 5 мин.
- Throughput: не менее 1000 чанков/сек.
8. Тестирование контрактов
- Unit-тесты: проверка сериализации/десериализации с разными вариантами данных.
- Integration-тесты: запуск ingestion → Kafka → retrieval в тестовом окружении, проверка, что retrieval корректно парсит чанки.
- Contract testing (Pact, Spring Cloud Contract): consumer-driven контракты, где retrieval (consumer) описывает ожидаемые данные, а ingestion (producer) должен их удовлетворять.
Пет-проект для закрепления
Задача: Реализовать data contract между ingestion и retrieval сервисами с использованием Confluent Schema Registry и CI-валидацией.
Инструменты:
- Python (fastavro, confluent-kafka)
- Docker Compose (Kafka + Schema Registry)
- GitHub Actions (CI)
Шаги:
- Разверните Kafka и Schema Registry через Docker Compose.
- Определите Avro-схему чанка (как в примере выше).
- Напишите producer (ingestion), который читает PDF, разбивает на чанки, сериализует в Avro и отправляет в топик.
- Напишите consumer (retrieval), который читает из топика, десериализует и сохраняет в векторную БД (FAISS).
- В CI добавьте шаг: при изменении схемы запускается тест, который проверяет, что consumer может прочитать данные по новой схеме.
- Смоделируйте breaking change (удалите поле
chunk_text) — убедитесь, что CI падает.
Ожидаемый результат: Работающий пайплайн, где любое изменение схемы, ломающее consumer, блокируется на этапе CI. Вы получите практическое понимание, как data contract предотвращает «тихие» поломки в RAG.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 530 | Как организован пайплайн индексации в RAG? |
| 531 | Что такое schema registry и зачем он нужен? |
| 533 | Как обеспечить consistency данных между ingestion и retrieval? |
| 534 | Какие метрики мониторинга RAG пайплайна вы знаете? |
| 540 | Как вы версионируете эмбеддинги и модели? |
Навигация
- Предыдущий: 531
- Следующий: 533
- Индекс: 00. Индекс разборов