English translation is not available yet. Showing Russian content.

Что такое Kafka compaction для логов LLM взаимодействий?

Краткий тезис

Kafka log compaction — это механизм, при котором в топике Kafka для каждого ключа хранится только последнее сообщение. Применительно к логам LLM-взаимодействий это позволяет эффективно хранить полную историю диалога в компактном виде: ключом выступает идентификатор сессии (например, user_id:session_id), а значением — последнее состояние диалога. Таким образом, система может в любой момент восстановить актуальный контекст без хранения всех промежуточных сообщений.


1. Термин: Kafka topic и log compaction

Kafka — распределённая платформа для потоковой передачи событий. Основной единицей хранения является топик — упорядоченный, неизменяемый журнал сообщений. Каждое сообщение имеет ключ и значение.

Log compaction — один из политик очистки топика (наряду с delete). В компактифицированном топике Kafka периодически удаляет старые записи с одинаковым ключом, оставляя только самое последнее сообщение для каждого ключа. При этом все сообщения с уникальными ключами сохраняются целиком.

Ключевое отличие от обычного delete-ретента: в delete удаляются сообщения по времени или размеру, в compaction — по ключу, гарантируя, что для каждого ключа доступна только последняя версия.


2. Проблема: бесконечный рост логов LLM-взаимодействий

При разработке AI-агентов или чат-ботов на основе LLM необходимо логировать каждое взаимодействие:

В production такие логи могут расти экспоненциально. За сутки миллион диалогов порождает миллиарды записей. Хранить всё в топе Kafka с политикой delete неэффективно:

  • данные быстро устаревают (теряется история, если ретеншн мал);
  • если ретеншн велик, потребляется много хранилища;
  • для восстановления состояния конкретного диалога приходится сканировать весь топик.

3. Принцип работы compaction: удержание последнего значения по ключу

Топик с policy=compact|cleanup.policy=compact]] работает так:

  1. Каждое сообщение записывается с ключом.
  2. Kafka периодически запускает compaction — фоновый процесс, который сканирует сегменты топика.
  3. Для каждого ключа остаётся только последнее сообщение (по смещению).
  4. Старые дубликаты удаляются, освобождая место.

Таким образом, чтение топика с нуля даст только последние состояния для каждого ключа.


4. Выбор ключа: user_id:session_id или conversation_id

Для логов LLM-взаимодействий ключ должен однозначно идентифицировать диалог. Оптимальные варианты:

  • user_id:session_id — пользователь + номер сессии;
  • conversation_id — уникальный UUID диалога;
  • agent_id:session_id — для multi-agent систем.

Пример ключа: alice:08a7f12c. Значение — полный объект диалога (массив сообщений, метаданные). При каждом новом запросе-ответе продюсер отправляет сообщение с тем же ключом и обновлённым значением.


5. Конфигурация топика: cleanup.policy=compact

Для использования compaction в Kafka необходимо настроить топик:

bin/kafka-topics.sh --create \
  --topic llm-dialog-logs \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 2 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=60000 \
  --config delete.retention.ms=86400000 \
  --config segment.ms=3600000

Параметры:

  • policy=compact|cleanup.policy=compact]] — включить compaction.
  • min.compaction.lag.ms=60000 — минимальная задержка перед компактификацией (чтобы дать время консюмерам обработать).
  • delete.retention.ms=86400000 — время хранения «удалённых» (устаревших) записей до физической очистки.
  • segment.ms=3600000 — размер сегмента (частота запуска compaction).

6. Сравнение с обычным delete retention

ХарактеристикаПолитика deleteПолитика compact
Что удаляетсяСообщения по времени/объёмуСтарые версии одного ключа
Гарантия для ключаНет последней версииВсегда есть последняя версия
Объём хранилищаФиксированный ретеншнЗависит от количества ключей
Подходит дляЛоги без привязки к ключуХранение последнего состояния
Пример в LLMСырые логи всех запросовСостояние каждого диалога

7. Преимущества compacted топика для логов LLM

  • Восстановление состояния диалога после перезапуска: консюмер может прочитать последнее сообщение для сессии и восстановить контекст.
  • Экономия места: вместо миллиона сообщений на один диалог хранится только последнее.
  • Простота мониторинга: можно быстро вычислить активные сессии, просмотрев ключи в топике.
  • Низкая задержка записи: продюсер не ждёт compaction, пишет сразу.
  • At-least-once гарантии: если продюсер дублирует, compact останавливает только дубли, но не теряет данные.

8. Недостатки и подводные камни

  • Потеря промежуточных шагов: если нужно отлаживать ошибку в середине диалога, промежуточных логов не будет (если не дублировать их отдельно).
  • Сложность управления ключами: если ключ не уникален (например, только user_id), то смешаются все сессии одного пользователя.
  • Нагрузка при compaction: процесс потребляет CPU и I/O, особенно при большом количестве ключей.
  • Задержка до применения compaction: по умолчанию compaction может происходить раз в несколько минут, поэтому последние изменения могут быть не сразу видны при чтении с earliest.

9. Альтернативы хранению состояния диалога

  • PostgreSQL с UPSERT: запись через INSERT ... ON CONFLICT (session_id) DO UPDATE. Достоинства: строгая консистентность, ACID. Недостатки: больше задержка записи, ограничение по пропускной способности.
  • Redis с TTL: сессии живут, например, 24 часа. Быстро, но нет гарантии долговременного хранения.
  • S3 с паркетами/avro: пакетная запись раз в минуту. Хорошо для аналитики, плохо для реального времени.
  • Таблица в ClickHouse с ReplacingMergeTree: может эффективно хранить последние записи по ключу.

10. Интеграция с LLM-системами: практический пример

Допустим, есть AI-агент, обрабатывающий запросы пользователей. Схема логгирования с compacted топиком:

  1. Продюсер (на Python с confluent-kafka):
from confluent_kafka import Producer
import json, uuid

p = Producer({'bootstrap.servers': 'localhost:9092'})

def log_dialog(session_id, user_msg, assistant_msg):
    key = f"session:{session_id}"
    value = {
        "session_id": session_id,
        "history": [],   # можно хранить всю историю
        "last_user_message": user_msg,
        "last_assistant_message": assistant_msg,
        "timestamp": int(time.time())
    }
    p.produce('llm-dialog-logs', key=key, value=json.dumps(value))
    p.flush()
  1. Консюмер для получения последнего состояния сессии:
from confluent_kafka import Consumer, KafkaError, TopicPartition

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'dialog-restore',
    'auto.offset.reset': 'earliest'
})
c.assign([TopicPartition('llm-dialog-logs', 0, 0)])
# Читаем последнее сообщение для конкретного ключа (можно через Kafka Streams, но упрощённо)

После перезапуска агент читает компактифицированный топик и восстанавливает все активные сессии.


11. Потенциальные подводные камни и best practices

  • Устанавливайте минимальный размер сегмента и задержку compaction (segment.bytes, min.compaction.lag.ms), чтобы избежать частого пересоздания сегментов.
  • Используйте compact,delete для гибридной политики: старые ключи удаляются по времени, а для активных остаётся последняя версия.
  • Мониторьте kafka-log-cleaner-manager: падение compaction может быстро забить диск.
  • Не используйте compaction для очень часто обновляемых ключей (тысячи раз в секунду) — это может создать большую нагрузку на cleaner.

Пет-проект для закрепления

Задача: Разработать систему логгирования диалогов многосессионного AI-агента, которая позволяет восстановить последний контекст после перезапуска.

Инструменты: Kafka (локально или через Confluent Cloud), Python 3.10+, confluent-kafka, библиотека openai для имитации LLM.

Шаги:

  1. Установите Kafka и создайте топик с cleanup.policy=compact (4 раздела).
  2. Напишите продюсера, который при каждом запросе к LLM отправляет сообщение с ключом session:{uuid} и актуальным JSON-объектом диалога.
  3. Напишите консюмера, который читает топик с auto.offset.reset=earliest и собирает последнее сообщение для каждой сессии (можно через словарь в памяти).
  4. Эмулируйте сценарий: несколько параллельных диалогов (10 сессий, по 50 сообщений каждая) — продюсер шлёт обновлённое состояние.
  5. После остановки продюсера запустите консюмер и проверьте, что для каждой сессии получено только последнее состояние.
  6. Сравните размер занятого места в топике с гипотетическим топиком delete (оцените через kafka-log-dirs).

Ожидаемый результат: Система, которая при перезапуске консюмера (или самого агента) позволяет восстановить последнее известное состояние каждого диалога без дополнительного хранилища, экономя в 10-100 раз место по сравнению с полным логом.


Связь с другими вопросами

ВопросТема
250Стриминг ответов LLM через Kafka
251Роль Kafka в архитектуре Agentic RAG
253Управление состоянием (state) AI-агента
254Логирование и мониторинг LLM-запросов
260Инструменты мониторинга RAG-системы
270Балансировка нагрузки и обработка backpressure

Навигация