Что такое Kafka compaction для логов LLM взаимодействий?

Краткий тезис

Kafka compaction — это механизм очистки логов в Apache Kafka, который сохраняет только последнее сообщение для каждого ключа, удаляя более старые версии. Для логов LLM взаимодействий это позволяет хранить актуальное состояние диалога или долгоживущего агента, используя ключ вида user_id:session_id. В контексте RAG|Agentic RAG compaction даёт возможность восстанавливать сессию агента после сбоя, экономить дисковое пространство и гарантировать, что агент работает с самой свежей историей.


1. Термин: Apache Kafka и log compaction

Apache Kafka — это распределённая платформа для потоковой передачи событий (event streaming). Kafka хранит данные в топиках (topics), которые разделены на партиции (partitions). Каждое сообщение состоит из ключа, значения, временной метки и смещения (offset). По умолчанию топики используют политику очистки delete — сообщения удаляются через заданное время (retention.ms) или при превышении размера (retention.bytes).

Log compaction (компактизация лога) — альтернативная policy=compact|cleanup.policy=compact|политика очистки]], при которой Kafka гарантирует, что для каждого ключа в партиции остаётся только последнее сообщение. Старые версии с тем же ключом фоновый процесс (cleaner) периодически удаляет. Компактизация не удаляет сообщения моментально; она работает в фоне, оставляя «хвост» (tail) из актуальных записей и «голову» (head) из недавних сообщений, которые ещё не были компактифицированы.

Ключевое отличие от delete: при compact данные теряются не по времени, а по дубликатам ключей. Это идеально для сценариев, где нужно хранить последнее известное состояние сущности, например, текущее состояние агента]].

Зачем Kafka compaction для логов LLM взаимодействий? В Agentic RAG агенты могут вести длинные диалоги, обрабатывать инструменты и запоминать контекст. Логи каждого шага (запрос пользователя, промежуточные рассуждения, вызовы инструментов, ответ) пишутся в Kafka топик. Если хранить все сообщения, топик будет неограниченно расти. С compaction мы храним для каждого user_id:session_id только последнюю полную историю диалога (или последнее состояние стека вызовов), экономя память.


2. Как compaction работает внутри Kafka

Фоновый процесс Log Cleaner сканирует компактифицируемые топики (cleanup.policy=compact). Он проходит по сегментам лога, собирает последние значения для каждого ключа и заменяет старые сегменты новыми, содержащими только эти последние записи.

Важные параметры:

  • log.cleaner.enable=true — включает cleaner (по умолчанию true).
  • log.cleanup.policy=compact — политика компактизации для топика.
  • log.cleaner.min.compaction.lag.ms — минимальное время перед компактизацией сообщения (чтобы дать консюмерам прочитать).
  • log.cleaner.io.max.bytes.per.second — скорость компактизации.
  • min.cleanable.dirty.ratio — коэффициент «грязных» данных, после которого начинается компактизация (по умолчанию 0.5).

При compaction сообщения с ключом, для которого последнее значение — null (tombstone record), полностью удаляются. Это позволяет удалять сессии, когда диалог завершён.

Ограничения: Кафка compaction гарантирует в конечном счёте (eventual consistency), что для каждого ключа останется последнее сообщение. Нельзя полагаться на compaction как на немедленную очистку; могут быть окна, когда несколько версий одного ключа сосуществуют.


3. Модель данных для логов LLM взаимодействий

Рекомендуемая схема:

  • Ключ (key) = {user_id}:{session_id} — строка, однозначно идентифицирующая сессию.
  • Значение (value) — сериализованное состояние диалога: может быть JSON, Protobuf, Avro.

Варианты значения:

  1. Полный дамп истории: массив сообщений (роль + контент) + метаданные (текущий инструмент, шаг, thread_id). Компактизация хранит последний полный дамп.
  2. Лог событий (аппенд-лог) — каждое сообщение добавляет одно событие (user message, tool call, agent response). При compaction останется только последнее событие, что недопустимо для восстановления всей истории. Поэтому лучше использовать compact+delete (двойная политика) или хранить полную историю в значении.

Рекомендуется value = полная история (list of messages + state). При каждом новом шаге агент перезаписывает ключ с обновлённым значением.

Почему не append-only Если мы пишем каждый шаг отдельным сообщением с одним и тем же ключом, compaction всё равно оставит только последнее — вся промежуточная информация потеряется. Поэтому нужно либо писать полный дамп, либо использовать отдельный топик с delete для лога событий и compact только для хранения последнего состояния.


4. Преимущества compaction для Agentic RAG

ПреимуществоОписание
Экономия местаХранится только последняя версия состояния для каждой сессии. Для миллионов сессий это критично.
Быстрое восстановлениеПри перезапуске агента читаем по ключу последнее состояние и продолжаем диалог.
Гарантия актуальностиКонсьюмер всегда получает самую свежую версию диалога.
ГибкостьМожно комбинировать с delete для временных логов (например, логи шагов).

Когда compaction не подходит

  • Если нужно хранить весь исторический лог событий (аудит). Тогда используйте отдельный топик с delete (удержание по времени).
  • Если ключи меняются слишком часто и не являются уникальными идентификаторами сессий (например, ключ — просто тип события).
  • Если требуется немедленная очистка (compaction — фоновый процесс с задержкой).

5. Сравнение политик очистки

ПолитикаПоведениеПример использования
deleteУдаление по времени/размеруЛоги запросов, метрики
compactХранит последнее сообщение по ключуСостояние сессии агента, кэш
compact,deleteСначала компактизация, потом удаление старых сегментов (по времени)Компромисс: и последнее состояние, и финальное удаление
noneНикогда не удаляет сообщенияТолько для отладки

Для логов LLM взаимодействий оптимально: основной топик состояния с compact, отдельный топик аудита с delete (retention 7-30 дней).


6. Практическая реализация: конфигурация топика

Пример конфигурации топика через Kafka CLI:

kafka-topics.sh --create --topic agent-sessions \
  --partitions 10 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.2 \
  --config segment.ms=600000
  • min.cleanable.dirty.ratio=0.2 — cleaner запустится, когда 20% лога станут «грязными» (содержат дубликаты ключей).
  • segment.ms=600000 — сегмент закрывается каждые 10 минут, ускоряя компактизацию.

В коде на Python с использованием confluent_kafka:

from confluent_kafka import Producer, Consumer, KafkaError
import json

# Producer: запись состояния сессии
producer = Producer({'bootstrap.servers': 'localhost:9092'})

session_key = "user123:session_abc"
state = {
    "messages": [{"role": "user", "content": "Привет"}, {"role": "assistant", "content": "Здравствуйте!"}],
    "current_tool": None,
    "last_updated": "2025-04-07T12:00:00Z"
}

producer.produce(topic="agent-sessions", 
                 key=session_key.encode('utf-8'),
                 value=json.dumps(state).encode('utf-8'),
                 callback=lambda err, msg: print(f"Delivered to {msg.partition()}"))
producer.flush()

# Consumer: чтение последнего состояния
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'agent-recovery',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})
consumer.subscribe(['agent-sessions'])

# Ищем последнее сообщение для конкретного ключа
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            break
        else:
            print(f"Error: {msg.error()}")
            break
    if msg.key() == session_key.encode('utf-8'):
        state = json.loads(msg.value())
        # используем state
        break
consumer.close()

Важно: при compaction консьюмер должен читать с момента earliest, чтобы дождаться, пока cleaner обновит лог. Обычно читают последнее сообщение через хвост (tail) с использованием latest и таймаутом, или через KSQL.


7. Мониторинг компактизации

Метрики Kafka (JMX/MBeans):

  • kafka.log:type=LogCleaner,name=cleaner-time-per-cycle — время цикла.
  • kafka.log:type=LogCleaner,name=max-dirty-percent — максимальный процент «грязных» данных.
  • kafka.log:type=Log,name=LogSegmentCount,topic=agent-sessions — количество сегментов.

Если топик растёт быстрее, чем cleaner успевает чистить, увеличьте log.cleaner.io.max.bytes.per.second или уменьшите min.cleanable.dirty.ratio.


8. Альтернативы Kafka compaction

РешениеПлюсыМинусы
Redis (состояние по ключу)Низкая задержка, TTL, простотаОграниченная ёмкость, нет гарантий долговременного хранения, не распределённое логирование
PostgreSQL / ScyllaDBACID, сложная логика запросовБольше overhead, не stream-native
S3 + метаданныеДешёвое хранение, бесконечный срокВысокая задержка записи/чтения, не подходит для real-time
Kafka compactПотоковая обработка, отказоустойчивость, масштабированиеЗадержка компактизации, не мгновенное удаление

Выбор зависит от требований: для high-throughput real-time состояния агента — Kafka compact; для долгого хранения аудита — S3; для быстрого кэша — Redis.


9. Пет-проект для закрепления

Задача Разработать симуляцию агента, который пишет логи диалогов в Kafka топик с компактизацией, восстанавливает последнее состояние после перезапуска.

Инструменты

Шаги:

  1. Поднимите Kafka в Docker.
  2. Создайте топик agent-state с cleanup.policy=compact.
  3. Напишите продюсер: при каждом шаге агента (user input -> tool call -> response) генерируйте новое состояние и отправляйте сообщение с ключом user_id:session_id.
  4. Напишите консьюмер, который при запуске читает последнее состояние для сессии (можно через consumer.offsets_for_times() или просто поллингом).
  5. Имитируйте сбой: удалите топик? лучше перезапустите консьюмер и проверьте, что он восстанавливает последнее состояние.
  6. Проверьте, что компактизация действительно удаляет старые сообщения (через kafka-dump-log или JMX).

Ожидаемый результат После записи 100 сообщений с одним и тем же ключом, на диске останется только одно (последнее). При перезапуске консьюмер получает актуальную историю диалога.


10. Связь с другими вопросами

ВопросТема
415Что такое Agentic RAG и его основные компоненты?
416Как организовать память агента в Agentic RAG?
417Как управлять состоянием долгоживущих агентов?
418Что такое event-driven архитектура для агентов?
420Как инструменты (tools) вызываются в Agentic RAG?
423Как логировать и дебажить работу агента?

Компактизация напрямую связана с управлением состоянием (вопрос 417) и памятью (416). Она позволяет логировать взаимодействия (423) без переполнения хранилища.


Навигация