Как проектировать CDC (Change Data Capture) для документов?
Краткий тезис
CDC (Change Data Capture) — это паттерн, который позволяет отслеживать и реагировать на изменения в источнике данных в реальном времени. Для RAG-системы CDC критичен — без него документы в векторной БД быстро устаревают, а пользователи получают неактуальные ответы. Проектирование CDC для документов включает выбор подходящего коннектора (Debezium, AWS DMS, Kafka Connect), настройку пайплайна (source → CDC stream → consumer → vector DB), обработку событий INSERT, UPDATE, DELETE и обеспечение согласованности данных.
1. Термин: Change Data Capture (CDC)
CDC — это технология, которая фиксирует изменения в данных на уровне источника (базы данных, файлового хранилища, SharePoint) и передаёт их потребителям в виде потока событий. В отличие от полного периодического снимка (full snapshot), CDC передаёт только дельту (изменения), что делает его эффективным для систем реального времени.
Зачем CDC для RAG
- Документы добавляются, обновляются, удаляются.
- Векторная БД должна отражать эти изменения, иначе freshness (свежесть) данных падает.
- Полная переиндексация каждого документа каждый раз дорога и медленна.
- CDC позволяет обновлять только изменённые чанки, сокращая latency и стоимость.
Ключевые компоненты CDC-пайплайна
- Source (источник): PostgreSQL, MySQL, MongoDB, S3, SharePoint.
- Capture engine (движок захвата): Debezium, AWS Database Migration Service (DMS), Kafka Connect, Airbyte.
- Message broker (брокер сообщений): Apache Kafka, Amazon Kinesis, RabbitMQ.
- Consumer (потребитель): сервис на Python, который читает поток, преобразует события в операции над векторной БД.
- Target (цель): векторная БД (Pinecone, Weaviate, Qdrant, Chroma), поисковый индекс (Elasticsearch).
2. Основные источники документов и методы CDC
| Источник | Тип данных | Механизм CDC | Инструменты |
|---|---|---|---|
| Реляционные БД (PostgreSQL, MySQL) | Таблицы с метаданными и текстами | Логическая репликация (WAL), триггеры, polling | Debezium (слоты репликации), Kafka JDBC Source (polling) |
| Объектные хранилища (S3, GCS) | Файлы (PDF, DOCX, Markdown) | Events (S3 Event Notifications → SQS/SNS) | AWS Lambda, S3 Event Notifications |
| SharePoint / OneDrive | Документы на платформе | Webhooks, REST API delta-запросы | Microsoft Graph API, Azure Functions |
| NoSQL (MongoDB) | Документы JSON | Oplog (MongoDB replication log) | Debezium MongoDB connector |
| API-источники (Confluence, Notion) | Вики-страницы | Webhooks / periodic polling | Пользовательские консьюмеры (Airbyte, Fivetran) |
Выбор механизма CDC зависит от
- Требуемой задержки (real-time vs near-real-time).
- Сложности инфраструктуры (Kafka vs serverless).
- Частоты изменений (high write throughput может перегрузить коннектор).
3. Архитектура CDC-пайплайна для документов
Типовая архитектура на основе Debezium и Kafka:
PostgreSQL (таблица documents)
│
▼
Debezium (PostgreSQL connector) — читает WAL
│
▼
Kafka topic: "documents.cdc"
│
▼
Python Consumer (группа consumers)
│
▼
Vector DB (Pinecone / Weaviate / Qdrant)
Детали шагов
- PostgreSQL — включает таблицу documents с полями:
id, title,content, status,updated_at. - Debezium — подключается через логический слот репликации, читает WAL (Write-Ahead Log), парсит изменения и отправляет их в Kafka.
- Kafka topic — хранит сообщения в формате Avro или JSON. Каждое сообщение содержит ключ (ID документа) и значение (старое/новое состояние).
- Consumer — читает топик, для каждого события:
- INSERT: разбивает
contentна чанки, вычисляет эмбеддинги, записывает в векторную БД. - UPDATE: удаляет старые чанки (по
id), затем вставляет новые (или использует операцию upsert, если поддерживает). - DELETE: удаляет все чанки с соответствующим
parent_idиз векторной БД.
- INSERT: разбивает
- Vector DB — обновляется инкрементально.
Важно Consumer должен быть идемпотентным (повторная обработка одного события не нарушает целостность). Для этого используем offset management и deduplication по ID внутри одной партиции.
4. Конфигурация Debezium для PostgreSQL
Пример JSON-конфигурации коннектора (через REST API Kafka Connect):
{
"name": "documents-connector",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz",
"database.dbname": "documents_db",
"database.server.name": "documents-server",
"table.include.list": "public.documents",
"plugin.name": "pgoutput",
"snapshot.mode": "initial",
"decimal.handling.mode": "double",
"tombstones.on.delete": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
Пояснения
- plugin.name: pgoutput (используется с PostgreSQL 10+), wal2json, decoderbufs.
- snapshot.mode:
initial— при старте делает полный снимок, затем переключается на изменения. tombstones.on.delete: еслиfalse, Debezium не шлёт tombstone-сообщения (нужно для компакции, но усложняет consumer).transforms: ExtractNewRecordState — убирает вложенность (былоbeforeиafter, остаётся толькоafter).
Обработка схемы Для каждого события Debezium включает schema registry (Avro) или встроенную JSON-схему. Consumer должен уметь десериализовать.
5. События: INSERT, UPDATE, DELETE и трансформация в чанки
Consumer получает сообщение вида:
{
"schema": { ... },
"payload": {
"op": "c", // "c" = create, "u" = update, "d" = delete
"before": { "id": 123, "title": "старый title", "content": "старый content" }, // null для create
"after": { "id": 123, "title": "новый title", "content": "новый content" }
}
}
Логика обработки для каждого типа события:
-
INSERT (
op= "c"):- Получаем
after. - Разбиваем
contentна чанки (по стратегии — фиксированная длина с перекрытием, semantic chunking, recursive splitter). - Для каждого чанка: генерируем эмбеддинг, добавляем метаданные (document_id, title,
chunk_index). - Записываем в векторную БД (batch insert).
- Получаем
-
UPDATE (
op= "u"):- Удаляем все существующие чанки, чьи document_id равен
after.id. - Разбиваем новый
contentна чанки, эмбеддинг, запись. - Вариант 2 (если поддерживается upsert): использовать операцию upsert по составному ключу (document_id + chunk_index). Так дешевле, но нужно знать, какие чанки были удалены.
- Удаляем все существующие чанки, чьи document_id равен
-
DELETE (
op= "d"):- По
before.idнаходим и удаляем все чанки из векторной БД. - Дополнительно можно удалить метаданные из кэша или реляционной БД.
- По
Чанкинг должен быть воспроизводим при одном и том же content алгоритм должен выдавать идентичные чанки. Иначе при переиндексации одного документа без изменений мы получим дубликаты.
6. Интеграция с векторной БД
Типовые операции:
- Pinecone:
index.upsert(vectors=[...])для create/update,index.delete(ids=[...])для delete. - Weaviate: batch import через
client.batch.configure(). - Qdrant:
upsertс помощьюcollection.upload_records(). - Chroma:
collection.add()иcollection.delete().
Транзакционность часто векторная БД не поддерживает атомарные операции по нескольким записям. Если consumer падает после частичного обновления, мы можем получить неконсистентное состояние. Решения:
- Использовать exactly-once семантику в consumer (через Kafka transactions + idempotent storage).
- Хранить в отдельной таблице статус последней успешной обработки события (
last_processed_offset). - Периодически делать полную выверку (reconciliation): сравнить все документы из source с векторной БД и исправить расхождения.
7. Проблемы и компромиссы при проектировании CDC
| Проблема | Описание | Решение |
|---|---|---|
| Ordering (упорядочивание) | События по одному документу должны обрабатываться в порядке их возникновения. Kafka гарантирует порядок внутри партиции по ключу. | Использовать ID документа как ключ сообщения. |
| Deduplication (дедупликация) | При сетевых сбоях или ребалансе consumer может обработать одно событие дважды. | Идемпотентные операции (upsert). Сделать consumer идемпотентным с помощью таблицы processed_events. |
| Latency (задержка) | Debezium добавляет задержку в миллисекунды, Kafka — наносекунды, consumer и эмбеддинг — основное узкое место. | Использовать batch processing для эмбеддинга, параллельные consumer-инстансы. |
| Snapshot vs streaming | При старте или перезапуске нужно сначала сделать snapshot существующих документов, затем переключиться на стриминг. | Debezium сам это делает (snapshot.mode). Consumer должен различать snapshot и streaming: snapshot не генерирует событий удаления, поэтому нужно очистить векторную БД перед snapshot. |
| Backpressure (обратное давление) | Если consumer не справляется, Kafka может накопить много сообщений. | Настроить retention в Kafka, мониторить lag (kafka-consumer-groups), масштабировать consumer. |
| Schema evolution (эволюция схемы) | Добавление полей в таблицу documents. | Использовать schema registry, backward compatible; consumer игнорирует новые поля, если не нужны. |
8. Альтернативные инструменты и сценарии без CDC
Когда CDC избыточен или невозможен
- Источник не поддерживает CDC (например, сторонний API).
- Объём изменений мал (несколько документов в день).
- Нет инфраструктуры (Kafka, Debezium).
Тогда можно использовать
- Polling (опрос по расписанию): каждые N минут проверять
updated_atи забирать документы, изменённые с последнего опроса. - Webhooks (входящие уведомления): если источник (SharePoint, Confluence) умеет отправлять HTTP-вызовы при изменении документа.
- Full resync (полная переиндексация): раз в сутки перестраивать весь индекс. Дешево в реализации, но дорого по ресурсам и latency.
Сравнение подходов
| Критерий | CDC (Debezium) | Polling | Full resync |
|---|---|---|---|
| Задержка (freshness) | Миллисекунды | Секунды–минуты | Часы |
| Нагрузка на источник | Низкая (логическая репликация) | Средняя (SELECT с условием) | Высокая (полный scan) |
| Сложность инфраструктуры | Высокая (Kafka, Connect, CDC engine) | Низкая | Низкая |
| Надёжность | Высокая (точно-один раз, порядок) | Средняя (пропуски возможны) | Средняя (дубли при прерывании) |
| Подходит для real-time | Да | Нет | Нет |
9. Связь с Agentic RAG: агент как потребитель изменений
В архитектуре Agentic RAG агент может выступать в роли продвинутого потребителя CDC-потока. Агент не просто обновляет векторную БД, а принимает решения:
- Приоритет изменений если документ содержит критические данные (например, обновление политики), агент может немедленно переиндексировать и уведомить других агентов.
- Проверка целостности агент может сравнить сгенерированный эмбеддинг для изменённого документа с предыдущим, и если изменение незначительно — не обновлять.
- Ручной override агент может временно приостановить CDC для определённых документов (например, при массовом обновлении).
Пример: AgenticRAG CDC Agent подписан на топик documents.cdc, получает событие UPDATE, отправляет запрос к LLM: «Значительно ли изменился контент? Если да — обновить индекс». Это экономит вычислительные ресурсы.
10. Мониторинг и надежность CDC-пайплайна
Метрики для отслеживания
- Kafka consumer lag — разница между последним сообщением в топике и последним обработанным.
- Event processing latency — время от фиксации в источнике до обновления векторной БД.
- Throughput — количество обработанных событий в секунду.
- Error rate — доля событий, которые не удалось применить (например, chunking не удался).
- Consistency check — доля документов, для которых совпадает количество чанков в source и vector DB.
Инструменты Prometheus + Grafana для сбора метрик, Alertmanager для уведомлений при росте lag.
Рекомендации
- Настроить мониторинг lag для каждой партиции.
- При превышении порога (например, >1000 событий) автоматически донастроить parallelism consumer.
- Использовать dead letter queue (DLQ) для событий, которые невозможно обработать (например, невалидный JSON в content).
Пет-проект для закрепления
Задача Реализовать минимальный CDC-пайплайн для документов на PostgreSQL с обновлением Chroma (in-memory vector DB).
Инструменты
- Docker Compose (PostgreSQL + Debezium + Kafka + Kafka Connect).
- Python (fastapi для REST, kafka-python для consumer, chromadb для векторного хранилища).
langchainдля chunking и эмбеддинга (можно заглушка).
Шаги:
-
Поднимите инфраструктуру
Создайтеdocker-compose.ymlс сервисами:postgres,zookeeper,kafka,debezium-connect. Используйте образdebezium/connect:2.5. -
Создайте таблицу документов
CREATE TABLE documents ( id SERIAL PRIMARY KEY, title TEXT NOT NULL, content TEXT NOT NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -
Настройте Debezium connector через REST API:
Отправьте POST наhttp://localhost:8083/connectorsс JSON из раздела 4. -
Напишите Python consumer
- Подключитесь к Kafka (bootstrap_servers).
- Читайте топик
documents-server.public.documents. - Для каждого сообщения: распарсите
op, вызовите методapply_event(event).
-
Реализуйте логику обработки
- INSERT: разбейте
contentна чанки по 200 токенов (overlap 20), сгенерируйте эмбеддинги (можно использоватьsentence-transformers/all-MiniLM-L6-v2), добавьте в Chroma. - UPDATE: удалите все чанки с метаданными
{"document_id": <id>}, затем вставьте новые. - DELETE: удалите все чанки с соответствующим
document_id.
- INSERT: разбейте
-
Проверьте работу
- Вставьте строку в PostgreSQL:
INSERT INTO documents (title, content) VALUES ('Test', 'Hello world!'); - Проверьте, что в Chroma появился чанк.
- Обновите строку:
UPDATE documents SET content='Hello world updated!' WHERE id=1; - Убедитесь, что чанк обновлён.
- Удалите строку:
DELETE FROM documents WHERE id=1; - Проверьте, что чанк удалён.
- Вставьте строку в PostgreSQL:
-
Добавьте мониторинг
Выведите lag consumer’а и количество обработанных событий в stdout.
Ожидаемый результат
Работающий пайплайн, при изменениях в PostgreSQL векторная БД синхронизируется с задержкой менее 1 секунды.
Связь с другими вопросами
| Вопрос | Тема | Связь |
|---|---|---|
| 850 | Архитектура Agentic RAG | CDC — часть инфраструктуры для свежести данных в Agentic RAG |
| 852 | Индексация документов | CDC влияет на стратегию индексации (инкрементальная vs полная) |
| 854 | Управление версиями документов | CDC позволяет отслеживать версии и откаты |
| 856 | Обработка конфликтов в данных | При CDC нужно решать конфликты (параллельные обновления) |
| 857 | Мониторинг RAG-системы | Мониторинг CDC — часть общей observability |
| 860 | Пайплайны данных для AI | CDC — один из этапов data pipeline |
Навигация
- Предыдущий: 854
- Следующий: 856
- Индекс: 00. Индекс разборов