English translation is not available yet. Showing Russian content.

RAG с incremental update (CDC из PostgreSQL → Kafka → Qdrant)

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: RAG с incremental update (CDC из PostgreSQLKafkaQdrant)

1. Цель задачи

Спроектировать и реализовать пайплайн инкрементального обновления векторной базы данных (Qdrant) для RAG-системы на основе Change Data Capture (CDC) из PostgreSQL через Apache Kafka. Достичь времени обновления эмбеддингов для одного изменённого документа (вставка/обновление/удаление) менее 5 секунд от момента фиксации транзакции в PostgreSQL до появления актуального вектора в Qdrant.

Ключевой результат Рабочий прототип пайплайна CDCKafkaQdrant с подтверждённой задержкой <5 сек на единичное изменение.


2. Исходные данные

Что нужноОткуда взять
PostgreSQL с тестовой таблицей документовЛокальный Docker-контейнер (postgres:15)
Apache Kafka и Kafka ConnectDocker-образы confluentinc/cp-kafka, confluentinc/cp-kafka-connect
Debezium PostgreSQL ConnectorУстановить из Confluent Hub или использовать готовый образ debezium/connect
Qdrant (векторная БД)Docker-образ qdrant/qdrant
Embedding-модель (например, intfloat/e5-small-v2)Hugging Face, sentence-transformers
Python 3.10+ с библиотекамиrequirements.txt: kafka-python, qdrant-client, sentence-transformers, psycopg2-binary
Инструмент мониторинга задержкиСобственный скрипт на Python с замером времени через time.time() или метрики Prometheus

Если нет реального инструмента — симулируем:

  1. PostgreSQL Запустить локальный контейнер docker run --name pg -e POSTGRES_PASSWORD=pass -p 5432:5432 -d postgres:15. Создать таблицу documents(id SERIAL PRIMARY KEY, content TEXT, updated_at TIMESTAMP DEFAULT NOW()).
  2. Kafka Поднять Kafka и Zookeeper через docker-compose.yml (стандартная конфигурация Confluent).
  3. Kafka Connect Запустить confluentinc/cp-kafka-connect с установленным Debezium PostgreSQL Connector (плагин debezium-connector-postgres).
  4. Qdrant docker run -p 6333:6333 -d qdrant/qdrant.
  5. Отсутствие реального CDC Вместо Debezium можно написать триггер в PostgreSQL, который отправляет изменения в Kafka через pg_notify → слушатель на Python — но это не чистое CDC. В данном задании используем Debezium как стандартный промышленный инструмент.

3. Технологический стек

КомпонентИнструментыНазначение
База данных-источникPostgreSQL 15, таблица documents с колонками id, content, metadata, updated_atХранилище исходных документов с отслеживанием изменений
CDC (Change Data Capture)Debezium PostgreSQL Connector (v2.x) в Kafka ConnectЗахват изменений (INSERT, UPDATE, DELETE) из PostgreSQL
Потоковая платформаApache Kafka (Confluent Platform), топик dbserver1.public.documentsБуферизация и гарантированная доставка событий изменений
Обработчик событийPython-сервис (kafka-python + qdrant-client + sentence-transformers)Чтение событий из Kafka, генерация эмбеддингов, запись в Qdrant
Embedding-модельintfloat/e5-small-v2 (384-dim) через sentence-transformersПреобразование текста документа в вектор
Векторная БДQdrant (single node), коллекция documents с HNSW-индексомХранение и поиск документов по эмбеддингам
КонтейнеризацияDocker ComposeОркестрация всех сервисов
Мониторинг задержкиPython-скрипт с замером времени от получения события до записи в QdrantВерификация SLA <5 сек

4. Этапы выполнения

Этап 1: Развёртывание инфраструктуры (2 часа)

Действия

  1. Написать docker-compose.yml со следующими сервисами:

    • postgres (образ postgres:15) – с включённым wal_level = logical и max_replication_slots = 1 (конфигурация через command).
    • zookeeper (confluentinc/cp-zookeeper).
    • kafka (confluentinc/cp-kafka) – настроить KAFKA_ADVERTISED_LISTENERS.
    • kafka-connect (debezium/connect:2.5) – монтировать volume для плагинов.
    • qdrant (qdrant/qdrant).
    • processor (build из ./processor) – Python-сервис.

    Пример фрагмента docker-compose.yml:

    version: '3.8'
    services:
      postgres:
        image: postgres:15
        environment:
          POSTGRES_PASSWORD: pass
          POSTGRES_DB: ragdb
        command: ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=1"]
        ports: ["5432:5432"]
      # ... остальные сервисы
    
  2. Создать таблицу documents в PostgreSQL через psql:

    CREATE TABLE documents (
        id SERIAL PRIMARY KEY,
        content TEXT NOT NULL,
        updated_at TIMESTAMP DEFAULT NOW()
    );
    
  3. Настроить Debezium Connector через REST API Kafka Connect:

    • Создать конфигурацию connector (POST на http://localhost:8083/connectors):
    {
      "name": "documents-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "pass",
        "database.dbname": "ragdb",
        "database.server.name": "dbserver1",
        "table.include.list": "public.documents",
        "plugin.name": "pgoutput",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
      }
    }
    
  4. Проверить, что события появляются в Kafka: подписаться на топик dbserver1.public.documents через kafka-console-consumer.

Ожидаемый результат этапа Все контейнеры запущены, Kafka Connect успешно подключён к PostgreSQL, события изменений видны в Kafka.


Этап 2: Разработка Python-процессора (3 часа)

Действия

  1. Создать структуру проекта

    processor/
    ├── main.py
    ├── embedding.py
    ├── qdrant_writer.py
    ├── config.py
    ├── requirements.txt
    └── Dockerfile
    
  2. Реализовать модуль эмбеддингов (embedding.py):

    from sentence_transformers import SentenceTransformer
    
    model = SentenceTransformer('intfloat/e5-small-v2')
    
    def get_embedding(text: str) -> list[float]:
        # Добавляем префикс для e5: "passage: "
        return model.encode("passage: " + text).tolist()
    
  3. Реализовать модуль записи в Qdrant (qdrant_writer.py):

    from qdrant_client import QdrantClient
    from qdrant_client.models import PointStruct, VectorParams, Distance
    
    client = QdrantClient(host="qdrant", port=6333)
    
    def ensure_collection():
        collections = client.get_collections().collections
        if not any(c.name == "documents" for c in collections):
            client.create_collection(
                collection_name="documents",
                vectors_config=VectorParams(size=384, distance=Distance.COSINE)
            )
    
  4. Реализовать основной цикл (main.py):

    import json
    from kafka import KafkaConsumer
    import time
    from embedding import get_embedding
    from qdrant_writer import client, ensure_collection
    
    ensure_collection()
    
    consumer = KafkaConsumer(
        'dbserver1.public.documents',
        bootstrap_servers=['kafka:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest',
        group_id='rag-processor'
    )
    
    LATENCY_METRICS = []
    
    for msg in consumer:
        event = msg.value
        # Debezium unwrap: event содержит поля таблицы
        doc_id = event['id']
        content = event.get('content', '')
        op = event.get('__op')  # 'r' (read), 'c' (create), 'u' (update), 'd' (delete)
    
        start_time = time.time()
    
        if op == 'd':
            client.delete_points(
                collection_name="documents",
                points=[doc_id]
            )
        else:
            vector = get_embedding(content)
            client.upsert(
                collection_name="documents",
                points=[PointStruct(id=doc_id, vector=vector, payload={"content": content})]
            )
    
        latency = time.time() - start_time
        LATENCY_METRICS.append(latency)
        print(f"Latency: {latency*1000:.1f}ms")
    
  5. Добавить Dockerfile

    FROM python:3.11-slim
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    COPY . .
    CMD ["python", "main.py"]
    
  6. Собрать образ docker compose build processor.

Ожидаемый результат этапа Процессор запущен, читает события из Kafka, генерирует эмбеддинги и записывает/удаляет точки в Qdrant. В stdout выводятся замеры латентности.


Этап 3: Интеграционное тестирование и измерение задержки (1 час)

Действия

  1. Заполнить PostgreSQL тестовыми данными

    INSERT INTO documents (content) VALUES ('Первый документ о машинном обучении');
    INSERT INTO documents (content) VALUES ('Второй документ про обработку естественного языка');
    
  2. Проверить, что точки появились в Qdrant:

    curl http://localhost:6333/collections/documents/points/scroll
    
  3. Выполнить UPDATE и DELETE

    UPDATE documents SET content = 'Обновлённый документ', updated_at = NOW() WHERE id = 1;
    DELETE FROM documents WHERE id = 2;
    
  4. Собрать статистику латентности процессор печатает задержки. Запустить 50 изменений, вычислить среднее, 95-й и 99-й перцентили.

  5. Дополнительно Написать скрипт measure.py, который вставляет запись в PostgreSQL, засекает время и проверяет появление вектора в Qdrant с помощью цикла ожидания (polling с шагом 100 мс). Цель — зафиксировать полный end-to-end latency.

Ожидаемый результат этапа Подтверждено, что средняя задержка <5 сек для единичного изменения. Для массового бэтча (>100 записей) время может расти, но это выходит за рамки SLA.


Этап 4: Обработка краевых случаев и устойчивость (1.5 часа)

Действия

  1. Обработка ошибок в процессоре

    • Добавить retry при сбое Qdrant (до 3 попыток с exponential backoff).
    • Логировать failed-записи в отдельный файл и в Kafka в DLQ (dead letter queue) топик.
    • При старте процессора проверять наличие коллекции, если нет — создавать.
  2. Обработка перезапуска Kafka Connect

    • Kafka Connect гарантирует at-least-once delivery. Процессор должен быть идемпотентным: upsert по id в Qdrant безопасен при повторах.
    • Проверить сценарий: остановить процессор, выполнить 3 изменения в PostgreSQL, потом перезапустить процессор — все изменения должны быть обработаны.
  3. Мониторинг с Prometheus

    • Установить библиотеку prometheus_client в процессор.
    • Экспортировать гистограмму латентности rag_update_latency_seconds и счётчики rag_updates_total с лейблами operation=create|update|delete.
    • Добавить endpoint /metrics на порту 8000.
  4. Добавить health check в processor (HTTP GET /health).

Ожидаемый результат этапа Процессор устойчив к сбоям, имеет метрики и health check.


Этап 5: Документация и демонстрация (30 минут)

Действия

  1. Написать README.md с описанием архитектуры (диаграмма: PostgreSQLDebeziumKafka → Processor → Qdrant).
  2. Указать команды для запуска: docker compose up -d.
  3. Зафиксировать результаты тестов: средняя задержка X мс, p95 Y мс.
  4. Сделать скриншоты или вывод curl запросов к Qdrant, демонстрирующие актуальность данных после изменений.

Ожидаемый результат этапа Документированный проект в репозитории с подтверждённым SLA.


5. Критерии приемки (Definition of Done)

  • Все сервисы (PostgreSQL, Kafka, Kafka Connect, Qdrant, Processor) запускаются одной командой docker compose up.
  • При вставке новой записи в PostgreSQL вектор появляется в Qdrant не позднее чем через 5 секунд (замер через polling).
  • При обновлении существующей записи старый вектор заменяется на новый (по тому же id).
  • При удалении записи из PostgreSQL соответствующий вектор удаляется из Qdrant.
  • Латентность обработки одного события (от момента чтения из Kafka до записи в Qdrant) не превышает 500 мс (внутренняя метрика).
  • End-to-end задержка (от коммита в PostgreSQL до готовности в Qdrant) <5 секунд для единичного изменения.
  • Процессор корректно обрабатывает «горячий перезапуск» (восстанавливает обработку с прерванного места).
  • Метрики Prometheus экспортируются, доступен /health.
  • Проект содержит README с инструкцией по запуску и результатами тестов.

6. Ожидаемый результат

Главный артефакт Репозиторий (GitHub/GitLab) со следующей структурой:

rag-cdc/
├── docker-compose.yml
├── processor/
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── config.py
│   ├── main.py
│   ├── embedding.py
│   └── qdrant_writer.py
├── measure.py (скрипт для end-to-end замера)
├── README.md
└── screenshots/ (опционально)

Содержание README

  • Архитектурная диаграмма.
  • Инструкция по запуску.
  • Результаты производительности (средняя, p95, p99 задержка).
  • Инструкция по проверке (curl-команды).

Дополнительные артефакты

  • Docker Compose для инфраструктуры.
  • Метрики Prometheus (можно приложить скриншот Grafana, если есть).

7. Возможные сложности и их решение

СложностьРешение
Debezium не видит изменения (пустые события)Проверить wal_level = logical в PostgreSQL. Перезапустить connector. Проверить топик в Kafka.
Задержка >5 сек из-за медленного эмбеддингаИспользовать GPU-версию sentence-transformers или оптимизировать модель (ONNX). Уменьшить размер модели.
Потеря событий при перезапуске Kafka ConnectУбедиться, что offset.storage настроен на Kafka. Включить auto.offset.reset = earliest.
Процессор падает при ошибке QdrantДобавить retry (3 попытки), логирование, запись в DLQ-топик.
Множественные одинаковые события (дубликаты)Идемпотентность upsert в Qdrant — повторная запись с тем же id перезаписывает данные.
Репликация Kafka Connect не успевает за транзакциямиУвеличить parallelism процессора (несколько partition топика, несколько consumer).

8. Бюджет времени (оценка)

ЭтапВремя (часы)
Этап 1: Развёртывание инфраструктуры2
Этап 2: Разработка процессора3
Этап 3: Интеграционное тестирование1
Этап 4: Устойчивость и мониторинг1.5
Этап 5: Документация0.5
Итого8 часов

Примечание: Для первого раза рекомендуется заложить 10–12 часов с учётом отладки Debezium и настройки Docker Compose.


9. Связанные вопросы из базы знаний

ВопросТема
45CDC (Change Data Capture)
46Debezium + Kafka
47Kafka Connect и трансформы
78Qdrant – основы, collections, points
79Embedding-модели и sentence-transformers
82Инкрементальные обновления в RAG
134Docker Compose для микросервисов
187Мониторинг с Prometheus
211Pipeline latency измерение
238CDC из PostgreSQL в векторную БД

10. Чек-лист самопроверки

  • Я развернул все сервисы через docker compose up без ручных настроек.
  • Я проверил, что при INSERT в PostgreSQL через 1 сек вектор появляется в Qdrant.
  • Я проверил, что при UPDATE в PostgreSQL старый вектор заменён новым (поиск по id).
  • Я проверил, что при DELETE вектор исчезает из Qdrant (scroll по id).
  • Я замерил end-to-end задержку и убедился, что она <5 сек.
  • Я протестировал перезапуск процессора и остановку Kafka — система восстановилась без потерь.
  • Я добавил метрики и health check, проверил /metrics локально.
  • Я написал README с инструкцией и результатами.