Что такое Kafka compaction для логов LLM взаимодействий?
Краткий тезис
Kafka compaction — это механизм очистки логов в Apache Kafka, который сохраняет только последнее сообщение для каждого ключа, удаляя более старые версии. Для логов LLM взаимодействий это позволяет хранить актуальное состояние диалога или долгоживущего агента, используя ключ вида user_id:session_id. В контексте RAG|Agentic RAG compaction даёт возможность восстанавливать сессию агента после сбоя, экономить дисковое пространство и гарантировать, что агент работает с самой свежей историей.
1. Термин: Apache Kafka и log compaction
Apache Kafka — это распределённая платформа для потоковой передачи событий (event streaming). Kafka хранит данные в топиках (topics), которые разделены на партиции (partitions). Каждое сообщение состоит из ключа, значения, временной метки и смещения (offset). По умолчанию топики используют политику очистки delete — сообщения удаляются через заданное время (retention.ms) или при превышении размера (retention.bytes).
Log compaction (компактизация лога) — альтернативная policy=compact|cleanup.policy=compact|политика очистки]], при которой Kafka гарантирует, что для каждого ключа в партиции остаётся только последнее сообщение. Старые версии с тем же ключом фоновый процесс (cleaner) периодически удаляет. Компактизация не удаляет сообщения моментально; она работает в фоне, оставляя «хвост» (tail) из актуальных записей и «голову» (head) из недавних сообщений, которые ещё не были компактифицированы.
Ключевое отличие от delete: при compact данные теряются не по времени, а по дубликатам ключей. Это идеально для сценариев, где нужно хранить последнее известное состояние сущности, например, текущее состояние агента]].
Зачем Kafka compaction для логов LLM взаимодействий? В Agentic RAG агенты могут вести длинные диалоги, обрабатывать инструменты и запоминать контекст. Логи каждого шага (запрос пользователя, промежуточные рассуждения, вызовы инструментов, ответ) пишутся в Kafka топик. Если хранить все сообщения, топик будет неограниченно расти. С compaction мы храним для каждого user_id:session_id только последнюю полную историю диалога (или последнее состояние стека вызовов), экономя память.
2. Как compaction работает внутри Kafka
Фоновый процесс Log Cleaner сканирует компактифицируемые топики (cleanup.policy=compact). Он проходит по сегментам лога, собирает последние значения для каждого ключа и заменяет старые сегменты новыми, содержащими только эти последние записи.
Важные параметры:
- log.cleaner.enable=true — включает cleaner (по умолчанию true).
- log.cleanup.policy=compact — политика компактизации для топика.
- log.cleaner.min.compaction.lag.ms — минимальное время перед компактизацией сообщения (чтобы дать консюмерам прочитать).
- log.cleaner.io.max.bytes.per.second — скорость компактизации.
min.cleanable.dirty.ratio— коэффициент «грязных» данных, после которого начинается компактизация (по умолчанию 0.5).
При compaction сообщения с ключом, для которого последнее значение — null (tombstone record), полностью удаляются. Это позволяет удалять сессии, когда диалог завершён.
Ограничения: Кафка compaction гарантирует в конечном счёте (eventual consistency), что для каждого ключа останется последнее сообщение. Нельзя полагаться на compaction как на немедленную очистку; могут быть окна, когда несколько версий одного ключа сосуществуют.
3. Модель данных для логов LLM взаимодействий
Рекомендуемая схема:
- Ключ (key) = {user_id}:{session_id} — строка, однозначно идентифицирующая сессию.
- Значение (value) — сериализованное состояние диалога: может быть JSON, Protobuf, Avro.
Варианты значения:
- Полный дамп истории: массив сообщений (роль + контент) + метаданные (текущий инструмент, шаг, thread_id). Компактизация хранит последний полный дамп.
- Лог событий (аппенд-лог) — каждое сообщение добавляет одно событие (user message, tool call, agent response). При compaction останется только последнее событие, что недопустимо для восстановления всей истории. Поэтому лучше использовать
compact+delete(двойная политика) или хранить полную историю в значении.
Рекомендуется value = полная история (list of messages + state). При каждом новом шаге агент перезаписывает ключ с обновлённым значением.
Почему не append-only Если мы пишем каждый шаг отдельным сообщением с одним и тем же ключом, compaction всё равно оставит только последнее — вся промежуточная информация потеряется. Поэтому нужно либо писать полный дамп, либо использовать отдельный топик с delete для лога событий и compact только для хранения последнего состояния.
4. Преимущества compaction для Agentic RAG
| Преимущество | Описание |
|---|---|
| Экономия места | Хранится только последняя версия состояния для каждой сессии. Для миллионов сессий это критично. |
| Быстрое восстановление | При перезапуске агента читаем по ключу последнее состояние и продолжаем диалог. |
| Гарантия актуальности | Консьюмер всегда получает самую свежую версию диалога. |
| Гибкость | Можно комбинировать с delete для временных логов (например, логи шагов). |
Когда compaction не подходит
- Если нужно хранить весь исторический лог событий (аудит). Тогда используйте отдельный топик с
delete(удержание по времени). - Если ключи меняются слишком часто и не являются уникальными идентификаторами сессий (например, ключ — просто тип события).
- Если требуется немедленная очистка (compaction — фоновый процесс с задержкой).
5. Сравнение политик очистки
| Политика | Поведение | Пример использования |
|---|---|---|
delete | Удаление по времени/размеру | Логи запросов, метрики |
compact | Хранит последнее сообщение по ключу | Состояние сессии агента, кэш |
compact,delete | Сначала компактизация, потом удаление старых сегментов (по времени) | Компромисс: и последнее состояние, и финальное удаление |
none | Никогда не удаляет сообщения | Только для отладки |
Для логов LLM взаимодействий оптимально: основной топик состояния с compact, отдельный топик аудита с delete (retention 7-30 дней).
6. Практическая реализация: конфигурация топика
Пример конфигурации топика через Kafka CLI:
kafka-topics.sh --create --topic agent-sessions \
--partitions 10 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.2 \
--config segment.ms=600000
min.cleanable.dirty.ratio=0.2— cleaner запустится, когда 20% лога станут «грязными» (содержат дубликаты ключей).segment.ms=600000— сегмент закрывается каждые 10 минут, ускоряя компактизацию.
В коде на Python с использованием confluent_kafka:
from confluent_kafka import Producer, Consumer, KafkaError
import json
# Producer: запись состояния сессии
producer = Producer({'bootstrap.servers': 'localhost:9092'})
session_key = "user123:session_abc"
state = {
"messages": [{"role": "user", "content": "Привет"}, {"role": "assistant", "content": "Здравствуйте!"}],
"current_tool": None,
"last_updated": "2025-04-07T12:00:00Z"
}
producer.produce(topic="agent-sessions",
key=session_key.encode('utf-8'),
value=json.dumps(state).encode('utf-8'),
callback=lambda err, msg: print(f"Delivered to {msg.partition()}"))
producer.flush()
# Consumer: чтение последнего состояния
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'agent-recovery',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['agent-sessions'])
# Ищем последнее сообщение для конкретного ключа
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
break
else:
print(f"Error: {msg.error()}")
break
if msg.key() == session_key.encode('utf-8'):
state = json.loads(msg.value())
# используем state
break
consumer.close()
Важно: при compaction консьюмер должен читать с момента earliest, чтобы дождаться, пока cleaner обновит лог. Обычно читают последнее сообщение через хвост (tail) с использованием latest и таймаутом, или через KSQL.
7. Мониторинг компактизации
Метрики Kafka (JMX/MBeans):
kafka.log:type=LogCleaner,name=cleaner-time-per-cycle— время цикла.kafka.log:type=LogCleaner,name=max-dirty-percent— максимальный процент «грязных» данных.kafka.log:type=Log,name=LogSegmentCount,topic=agent-sessions— количество сегментов.
Если топик растёт быстрее, чем cleaner успевает чистить, увеличьте log.cleaner.io.max.bytes.per.second или уменьшите min.cleanable.dirty.ratio.
8. Альтернативы Kafka compaction
| Решение | Плюсы | Минусы |
|---|---|---|
| Redis (состояние по ключу) | Низкая задержка, TTL, простота | Ограниченная ёмкость, нет гарантий долговременного хранения, не распределённое логирование |
| PostgreSQL / ScyllaDB | ACID, сложная логика запросов | Больше overhead, не stream-native |
| S3 + метаданные | Дешёвое хранение, бесконечный срок | Высокая задержка записи/чтения, не подходит для real-time |
| Kafka compact | Потоковая обработка, отказоустойчивость, масштабирование | Задержка компактизации, не мгновенное удаление |
Выбор зависит от требований: для high-throughput real-time состояния агента — Kafka compact; для долгого хранения аудита — S3; для быстрого кэша — Redis.
9. Пет-проект для закрепления
Задача Разработать симуляцию агента, который пишет логи диалогов в Kafka топик с компактизацией, восстанавливает последнее состояние после перезапуска.
Инструменты
- Docker Compose (Kafka + Zookeeper)
- Python (confluent_kafka)
- Jupyter Notebook для демонстрации
Шаги:
- Поднимите Kafka в Docker.
- Создайте топик
agent-stateсcleanup.policy=compact. - Напишите продюсер: при каждом шаге агента (user input -> tool call -> response) генерируйте новое состояние и отправляйте сообщение с ключом
user_id:session_id. - Напишите консьюмер, который при запуске читает последнее состояние для сессии (можно через
consumer.offsets_for_times()или просто поллингом). - Имитируйте сбой: удалите топик? лучше перезапустите консьюмер и проверьте, что он восстанавливает последнее состояние.
- Проверьте, что компактизация действительно удаляет старые сообщения (через
kafka-dump-logили JMX).
Ожидаемый результат После записи 100 сообщений с одним и тем же ключом, на диске останется только одно (последнее). При перезапуске консьюмер получает актуальную историю диалога.
10. Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 415 | Что такое Agentic RAG и его основные компоненты? |
| 416 | Как организовать память агента в Agentic RAG? |
| 417 | Как управлять состоянием долгоживущих агентов? |
| 418 | Что такое event-driven архитектура для агентов? |
| 420 | Как инструменты (tools) вызываются в Agentic RAG? |
| 423 | Как логировать и дебажить работу агента? |
Компактизация напрямую связана с управлением состоянием (вопрос 417) и памятью (416). Она позволяет логировать взаимодействия (423) без переполнения хранилища.
Навигация
- Предыдущий: 418
- Следующий: 420
- Индекс: 00. Индекс разборов