Как проектировать CDC (Change Data Capture) для документов?

Краткий тезис

CDC (Change Data Capture) — это паттерн, который позволяет отслеживать и реагировать на изменения в источнике данных в реальном времени. Для RAG-системы CDC критичен — без него документы в векторной БД быстро устаревают, а пользователи получают неактуальные ответы. Проектирование CDC для документов включает выбор подходящего коннектора (Debezium, AWS DMS, Kafka Connect), настройку пайплайна (sourceCDC stream → consumer → vector DB), обработку событий INSERT, UPDATE, DELETE и обеспечение согласованности данных.


1. Термин: Change Data Capture (CDC)

CDC — это технология, которая фиксирует изменения в данных на уровне источника (базы данных, файлового хранилища, SharePoint) и передаёт их потребителям в виде потока событий. В отличие от полного периодического снимка (full snapshot), CDC передаёт только дельту (изменения), что делает его эффективным для систем реального времени.

Зачем CDC для RAG

  • Документы добавляются, обновляются, удаляются.
  • Векторная БД должна отражать эти изменения, иначе freshness (свежесть) данных падает.
  • Полная переиндексация каждого документа каждый раз дорога и медленна.
  • CDC позволяет обновлять только изменённые чанки, сокращая latency и стоимость.

Ключевые компоненты CDC-пайплайна


2. Основные источники документов и методы CDC

ИсточникТип данныхМеханизм CDCИнструменты
Реляционные БД (PostgreSQL, MySQL)Таблицы с метаданными и текстамиЛогическая репликация (WAL), триггеры, pollingDebezium (слоты репликации), Kafka JDBC Source (polling)
Объектные хранилища (S3, GCS)Файлы (PDF, DOCX, Markdown)Events (S3 Event Notifications → SQS/SNS)AWS Lambda, S3 Event Notifications
SharePoint / OneDriveДокументы на платформеWebhooks, REST API delta-запросыMicrosoft Graph API, Azure Functions
NoSQL (MongoDB)Документы JSONOplog (MongoDB replication log)Debezium MongoDB connector
API-источники (Confluence, Notion)Вики-страницыWebhooks / periodic pollingПользовательские консьюмеры (Airbyte, Fivetran)

Выбор механизма CDC зависит от

  • Требуемой задержки (real-time vs near-real-time).
  • Сложности инфраструктуры (Kafka vs serverless).
  • Частоты изменений (high write throughput может перегрузить коннектор).

3. Архитектура CDC-пайплайна для документов

Типовая архитектура на основе Debezium и Kafka:

PostgreSQL (таблица documents)
    │
    ▼
Debezium (PostgreSQL connector) — читает WAL
    │
    ▼
Kafka topic: "documents.cdc"
    │
    ▼
Python Consumer (группа consumers)
    │
    ▼
Vector DB (Pinecone / Weaviate / Qdrant)

Детали шагов

  1. PostgreSQL — включает таблицу documents с полями: id, title, content, status, updated_at.
  2. Debezium — подключается через логический слот репликации, читает WAL (Write-Ahead Log), парсит изменения и отправляет их в Kafka.
  3. Kafka topic — хранит сообщения в формате Avro или JSON. Каждое сообщение содержит ключ (ID документа) и значение (старое/новое состояние).
  4. Consumer — читает топик, для каждого события:
    • INSERT: разбивает content на чанки, вычисляет эмбеддинги, записывает в векторную БД.
    • UPDATE: удаляет старые чанки (по id), затем вставляет новые (или использует операцию upsert, если поддерживает).
    • DELETE: удаляет все чанки с соответствующим parent_id из векторной БД.
  5. Vector DB — обновляется инкрементально.

Важно Consumer должен быть идемпотентным (повторная обработка одного события не нарушает целостность). Для этого используем offset management и deduplication по ID внутри одной партиции.


4. Конфигурация Debezium для PostgreSQL

Пример JSON-конфигурации коннектора (через REST API Kafka Connect):

{
  "name": "documents-connector",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "debezium",
  "database.password": "dbz",
  "database.dbname": "documents_db",
  "database.server.name": "documents-server",
  "table.include.list": "public.documents",
  "plugin.name": "pgoutput",
  "snapshot.mode": "initial",
  "decimal.handling.mode": "double",
  "tombstones.on.delete": "false",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

Пояснения

  • plugin.name: pgoutput (используется с PostgreSQL 10+), wal2json, decoderbufs.
  • snapshot.mode: initial — при старте делает полный снимок, затем переключается на изменения.
  • tombstones.on.delete: если false, Debezium не шлёт tombstone-сообщения (нужно для компакции, но усложняет consumer).
  • transforms: ExtractNewRecordState — убирает вложенность (было before и after, остаётся только after).

Обработка схемы Для каждого события Debezium включает schema registry (Avro) или встроенную JSON-схему. Consumer должен уметь десериализовать.


5. События: INSERT, UPDATE, DELETE и трансформация в чанки

Consumer получает сообщение вида:

{
  "schema": { ... },
  "payload": {
    "op": "c",   // "c" = create, "u" = update, "d" = delete
    "before": { "id": 123, "title": "старый title", "content": "старый content" },  // null для create
    "after": { "id": 123, "title": "новый title", "content": "новый content" }
  }
}

Логика обработки для каждого типа события:

  • INSERT (op = "c"):

    1. Получаем after.
    2. Разбиваем content на чанки (по стратегии — фиксированная длина с перекрытием, semantic chunking, recursive splitter).
    3. Для каждого чанка: генерируем эмбеддинг, добавляем метаданные (document_id, title, chunk_index).
    4. Записываем в векторную БД (batch insert).
  • UPDATE (op = "u"):

    1. Удаляем все существующие чанки, чьи document_id равен after.id.
    2. Разбиваем новый content на чанки, эмбеддинг, запись.
    3. Вариант 2 (если поддерживается upsert): использовать операцию upsert по составному ключу (document_id + chunk_index). Так дешевле, но нужно знать, какие чанки были удалены.
  • DELETE (op = "d"):

    1. По before.id находим и удаляем все чанки из векторной БД.
    2. Дополнительно можно удалить метаданные из кэша или реляционной БД.

Чанкинг должен быть воспроизводим при одном и том же content алгоритм должен выдавать идентичные чанки. Иначе при переиндексации одного документа без изменений мы получим дубликаты.


6. Интеграция с векторной БД

Типовые операции:

  • Pinecone: index.upsert(vectors=[...]) для create/update, index.delete(ids=[...]) для delete.
  • Weaviate: batch import через client.batch.configure().
  • Qdrant: upsert с помощью collection.upload_records().
  • Chroma: collection.add() и collection.delete().

Транзакционность часто векторная БД не поддерживает атомарные операции по нескольким записям. Если consumer падает после частичного обновления, мы можем получить неконсистентное состояние. Решения:

  • Использовать exactly-once семантику в consumer (через Kafka transactions + idempotent storage).
  • Хранить в отдельной таблице статус последней успешной обработки события (last_processed_offset).
  • Периодически делать полную выверку (reconciliation): сравнить все документы из source с векторной БД и исправить расхождения.

7. Проблемы и компромиссы при проектировании CDC

ПроблемаОписаниеРешение
Ordering (упорядочивание)События по одному документу должны обрабатываться в порядке их возникновения. Kafka гарантирует порядок внутри партиции по ключу.Использовать ID документа как ключ сообщения.
Deduplication (дедупликация)При сетевых сбоях или ребалансе consumer может обработать одно событие дважды.Идемпотентные операции (upsert). Сделать consumer идемпотентным с помощью таблицы processed_events.
Latency (задержка)Debezium добавляет задержку в миллисекунды, Kafka — наносекунды, consumer и эмбеддинг — основное узкое место.Использовать batch processing для эмбеддинга, параллельные consumer-инстансы.
Snapshot vs streamingПри старте или перезапуске нужно сначала сделать snapshot существующих документов, затем переключиться на стриминг.Debezium сам это делает (snapshot.mode). Consumer должен различать snapshot и streaming: snapshot не генерирует событий удаления, поэтому нужно очистить векторную БД перед snapshot.
Backpressure (обратное давление)Если consumer не справляется, Kafka может накопить много сообщений.Настроить retention в Kafka, мониторить lag (kafka-consumer-groups), масштабировать consumer.
Schema evolution (эволюция схемы)Добавление полей в таблицу documents.Использовать schema registry, backward compatible; consumer игнорирует новые поля, если не нужны.

8. Альтернативные инструменты и сценарии без CDC

Когда CDC избыточен или невозможен

  • Источник не поддерживает CDC (например, сторонний API).
  • Объём изменений мал (несколько документов в день).
  • Нет инфраструктуры (Kafka, Debezium).

Тогда можно использовать

  • Polling (опрос по расписанию): каждые N минут проверять updated_at и забирать документы, изменённые с последнего опроса.
  • Webhooks (входящие уведомления): если источник (SharePoint, Confluence) умеет отправлять HTTP-вызовы при изменении документа.
  • Full resync (полная переиндексация): раз в сутки перестраивать весь индекс. Дешево в реализации, но дорого по ресурсам и latency.

Сравнение подходов

КритерийCDC (Debezium)PollingFull resync
Задержка (freshness)МиллисекундыСекунды–минутыЧасы
Нагрузка на источникНизкая (логическая репликация)Средняя (SELECT с условием)Высокая (полный scan)
Сложность инфраструктурыВысокая (Kafka, Connect, CDC engine)НизкаяНизкая
НадёжностьВысокая (точно-один раз, порядок)Средняя (пропуски возможны)Средняя (дубли при прерывании)
Подходит для real-timeДаНетНет

9. Связь с Agentic RAG: агент как потребитель изменений

В архитектуре Agentic RAG агент может выступать в роли продвинутого потребителя CDC-потока. Агент не просто обновляет векторную БД, а принимает решения:

  • Приоритет изменений если документ содержит критические данные (например, обновление политики), агент может немедленно переиндексировать и уведомить других агентов.
  • Проверка целостности агент может сравнить сгенерированный эмбеддинг для изменённого документа с предыдущим, и если изменение незначительно — не обновлять.
  • Ручной override агент может временно приостановить CDC для определённых документов (например, при массовом обновлении).

Пример: AgenticRAG CDC Agent подписан на топик documents.cdc, получает событие UPDATE, отправляет запрос к LLM: «Значительно ли изменился контент? Если да — обновить индекс». Это экономит вычислительные ресурсы.


10. Мониторинг и надежность CDC-пайплайна

Метрики для отслеживания

  • Kafka consumer lag — разница между последним сообщением в топике и последним обработанным.
  • Event processing latency — время от фиксации в источнике до обновления векторной БД.
  • Throughput — количество обработанных событий в секунду.
  • Error rate — доля событий, которые не удалось применить (например, chunking не удался).
  • Consistency check — доля документов, для которых совпадает количество чанков в source и vector DB.

Инструменты Prometheus + Grafana для сбора метрик, Alertmanager для уведомлений при росте lag.

Рекомендации

  • Настроить мониторинг lag для каждой партиции.
  • При превышении порога (например, >1000 событий) автоматически донастроить parallelism consumer.
  • Использовать dead letter queue (DLQ) для событий, которые невозможно обработать (например, невалидный JSON в content).

Пет-проект для закрепления

Задача Реализовать минимальный CDC-пайплайн для документов на PostgreSQL с обновлением Chroma (in-memory vector DB).

Инструменты

Шаги:

  1. Поднимите инфраструктуру
    Создайте docker-compose.yml с сервисами: postgres, zookeeper, kafka, debezium-connect. Используйте образ debezium/connect:2.5.

  2. Создайте таблицу документов

    CREATE TABLE documents (
        id SERIAL PRIMARY KEY,
        title TEXT NOT NULL,
        content TEXT NOT NULL,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    
  3. Настройте Debezium connector через REST API:
    Отправьте POST на http://localhost:8083/connectors с JSON из раздела 4.

  4. Напишите Python consumer

    • Подключитесь к Kafka (bootstrap_servers).
    • Читайте топик documents-server.public.documents.
    • Для каждого сообщения: распарсите op, вызовите метод apply_event(event).
  5. Реализуйте логику обработки

    • INSERT: разбейте content на чанки по 200 токенов (overlap 20), сгенерируйте эмбеддинги (можно использовать sentence-transformers/all-MiniLM-L6-v2), добавьте в Chroma.
    • UPDATE: удалите все чанки с метаданными {"document_id": <id>}, затем вставьте новые.
    • DELETE: удалите все чанки с соответствующим document_id.
  6. Проверьте работу

    • Вставьте строку в PostgreSQL: INSERT INTO documents (title, content) VALUES ('Test', 'Hello world!');
    • Проверьте, что в Chroma появился чанк.
    • Обновите строку: UPDATE documents SET content='Hello world updated!' WHERE id=1;
    • Убедитесь, что чанк обновлён.
    • Удалите строку: DELETE FROM documents WHERE id=1;
    • Проверьте, что чанк удалён.
  7. Добавьте мониторинг
    Выведите lag consumer’а и количество обработанных событий в stdout.

Ожидаемый результат
Работающий пайплайн, при изменениях в PostgreSQL векторная БД синхронизируется с задержкой менее 1 секунды.


Связь с другими вопросами

ВопросТемаСвязь
850Архитектура Agentic RAGCDC — часть инфраструктуры для свежести данных в Agentic RAG
852Индексация документовCDC влияет на стратегию индексации (инкрементальная vs полная)
854Управление версиями документовCDC позволяет отслеживать версии и откаты
856Обработка конфликтов в данныхПри CDC нужно решать конфликты (параллельные обновления)
857Мониторинг RAG-системыМониторинг CDC — часть общей observability
860Пайплайны данных для AICDC — один из этапов data pipeline

Навигация