RAG с incremental update (CDC из PostgreSQL → Kafka → Qdrant)
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: RAG с incremental update (CDC из PostgreSQL → Kafka → Qdrant)
1. Цель задачи
Спроектировать и реализовать пайплайн инкрементального обновления векторной базы данных (Qdrant) для RAG-системы на основе Change Data Capture (CDC) из PostgreSQL через Apache Kafka. Достичь времени обновления эмбеддингов для одного изменённого документа (вставка/обновление/удаление) менее 5 секунд от момента фиксации транзакции в PostgreSQL до появления актуального вектора в Qdrant.
Ключевой результат Рабочий прототип пайплайна CDC → Kafka → Qdrant с подтверждённой задержкой <5 сек на единичное изменение.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| PostgreSQL с тестовой таблицей документов | Локальный Docker-контейнер (postgres:15) |
| Apache Kafka и Kafka Connect | Docker-образы confluentinc/cp-kafka, confluentinc/cp-kafka-connect |
| Debezium PostgreSQL Connector | Установить из Confluent Hub или использовать готовый образ debezium/connect |
| Qdrant (векторная БД) | Docker-образ qdrant/qdrant |
Embedding-модель (например, intfloat/e5-small-v2) | Hugging Face, sentence-transformers |
| Python 3.10+ с библиотеками | requirements.txt: kafka-python, qdrant-client, sentence-transformers, psycopg2-binary |
| Инструмент мониторинга задержки | Собственный скрипт на Python с замером времени через time.time() или метрики Prometheus |
Если нет реального инструмента — симулируем:
- PostgreSQL Запустить локальный контейнер docker run --name pg -e POSTGRES_PASSWORD=pass -p 5432:5432 -d postgres:15. Создать таблицу documents(id SERIAL PRIMARY KEY, content TEXT, updated_at TIMESTAMP DEFAULT NOW()).
- Kafka Поднять Kafka и Zookeeper через docker-compose.yml (стандартная конфигурация Confluent).
- Kafka Connect Запустить
confluentinc/cp-kafka-connectс установленным Debezium PostgreSQL Connector (плагинdebezium-connector-postgres). - Qdrant docker run -p 6333:6333 -d qdrant/qdrant.
- Отсутствие реального CDC Вместо Debezium можно написать триггер в PostgreSQL, который отправляет изменения в Kafka через pg_notify → слушатель на Python — но это не чистое CDC. В данном задании используем Debezium как стандартный промышленный инструмент.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| База данных-источник | PostgreSQL 15, таблица documents с колонками id, content, metadata, updated_at | Хранилище исходных документов с отслеживанием изменений |
| CDC (Change Data Capture) | Debezium PostgreSQL Connector (v2.x) в Kafka Connect | Захват изменений (INSERT, UPDATE, DELETE) из PostgreSQL |
| Потоковая платформа | Apache Kafka (Confluent Platform), топик dbserver1.public.documents | Буферизация и гарантированная доставка событий изменений |
| Обработчик событий | Python-сервис (kafka-python + qdrant-client + sentence-transformers) | Чтение событий из Kafka, генерация эмбеддингов, запись в Qdrant |
| Embedding-модель | intfloat/e5-small-v2 (384-dim) через sentence-transformers | Преобразование текста документа в вектор |
| Векторная БД | Qdrant (single node), коллекция documents с HNSW-индексом | Хранение и поиск документов по эмбеддингам |
| Контейнеризация | Docker Compose | Оркестрация всех сервисов |
| Мониторинг задержки | Python-скрипт с замером времени от получения события до записи в Qdrant | Верификация SLA <5 сек |
4. Этапы выполнения
Этап 1: Развёртывание инфраструктуры (2 часа)
Действия
-
Написать docker-compose.yml со следующими сервисами:
- postgres (образ postgres:15) – с включённым
wal_level = logicalиmax_replication_slots = 1(конфигурация через command). - zookeeper (confluentinc/cp-zookeeper).
- kafka (confluentinc/cp-kafka) – настроить
KAFKA_ADVERTISED_LISTENERS. kafka-connect(debezium/connect:2.5) – монтировать volume для плагинов.- qdrant (qdrant/qdrant).
processor(build из./processor) – Python-сервис.
Пример фрагмента docker-compose.yml:
version: '3.8' services: postgres: image: postgres:15 environment: POSTGRES_PASSWORD: pass POSTGRES_DB: ragdb command: ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=1"] ports: ["5432:5432"] # ... остальные сервисы - postgres (образ postgres:15) – с включённым
-
Создать таблицу documents в PostgreSQL через psql:
CREATE TABLE documents ( id SERIAL PRIMARY KEY, content TEXT NOT NULL, updated_at TIMESTAMP DEFAULT NOW() ); -
Настроить Debezium Connector через REST API Kafka Connect:
- Создать конфигурацию connector (POST на http://localhost:8083/connectors):
{ "name": "documents-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "pass", "database.dbname": "ragdb", "database.server.name": "dbserver1", "table.include.list": "public.documents", "plugin.name": "pgoutput", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } -
Проверить, что события появляются в Kafka: подписаться на топик dbserver1.public.documents через
kafka-console-consumer.
Ожидаемый результат этапа Все контейнеры запущены, Kafka Connect успешно подключён к PostgreSQL, события изменений видны в Kafka.
Этап 2: Разработка Python-процессора (3 часа)
Действия
-
Создать структуру проекта
processor/ ├── main.py ├── embedding.py ├── qdrant_writer.py ├── config.py ├── requirements.txt └── Dockerfile -
Реализовать модуль эмбеддингов (
embedding.py):from sentence_transformers import SentenceTransformer model = SentenceTransformer('intfloat/e5-small-v2') def get_embedding(text: str) -> list[float]: # Добавляем префикс для e5: "passage: " return model.encode("passage: " + text).tolist() -
Реализовать модуль записи в Qdrant (
qdrant_writer.py):from qdrant_client import QdrantClient from qdrant_client.models import PointStruct, VectorParams, Distance client = QdrantClient(host="qdrant", port=6333) def ensure_collection(): collections = client.get_collections().collections if not any(c.name == "documents" for c in collections): client.create_collection( collection_name="documents", vectors_config=VectorParams(size=384, distance=Distance.COSINE) ) -
Реализовать основной цикл (
main.py):import json from kafka import KafkaConsumer import time from embedding import get_embedding from qdrant_writer import client, ensure_collection ensure_collection() consumer = KafkaConsumer( 'dbserver1.public.documents', bootstrap_servers=['kafka:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='rag-processor' ) LATENCY_METRICS = [] for msg in consumer: event = msg.value # Debezium unwrap: event содержит поля таблицы doc_id = event['id'] content = event.get('content', '') op = event.get('__op') # 'r' (read), 'c' (create), 'u' (update), 'd' (delete) start_time = time.time() if op == 'd': client.delete_points( collection_name="documents", points=[doc_id] ) else: vector = get_embedding(content) client.upsert( collection_name="documents", points=[PointStruct(id=doc_id, vector=vector, payload={"content": content})] ) latency = time.time() - start_time LATENCY_METRICS.append(latency) print(f"Latency: {latency*1000:.1f}ms") -
Добавить Dockerfile
FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["python", "main.py"] -
Собрать образ docker compose build processor.
Ожидаемый результат этапа Процессор запущен, читает события из Kafka, генерирует эмбеддинги и записывает/удаляет точки в Qdrant. В stdout выводятся замеры латентности.
Этап 3: Интеграционное тестирование и измерение задержки (1 час)
Действия
-
Заполнить PostgreSQL тестовыми данными
INSERT INTO documents (content) VALUES ('Первый документ о машинном обучении'); INSERT INTO documents (content) VALUES ('Второй документ про обработку естественного языка'); -
Проверить, что точки появились в Qdrant:
curl http://localhost:6333/collections/documents/points/scroll -
Выполнить UPDATE и DELETE
UPDATE documents SET content = 'Обновлённый документ', updated_at = NOW() WHERE id = 1; DELETE FROM documents WHERE id = 2; -
Собрать статистику латентности процессор печатает задержки. Запустить 50 изменений, вычислить среднее, 95-й и 99-й перцентили.
-
Дополнительно Написать скрипт
measure.py, который вставляет запись в PostgreSQL, засекает время и проверяет появление вектора в Qdrant с помощью цикла ожидания (polling с шагом 100 мс). Цель — зафиксировать полный end-to-end latency.
Ожидаемый результат этапа Подтверждено, что средняя задержка <5 сек для единичного изменения. Для массового бэтча (>100 записей) время может расти, но это выходит за рамки SLA.
Этап 4: Обработка краевых случаев и устойчивость (1.5 часа)
Действия
-
Обработка ошибок в процессоре
-
Обработка перезапуска Kafka Connect
- Kafka Connect гарантирует at-least-once delivery. Процессор должен быть идемпотентным: upsert по
idв Qdrant безопасен при повторах. - Проверить сценарий: остановить процессор, выполнить 3 изменения в PostgreSQL, потом перезапустить процессор — все изменения должны быть обработаны.
- Kafka Connect гарантирует at-least-once delivery. Процессор должен быть идемпотентным: upsert по
-
Мониторинг с Prometheus
- Установить библиотеку
prometheus_clientв процессор. - Экспортировать гистограмму латентности
rag_update_latency_secondsи счётчикиrag_updates_totalс лейбламиoperation=create|update|delete. - Добавить endpoint
/metricsна порту 8000.
- Установить библиотеку
Ожидаемый результат этапа Процессор устойчив к сбоям, имеет метрики и health check.
Этап 5: Документация и демонстрация (30 минут)
Действия
- Написать
README.mdс описанием архитектуры (диаграмма: PostgreSQL → Debezium → Kafka → Processor → Qdrant). - Указать команды для запуска:
docker compose up -d. - Зафиксировать результаты тестов: средняя задержка X мс, p95 Y мс.
- Сделать скриншоты или вывод
curlзапросов к Qdrant, демонстрирующие актуальность данных после изменений.
Ожидаемый результат этапа Документированный проект в репозитории с подтверждённым SLA.
5. Критерии приемки (Definition of Done)
- Все сервисы (PostgreSQL, Kafka, Kafka Connect, Qdrant, Processor) запускаются одной командой
docker compose up. - При вставке новой записи в PostgreSQL вектор появляется в Qdrant не позднее чем через 5 секунд (замер через polling).
- При обновлении существующей записи старый вектор заменяется на новый (по тому же
id). - При удалении записи из PostgreSQL соответствующий вектор удаляется из Qdrant.
- Латентность обработки одного события (от момента чтения из Kafka до записи в Qdrant) не превышает 500 мс (внутренняя метрика).
- End-to-end задержка (от коммита в PostgreSQL до готовности в Qdrant) <5 секунд для единичного изменения.
- Процессор корректно обрабатывает «горячий перезапуск» (восстанавливает обработку с прерванного места).
- Метрики Prometheus экспортируются, доступен
/health. - Проект содержит README с инструкцией по запуску и результатами тестов.
6. Ожидаемый результат
Главный артефакт Репозиторий (GitHub/GitLab) со следующей структурой:
rag-cdc/
├── docker-compose.yml
├── processor/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── config.py
│ ├── main.py
│ ├── embedding.py
│ └── qdrant_writer.py
├── measure.py (скрипт для end-to-end замера)
├── README.md
└── screenshots/ (опционально)
Содержание README
- Архитектурная диаграмма.
- Инструкция по запуску.
- Результаты производительности (средняя, p95, p99 задержка).
- Инструкция по проверке (curl-команды).
Дополнительные артефакты
- Docker Compose для инфраструктуры.
- Метрики Prometheus (можно приложить скриншот Grafana, если есть).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Debezium не видит изменения (пустые события) | Проверить wal_level = logical в PostgreSQL. Перезапустить connector. Проверить топик в Kafka. |
| Задержка >5 сек из-за медленного эмбеддинга | Использовать GPU-версию sentence-transformers или оптимизировать модель (ONNX). Уменьшить размер модели. |
| Потеря событий при перезапуске Kafka Connect | Убедиться, что offset.storage настроен на Kafka. Включить auto.offset.reset = earliest. |
| Процессор падает при ошибке Qdrant | Добавить retry (3 попытки), логирование, запись в DLQ-топик. |
| Множественные одинаковые события (дубликаты) | Идемпотентность upsert в Qdrant — повторная запись с тем же id перезаписывает данные. |
| Репликация Kafka Connect не успевает за транзакциями | Увеличить parallelism процессора (несколько partition топика, несколько consumer). |
8. Бюджет времени (оценка)
| Этап | Время (часы) |
|---|---|
| Этап 1: Развёртывание инфраструктуры | 2 |
| Этап 2: Разработка процессора | 3 |
| Этап 3: Интеграционное тестирование | 1 |
| Этап 4: Устойчивость и мониторинг | 1.5 |
| Этап 5: Документация | 0.5 |
| Итого | 8 часов |
Примечание: Для первого раза рекомендуется заложить 10–12 часов с учётом отладки Debezium и настройки Docker Compose.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 45 | CDC (Change Data Capture) |
| 46 | Debezium + Kafka |
| 47 | Kafka Connect и трансформы |
| 78 | Qdrant – основы, collections, points |
| 79 | Embedding-модели и sentence-transformers |
| 82 | Инкрементальные обновления в RAG |
| 134 | Docker Compose для микросервисов |
| 187 | Мониторинг с Prometheus |
| 211 | Pipeline latency измерение |
| 238 | CDC из PostgreSQL в векторную БД |
10. Чек-лист самопроверки
- Я развернул все сервисы через
docker compose upбез ручных настроек. - Я проверил, что при INSERT в PostgreSQL через 1 сек вектор появляется в Qdrant.
- Я проверил, что при UPDATE в PostgreSQL старый вектор заменён новым (поиск по id).
- Я проверил, что при DELETE вектор исчезает из Qdrant (scroll по id).
- Я замерил end-to-end задержку и убедился, что она <5 сек.
- Я протестировал перезапуск процессора и остановку Kafka — система восстановилась без потерь.
- Я добавил метрики и health check, проверил
/metricsлокально. - Я написал README с инструкцией и результатами.