Как вы проектируете Kafka топологии для RAG ingestion?
Краткий тезис
Проектирование Kafka‑топологии для RAG ingestion — это построение конвейера из нескольких этапов: приём сырых документов (documents.raw), парсинг и чанкинг (documents.chunks), генерация эмбеддингов и запись в векторную базу данных. Ключевые решения: выбор партиционирования (по source_id / document_id), настройка продюсеров и консьюмеров для обеспечения низкой задержки (<5 с для real‑time), обработка ошибок через DLQ (Dead Letter Queue) и обеспечение идемпотентности для устранения дубликатов.
1. Зачем Kafka в RAG ingestion?
Kafka — распределённая платформа потоковой передачи событий. В RAG‑системе документы постоянно обновляются (добавляются, изменяются, удаляются). Kafka позволяет:
- Разделить этапы ingestion (приём, парсинг, чанкинг, эмбеддинги) на независимые шаги.
- Обеспечить асинхронность и буферизацию при пиковых нагрузках.
- Гарантировать отказоустойчивость и воспроизводимость (replay) сообщений.
- Легко масштабировать каждый этап добавлением consumer’ов.
Термин Ingestion — процесс загрузки, обработки и индексации документов в RAG‑систему, чтобы они стали доступны для retrieval.
2. Общая архитектура топологии
Типичный конвейер состоит из трёх основных топиков и нескольких consumer‑групп:
| Топик | Назначение | Партиционирование | Типичный retention |
|---|---|---|---|
documents.raw | Сырые документы из источника (файлы, API, веб‑скрапинг) | source_id (идентификатор источника) | 7–30 дней |
documents.chunks | Результаты парсинга и нарезки на чанки | document_id | 7 дней |
documents.errors / DLQ | Сообщения, которые не удалось обработать | не критично | 90 дней или бесконечно |
Поток данных:
Source → [documents.raw] → Consumer (parser + chunker) → [documents.chunks] → Consumer (embedder) → Vector DB
Каждый топик — commit log (журнал подтверждений), который позволяет нескольким consumer’ам читать одни и те же данные независимо.
3. Топик documents.raw
Структура сообщения
{
"source_id": "web_crawler_1",
"document_id": "uuid_v4",
"format": "pdf",
"content": "<base64 или ссылка на объектное хранилище>",
"metadata": {"url": "...", "timestamp": 1234567890}
}
Партиционирование по source_id важно для порядковой обработки (order preservation): все документы от одного источника попадают в одну партицию, что упрощает обнаружение дубликатов и обеспечивает FIFO внутри источника.
Параметры продюсера
- acks=all — подтверждение от всех реплик для избежания потери данных.
- enable.idempotence=true — исключает дубликаты при повторных отправках.
- batch.size и
linger.ms— настраиваются под ожидаемый throughput (например, 32 KB и 5 ms для low‑latency).
4. Consumer stage 1: Парсинг и чанкинг
Consumer подписан на documents.raw. Основные задачи:
- Загрузка документа — если content — ссылка на S3, скачать.
- Парсинг — извлечение текста, таблиц, изображений (PDF, HTML, Docx).
- Чанкинг — разбиение на фрагменты (например, 512 токенов с перекрытием 10%).
- Обогащение метаданными —
chunk_index,chunk_text,source_document_id.
Каждый чанк отправляется в documents.chunks как отдельное сообщение.
Партиционирование consumer’а
- Consumer не должен читать из documents.raw с произвольным распределением — используем assign с указанием конкретных партиций (по source_id) или group.id и subscribe с partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.
- Для сохранения порядка внутри одного документа все чанки одного документа должны попасть в одну партицию documents.chunks. Решение — ключ сообщения document_id при записи.
Обработка ошибок
- Если документ не парсится (битый PDF) → отправить в DLQ с причиной ошибки.
- Если чанк не получилось создать (превышен лимит размера) → отправить в DLQ или проигнорировать с логом.
5. Топик documents.chunks
Структура сообщения
{
"chunk_id": "uuid_v4",
"document_id": "uuid_v4",
"source_id": "web_crawler_1",
"chunk_index": 3,
"chunk_text": "...",
"metadata": {...}
}
Партиционирование по document_id гарантирует, что все чанки одного документа обрабатываются одним consumer’ом этапа эмбеддингов — это важно для атомарного обновления векторного индекса (удалить старые чанки, вставить новые для того же документа).
Параметры топика
- partitions — число, кратное количеству consumer’ов второго этапа (например, 6 партиций для 3 consumer’ов).
- replication.factor — минимум 3 для отказоустойчивости.
- min.insync.replicas = 2 для обеспечения доступности при отказе одной реплики.
6. Consumer stage 2: Генерация эмбеддингов и запись в векторную БД
Consumer читает documents.chunks и выполняет:
- Batch‑запрос к embedding‑модели (например, OpenAI Embeddings или sentence‑transformers).
- Запись в векторную базу (Pinecone, Weaviate, Qdrant, FAISS).
- Отправка подтверждения (commit offset) только после успешной записи — at‑least‑once или exactly‑once с идемпотентностью на стороне БД (idempotent upsert).
Блокировка повторной обработки
- Чанки уже обработанного документа могут быть переотправлены при ребалансировке. Используем upsert по
chunk_idв векторной БД (замена, если существует). - Для кейса обновления документа (документ изменён) — сначала удаляем все чанки с
document_id, затем вставляем новые.
7. Dead Letter Queue (DLQ)
DLQ — отдельный топик (documents.errors) для сообщений, которые consumer не смог обработать после нескольких retry.
Зачем
- Не допустить blocking (consumer застревает на одном плохом сообщении).
- Сохранить контекст для ручного или автоматического исправления.
Типичные причины попадания в DLQ
- Ошибка парсинга (битый документ).
- Ошибка эмбеддинг‑модели (лимит токенов превышен).
- Временная недоступность векторной БД (после исчерпания retry).
- Неверный формат данных.
Архитектура DLQ
- В consumer’е конфигурируем:
max.poll.records,max.poll.interval.ms, а также количество повторов (например, 3) перед отправкой в DLQ. - Для DLQ можно настроить отдельный consumer для мониторинга и автоматической переотправки после исправления ошибки (например, если проблема была временной).
8. Настройки производительности
Throughput vs Latency
| Параметр | Для высокой пропускной способности | Для низкой задержки (<5 с) |
|---|---|---|
batch.size | 1 MB | 16 KB |
linger.ms | 100–500 ms | 0–5 ms |
compression.type | snappy / zstd | none |
acks | all | all (потери недопустимы) |
max.in.flight.requests.per.connection | 5 | 1 (для порядка) |
Партиции
- Число партиций должно быть ≥ числа consumer’ов в группе.
- Для real‑time ingestion важно, чтобы максимальный lag между записью в
documents.rawи обновлением векторной БД был <5 секунд. Достигается малыми батчами и быстрыми consumer’ами.
Мониторинг
- Lag (отставание консьюмера) — метрика из
kafka-consumer-groups.sh. - Error rate — процент сообщений, ушедших в DLQ.
- Throughput — сообщений/с по каждому топику.
9. Обработка ошибок и идемпотентность
Exactly‑once semantics (EOS)
Включаем enable.idempotence=true у продюсера и isolation.level=read_committed у consumer. Но EOS часто избыточна для RAG ingestion — достаточно at‑least‑once с идемпотентным upsert в векторной БД.
Retry и backoff
Используем exponential backoff: retry.backoff.ms (100 ms) и reconnect.backoff.ms. Consumer может приостановить потребление из партиции (pause()) при временной ошибке векторной БД.
10. Real‑time ingestion: специфика
Для сценариев, когда документы поступают с высокой частотой (например, логи, новостные ленты), требуется:
- Streaming processing — Kafka Consumer API или Kafka Streams для лёгкой обработки.
- Минимальный batch — consumer может быть настроен на
fetch.min.bytes=1иfetch.max.wait.ms=10(но это увеличит число запросов). - Async producer — отправлять каждое сообщение с callback (чтобы не блокировать парсинг).
Альтернатива: использовать Kafka Connect для прямой записи из внешних источников (JDBC, S3).
11. Безопасность и управление схемами
- Schema Registry (с Avro, Protobuf или JSON Schema) — гарантирует совместимость форматов при обновлении кода.
- ACL (Access Control Lists) — ограничить доступ к топикам для разных команд.
- TLS / SASL — шифрование и аутентификация.
Пет-проект для закрепления
Задача: Реализовать простую Kafka‑топологию для RAG ingestion на локальной машине с помощью Docker.
Инструменты: Docker Compose (Kafka + Zookeeper), Python (kafka‑python, LangChain, sentence‑transformers), Qdrant (векторная БД).
Шаги:
- Поднять Kafka (один брокер, топики
documents.raw(3 партиции) иdocuments.chunks(3 партиции), DLQdocuments.errors). - Написать продюсер, который читает PDF из папки и отправляет в
documents.rawс ключом =source_id. - Написать consumer stage 1: парсит PDF (PyMuPDF), разбивает на чанки (RecursiveCharacterTextSplitter), публикует чанки в
documents.chunksс ключом =document_id. - Написать consumer stage 2: получает чанки, эмбеддит (all‑MiniLM‑L6‑v2), делает upsert в Qdrant.
- Добавить обработку ошибок: при ошибке парсинга — запись в DLQ.
- Замерить latency (время от отправки raw‑документа до появления его чанков в Qdrant).
Ожидаемый результат: Работающий конвейер, который загружает PDF, превращает их в чанки и индексирует в векторной БД с задержкой <10 секунд для документов до 100 страниц.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| Вопрос 7 | Уменьшение latency RAG-системы |
| Вопрос 1 | Проектирование RAG для 10 000 документов |
| Вопрос 3 | Стратегии chunking’а |
| Вопрос 12 | Self-RAG и обновление индекса |
| Вопрос 9 | Обновление документов в существующей RAG |
| Вопрос 15 | Мониторинг и логирование RAG |
Навигация
- Предыдущий: 427
- Следующий: 429
- Индекс: 00. Индекс разборов