English translation is not available yet. Showing Russian content.

Как вы проектируете Kafka топологии для RAG ingestion?

Краткий тезис

Проектирование Kafka‑топологии для RAG ingestion — это построение конвейера из нескольких этапов: приём сырых документов (documents.raw), парсинг и чанкинг (documents.chunks), генерация эмбеддингов и запись в векторную базу данных. Ключевые решения: выбор партиционирования (по source_id / document_id), настройка продюсеров и консьюмеров для обеспечения низкой задержки (<5 с для real‑time), обработка ошибок через DLQ (Dead Letter Queue) и обеспечение идемпотентности для устранения дубликатов.


1. Зачем Kafka в RAG ingestion?

Kafka — распределённая платформа потоковой передачи событий. В RAG‑системе документы постоянно обновляются (добавляются, изменяются, удаляются). Kafka позволяет:

  • Разделить этапы ingestion (приём, парсинг, чанкинг, эмбеддинги) на независимые шаги.
  • Обеспечить асинхронность и буферизацию при пиковых нагрузках.
  • Гарантировать отказоустойчивость и воспроизводимость (replay) сообщений.
  • Легко масштабировать каждый этап добавлением consumer’ов.

Термин Ingestion — процесс загрузки, обработки и индексации документов в RAG‑систему, чтобы они стали доступны для retrieval.


2. Общая архитектура топологии

Типичный конвейер состоит из трёх основных топиков и нескольких consumer‑групп:

ТопикНазначениеПартиционированиеТипичный retention
documents.rawСырые документы из источника (файлы, API, веб‑скрапинг)source_id (идентификатор источника)7–30 дней
documents.chunksРезультаты парсинга и нарезки на чанкиdocument_id7 дней
documents.errors / DLQСообщения, которые не удалось обработатьне критично90 дней или бесконечно

Поток данных:

Source → [documents.raw] → Consumer (parser + chunker) → [documents.chunks] → Consumer (embedder) → Vector DB

Каждый топик — commit log (журнал подтверждений), который позволяет нескольким consumer’ам читать одни и те же данные независимо.


3. Топик documents.raw

Структура сообщения

{
  "source_id": "web_crawler_1",
  "document_id": "uuid_v4",
  "format": "pdf",
  "content": "<base64 или ссылка на объектное хранилище>",
  "metadata": {"url": "...", "timestamp": 1234567890}
}

Партиционирование по source_id важно для порядковой обработки (order preservation): все документы от одного источника попадают в одну партицию, что упрощает обнаружение дубликатов и обеспечивает FIFO внутри источника.

Параметры продюсера

  • acks=all — подтверждение от всех реплик для избежания потери данных.
  • enable.idempotence=true — исключает дубликаты при повторных отправках.
  • batch.size и linger.ms — настраиваются под ожидаемый throughput (например, 32 KB и 5 ms для low‑latency).

4. Consumer stage 1: Парсинг и чанкинг

Consumer подписан на documents.raw. Основные задачи:

  1. Загрузка документа — если content — ссылка на S3, скачать.
  2. Парсинг — извлечение текста, таблиц, изображений (PDF, HTML, Docx).
  3. Чанкинг — разбиение на фрагменты (например, 512 токенов с перекрытием 10%).
  4. Обогащение метаданными — chunk_index, chunk_text, source_document_id.

Каждый чанк отправляется в documents.chunks как отдельное сообщение.

Партиционирование consumer’а

  • Consumer не должен читать из documents.raw с произвольным распределением — используем assign с указанием конкретных партиций (по source_id) или group.id и subscribe с partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.
  • Для сохранения порядка внутри одного документа все чанки одного документа должны попасть в одну партицию documents.chunks. Решение — ключ сообщения document_id при записи.

Обработка ошибок

  • Если документ не парсится (битый PDF) → отправить в DLQ с причиной ошибки.
  • Если чанк не получилось создать (превышен лимит размера) → отправить в DLQ или проигнорировать с логом.

5. Топик documents.chunks

Структура сообщения

{
  "chunk_id": "uuid_v4",
  "document_id": "uuid_v4",
  "source_id": "web_crawler_1",
  "chunk_index": 3,
  "chunk_text": "...",
  "metadata": {...}
}

Партиционирование по document_id гарантирует, что все чанки одного документа обрабатываются одним consumer’ом этапа эмбеддингов — это важно для атомарного обновления векторного индекса (удалить старые чанки, вставить новые для того же документа).

Параметры топика

  • partitions — число, кратное количеству consumer’ов второго этапа (например, 6 партиций для 3 consumer’ов).
  • replication.factor — минимум 3 для отказоустойчивости.
  • min.insync.replicas = 2 для обеспечения доступности при отказе одной реплики.

6. Consumer stage 2: Генерация эмбеддингов и запись в векторную БД

Consumer читает documents.chunks и выполняет:

  1. Batch‑запрос к embedding‑модели (например, OpenAI Embeddings или sentence‑transformers).
  2. Запись в векторную базу (Pinecone, Weaviate, Qdrant, FAISS).
  3. Отправка подтверждения (commit offset) только после успешной записи — at‑least‑once или exactly‑once с идемпотентностью на стороне БД (idempotent upsert).

Блокировка повторной обработки

  • Чанки уже обработанного документа могут быть переотправлены при ребалансировке. Используем upsert по chunk_id в векторной БД (замена, если существует).
  • Для кейса обновления документа (документ изменён) — сначала удаляем все чанки с document_id, затем вставляем новые.

7. Dead Letter Queue (DLQ)

DLQ — отдельный топик (documents.errors) для сообщений, которые consumer не смог обработать после нескольких retry.

Зачем

  • Не допустить blocking (consumer застревает на одном плохом сообщении).
  • Сохранить контекст для ручного или автоматического исправления.

Типичные причины попадания в DLQ

  • Ошибка парсинга (битый документ).
  • Ошибка эмбеддинг‑модели (лимит токенов превышен).
  • Временная недоступность векторной БД (после исчерпания retry).
  • Неверный формат данных.

Архитектура DLQ

  • В consumer’е конфигурируем: max.poll.records, max.poll.interval.ms, а также количество повторов (например, 3) перед отправкой в DLQ.
  • Для DLQ можно настроить отдельный consumer для мониторинга и автоматической переотправки после исправления ошибки (например, если проблема была временной).

8. Настройки производительности

Throughput vs Latency

ПараметрДля высокой пропускной способностиДля низкой задержки (<5 с)
batch.size1 MB16 KB
linger.ms100–500 ms0–5 ms
compression.typesnappy / zstdnone
acksallall (потери недопустимы)
max.in.flight.requests.per.connection51 (для порядка)

Партиции

  • Число партиций должно быть ≥ числа consumer’ов в группе.
  • Для real‑time ingestion важно, чтобы максимальный lag между записью в documents.raw и обновлением векторной БД был <5 секунд. Достигается малыми батчами и быстрыми consumer’ами.

Мониторинг

  • Lag (отставание консьюмера) — метрика из kafka-consumer-groups.sh.
  • Error rate — процент сообщений, ушедших в DLQ.
  • Throughput — сообщений/с по каждому топику.

9. Обработка ошибок и идемпотентность

Exactly‑once semantics (EOS)

Включаем enable.idempotence=true у продюсера и isolation.level=read_committed у consumer. Но EOS часто избыточна для RAG ingestion — достаточно at‑least‑once с идемпотентным upsert в векторной БД.

Retry и backoff

Используем exponential backoff: retry.backoff.ms (100 ms) и reconnect.backoff.ms. Consumer может приостановить потребление из партиции (pause()) при временной ошибке векторной БД.


10. Real‑time ingestion: специфика

Для сценариев, когда документы поступают с высокой частотой (например, логи, новостные ленты), требуется:

  • Streaming processing — Kafka Consumer API или Kafka Streams для лёгкой обработки.
  • Минимальный batch — consumer может быть настроен на fetch.min.bytes=1 и fetch.max.wait.ms=10 (но это увеличит число запросов).
  • Async producer — отправлять каждое сообщение с callback (чтобы не блокировать парсинг).

Альтернатива: использовать Kafka Connect для прямой записи из внешних источников (JDBC, S3).


11. Безопасность и управление схемами

  • Schema RegistryAvro, Protobuf или JSON Schema) — гарантирует совместимость форматов при обновлении кода.
  • ACL (Access Control Lists) — ограничить доступ к топикам для разных команд.
  • TLS / SASL — шифрование и аутентификация.

Пет-проект для закрепления

Задача: Реализовать простую Kafka‑топологию для RAG ingestion на локальной машине с помощью Docker.

Инструменты: Docker Compose (Kafka + Zookeeper), Python (kafka‑python, LangChain, sentence‑transformers), Qdrant (векторная БД).

Шаги:

  1. Поднять Kafka (один брокер, топики documents.raw (3 партиции) и documents.chunks (3 партиции), DLQ documents.errors).
  2. Написать продюсер, который читает PDF из папки и отправляет в documents.raw с ключом = source_id.
  3. Написать consumer stage 1: парсит PDF (PyMuPDF), разбивает на чанки (RecursiveCharacterTextSplitter), публикует чанки в documents.chunks с ключом = document_id.
  4. Написать consumer stage 2: получает чанки, эмбеддит (all‑MiniLM‑L6‑v2), делает upsert в Qdrant.
  5. Добавить обработку ошибок: при ошибке парсинга — запись в DLQ.
  6. Замерить latency (время от отправки raw‑документа до появления его чанков в Qdrant).

Ожидаемый результат: Работающий конвейер, который загружает PDF, превращает их в чанки и индексирует в векторной БД с задержкой <10 секунд для документов до 100 страниц.


Связь с другими вопросами

ВопросТема
Вопрос 7Уменьшение latency RAG-системы
Вопрос 1Проектирование RAG для 10 000 документов
Вопрос 3Стратегии chunking’а
Вопрос 12Self-RAG и обновление индекса
Вопрос 9Обновление документов в существующей RAG
Вопрос 15Мониторинг и логирование RAG

Навигация