English translation is not available yet. Showing Russian content.

Как проектировать AI pipeline с at-least-once семантикой?

Краткий тезис

At-least-once семантика гарантирует, что каждое сообщение (событие, документ, запрос) в pipeline будет обработано хотя бы один раз. Это означает, что возможны дубликаты обработки, но исключены потери данных. Для AI-пайплайнов (например, ingestion документов в RAG) такая семантика критична, когда пропуск документа недопустим, а дубликаты можно устранить на этапе идемпотентной обработки. Ключевые компоненты: producer, который фиксирует отправку (offset|коммит offset после подтверждения), consumer, который обрабатывает сообщение до коммита, и dead letter queue (DLQ) для неудачных сообщений.


1. Термин: At-least-once семантика (как минимум однажды)

At-least-once — модель доставки сообщений в распределённых системах, при которой сообщение может быть доставлено более одного раза, но никогда не потеряно. Достигается за счёт повторных отправок и подтверждений.

Противопоставление

  • At-most-once — сообщение доставляется не более одного раза (возможна потеря).
  • Exactly-once — сообщение доставляется ровно один раз (наиболее сложная).

Почему это важно для AI pipeline:

  • Ingestion документов для RAG: если документ не проиндексирован, пользователь не получит ответ. Лучше дважды проиндексировать (и потом дедуплицировать), чем пропустить.
  • Логирование метрик: потеря события может исказить статистику.
  • Обработка агентных вызовов: потеря запроса к LLM может нарушить бизнес-логику.

2. Архитектура AI pipeline с at-least-once

Типовой pipeline включает этапы:

  • Поступление события (HTTP‑запрос, Kafka‑сообщение, файл)
  • Producer — компонент, отправляющий сообщение в очередь (например, Kafka, RabbitMQ, Redis Streams)
  • Consumer — обработчик, который получает сообщение, выполняет работу (например, chunks, эмбеддинг, запись в векторную БД) и подтверждает обработку
  • Dead Letter Queue (DLQ) — очередь для сообщений, которые не удалось обработать после нескольких попыток
  • Хранилище состояний (опционально) — для идемпотентности: таблица обработанных message_id
graph LR
    A[Producer] -->|publish| B(Broker/Kafka)
    B -->|poll| C[Consumer]
    C -->|process & commit| D[(Output: Vector DB, DB)]
    C -->|error after retries| E[DLQ]
    C -->|idempotency check| F[(State Store)]

Термин «Очередь сообщений» (Message Queue) — посредник между producer и consumer, обеспечивающий буферизацию, надёжную доставку и масштабирование.


3. Producer: гарантия отправки и offset commit

Producer отвечает за публикацию сообщения в очередь и получение подтверждения, что сообщение принято.

Ключевые настройки (на примере Kafka):

  • acks=all: подтверждение от всех реплик раздела (лидера и in-sync replicas). Это даёт максимальную гарантию, что сообщение не потеряно при сбое брокера.
  • retries и retry.backoff.ms: автоматическая повторная отправка при временной ошибке (например, NotLeaderForPartition).
  • enable.idempotence=true: producer гарантирует уникальность отправляемых сообщений (через field producerId + sequenceNumber), чтобы избежать дублирования на стороне брокера из-за повторных отправок.

Offset — смещение — уникальный номер, присваиваемый каждому сообщению в разделе (partition) очереди. Коммит offset означает: «все сообщения до этого смещения обработаны». Производитель фиксирует offset после отправки, но настоящую гарантию даёт коммит consumer.

Пример producer (Python, confluent_kafka):

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'retries': 3,
    'enable.idempotence': True
}
producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

producer.produce('documents', value=b'some doc', callback=delivery_report)
producer.flush()

Важно Producer не гарантирует, что consumer обработает сообщение. Producer гарантирует, что сообщение успешно сохранено в очереди.


4. Consumer: обработка и commit offset

Consumer получает сообщения (обычно в цикле), обрабатывает их и после успешной обработки коммитит offset, сообщая брокеру, что сообщение обработано.

Стратегия at-least-once

  1. Получить сообщение (poll)
  2. Обработать его (сделать индекс, записать в БД)
  3. Если успешно → коммит offset
  4. Если неуспешно → не коммитить, при следующем poll потребитель получит то же сообщение снова (перезапуск)

Термин «Auto-commit» — автоматический коммит offset через фиксированный интервал. Для at-least-once опасен: если consumer упадёт после auto‑commit, но до обработки сообщения, сообщение будет потеряно. Рекомендация отключить auto‑commit и коммитить вручную.

from confluent_kafka import Consumer, KafkaError

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'doc-ingestion',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['documents'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(f'Consumer error: {msg.error()}')
            break

    # Обработка сообщения (например, индексация)
    try:
        process_document(msg.value())
        # Коммит offset вручную после успеха
        consumer.commit()
    except Exception as e:
        # Логгируем ошибку, не коммитим -> сообщение будет прочитано снова
        print(f'Processing failed: {e}')
        # Можно отправить в DLQ после N попыток (см. раздел 6)

Потенциальная проблема Если обработка занимает больше времени, чем session.timeout.ms, consumer будет исключён из группы. Нужно увеличивать таймауты или использовать фоновый heartbeat.


5. Дубликаты: идемпотентность обработки

Главный недостаток at-least-once — дубликаты. Сообщение может быть обработано дважды, если:

  • Consumer упал после обработки, но до коммита offset → сообщение будет повторно доставлено.
  • Producer сделал повторную отправку (retry) из-за таймаута подтверждения.

Решение: идемпотентная обработка — обработка одного и того же сообщения не должна менять состояние системы более одного раза.

Способы реализации

  1. Таблица дедубликации (dedup table) в базе данных с уникальным ключом — message_id (генерируется producer).
CREATE TABLE processed_messages (
    message_id VARCHAR(64) PRIMARY KEY,
    processed_at TIMESTAMP DEFAULT NOW()
);

Перед обработкой: INSERT INTO processed_messages (message_id) VALUES (?) ON CONFLICT DO NOTHING. Если строка уже есть — пропускаем.

  1. Idempotent key в хранилище состояния (Redis, etcd) с TTL, чтобы не хранить бесконечно.

  2. Идемпотентные операции (например, индексация документа: UPSERT вместо INSERT в векторную БД). Если дважды добавить один и тот же текст – эмбеддинг будет одинаковым, но может появиться дубликат в результатах. Лучше проверять хеш содержимого.

Термин «Идемпотентность» — свойство операции: многократное выполнение даёт тот же результат, что и однократное.

def process_document_with_idempotency(msg_value, state_store):
    message_id = extract_message_id(msg_value)
    if state_store.get(message_id):
        # Already processed
        return
    # Process document
    index_document(msg_value)
    state_store.set(message_id, timestamp())

Важно Идемпотентность не устраняет дубликаты на уровне consumer'а (сообщение всё равно будет получено дважды), но делает их безвредными.


6. Dead Letter Queue (DLQ): обработка ошибок

Если после нескольких повторных попыток сообщение всё ещё не удаётся обработать (например, невалидный формат, ошибка эмбеддинга), его нужно перенести в DLQ, а не бесконечно ретраить.

Механизм

  1. Consumer пытается обработать сообщение.
  2. После max_retries неудачных попыток (например, 3 раза) сообщение отправляется в DLQ (другой Kafka‑топик, или очередь).
  3. Offset исходного сообщения коммитится (иначе цикл зациклится). Но дубликат уже попал в DLQ.
  4. Администраторы анализируют DLQ и принимают решение: исправить данные, переслать вручную или игнорировать.

Пример настройки retries

retry_count_key = f"retry_{msg.partition()}_{msg.offset()}"
retry_count = state_store.get(retry_count_key) or 0

try:
    process_document(msg.value())
    consumer.commit()
    state_store.delete(retry_count_key)
except Exception as e:
    retry_count += 1
    if retry_count >= MAX_RETRIES:
        # Send to DLQ
        send_to_dlq(msg.value(), error=str(e))
        consumer.commit()  # commit to skip failed message
        state_store.delete(retry_count_key)
    else:
        state_store.set(retry_count_key, retry_count)
        # Don't commit, message will be redelivered

Термин «DLQ» — очередь для сообщений, которые невозможно обработать; позволяет избежать потери данных и даёт возможность ручного анализа.


7. Мониторинг и метрики

Для at-least-once pipeline критично отслеживать:

  • Lag — разница между последним отправленным и последним обработанным offset. Рост lag указывает на проблемы consumer.
  • Retry rate — количество сообщений, обработанных с повторных попыток.
  • DLQ count — сколько сообщений упало в DLQ.
  • Duplicate ratio — доля дубликатов, найденных через идемпотентность (для оценки качества).
  • End-to-end latency — время от публикации до успешного коммита.

Инструменты: Prometheus + Grafana, встроенный мониторинг Kafka (JMX, Confluent Control Center).


8. Практический пример: Ingestion pipeline для RAG

Задача Надёжно загружать PDF‑документы в векторную базу данных (FAISS + PostgreSQL) с гарантией at‑least‑once, допуская дубликаты, которые потом устраняются через хеш содержимого.

Компоненты

  • Producer (API‑сервер): принимает файлы, генерирует message_id (UUID), отправляет в Kafka‑топик documents.
  • Kafka: с настройками acks=all, min.insync.replicas=2, топик с 3 партициями.
  • Consumer (worker): читает из documents, парсит PDF, чанкует, создаёт эмбеддинги, сохраняет хеш каждого чанка.
  • State store (Redis): хранит message_id обработанных сообщений (TTL 7 дней) и счётчики retry.
  • DLQ: топик documents_dlq для битых файлов.

Поток (at‑least‑once):

  1. API отправляет файл, сохраняет message_id в своём логе.
  2. Consumer получает сообщение. Сначала проверяет Redis: если message_id уже есть – пропускает (идемпотентность). Если нет – обрабатывает.
  3. После успеха коммитит offset и записывает message_id в Redis.
  4. Если ошибка – увеличивает счётчик retry. После 3 попыток – отправляет в DLQ и коммитит.

Пет-проект для закрепления

Задача Разработайте микросервис для индексации новостных статей в локальную векторную базу (FAISS или Chroma) с гарантией at‑least‑once.

Инструменты

Шаги:

  1. Настройте Kafka: топик articles (3 партиции, replication‑factor 1 для локального).
  2. Реализуйте producer: endpoint /ingest принимает {id, text}, генерирует message_id, отправляет в Kafka.
  3. Реализуйте consumer: бесконечный цикл, enable.auto.commit=False.
    • При получении сообщения проверяйте message_id в Redis.
    • Если новый – chankуйте текст, вычисляйте эмбеддинги через sentence‑transformers, сохраняйте в Chroma (upsert).
    • После успеха коммитьте offset и пишите в Redis SET message_id 1 EX 86400.
    • При ошибке: инкрементируйте счётчик retry в Redis. Если > 3 – отправьте в DLQ (другой топик) и коммитьте.
  4. Добавьте логи и мониторинг (простая печать в консоль или Prometheus client).
  5. Протестируйте: отправьте 100 статей, симулируйте падение consumer (убить процесс) – убедитесь, что после перезапуска обработка продолжается без потерь, и что дубликаты не ломают базу.

Ожидаемый результат

  • Рабочий pipeline, в котором ни одна статья не теряется (можно проверить по количеству записей в Chroma).
  • Дубликаты (например, из‑за перезапуска) не увеличивают размер базы (идемпотентный upsert).
  • Битые сообщения (невалидный JSON) попадают в DLQ.

Связь с другими вопросами

ВопросТема
821Как проектировать AI pipeline с exactly-once семантикой?
822Обработка ошибок в AI pipeline (retry, backoff, circuit breaker)
824Как проектировать AI pipeline с at-most-once семантикой?
825Сравнение гарантий доставки (at‑least‑once vs exactly‑once vs at‑most‑once)
830Idempotency в AI‑агентах и повторные вызовы LLM

Навигация