Спроектировать structured logging для LLM
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Спроектировать structured logging для LLM
1. Цель задачи
Научиться проектировать и внедрять structured logging (структурированное логирование) для сервиса, использующего большие языковые модели (LLM). Создать систему генерации JSON-логов с обязательными полями (trace_id, user_id, model, latency) и организовать их хранение/поиск таким образом, чтобы любой запрос по произвольному полю выполнялся за секунды.
Ключевой результат Рабочий пайплайн structured logging, который позволяет в реальном времени фильтровать логи по любому из полей и получать результат за <2 секунд при объёме до 1 млн записей.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Python-сервис с LLM (например, инференс через OpenAI API или локальную модель) | Собственный pet-проект или заглушка (см. симуляцию) |
| Инфраструктура для сбора логов (Elasticsearch / Loki + S3 / ClickHouse / и т.п.) | Локальный Docker-compose или облачный сервис (бесплатный tier) |
| Генератор нагрузки (fake LLM-запросов) | locust или k6, либо простой скрипт на asyncio |
| Инструмент анализа логов | kibana / grafana / python + pandas для проверки |
Если нет реального инструмента — симулируем:
- Сервис-заглушка Напишите простой FastAPI-сервер с эндпоинтом
/chat, который имитирует LLM (задержка 0.5–5 с random, возвращает фиктивный ответ). - Хранилище логов Поднимите Elasticsearch 8.x через
docker-compose(одна нода) — это покрывает 99% потребностей для учебной задачи. - Нагрузка Скрипт на Python, который отправляет 100–500 запросов с разными
user_idиmodelк вашему сервису.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| LLM-сервис | FastAPI / Flask + OpenAI / локальная модель (заглушка) | Генерация логов |
| Structured logging | Python structlog + python-json-logger | Формирование JSON-логов |
| Хранилище | Elasticsearch 8.x (или OpenSearch) | Индексация и поиск по полям |
| Визуализация | Kibana (встроена в ES) | Дашборды и ad-hoc поиск |
| Нагрузочное тестирование | Locust / k6 / скрипт concurrent.futures | Симуляция трафика |
| Мониторинг скорости поиска | Kibana Dev Tools / Elasticsearch _count, _search API + тайминг | Измерение latency поиска |
4. Этапы выполнения
Этап 1: Интеграция structured logging в LLM-сервис (оценка времени: 1.5 часа)
Действия
- Установите зависимости
# requirements.txt fastapi uvicorn structlog python-json-logger elasticsearch (или opensearch-py) - Настройте
structlogв файлеlogging_config.py: - Добавьте middleware в FastAPI, которая:
- В начале запроса генерирует
trace_id(uuid4), читаетuser_idиз заголовка (или токена),modelиз тела запроса. - Передаёт эти поля в контекст через
structlog.threadlocal.bind_context(). - Замеряет время выполнения LLM-вызова и записывает
latency. - При завершении запроса вызывает
structlog.get_logger()и логирует событие"llm.request"с полями:trace_id,user_id,model,latency,status_code,prompt_length. - Очищает контекст через
structlog.threadlocal.clear_context().
- В начале запроса генерирует
- Проверьте Запустите сервис, отправьте тестовый запрос. Лог должен появиться в stdout как JSON-строка:
{"event": "llm.request", "timestamp": "2025-03-15T10:00:00Z", "logger": "app", "level": "info", "trace_id": "abc-123", "user_id": "u42", "model": "gpt-4", "latency": 2.34, "status_code": 200, "prompt_length": 150}
Ожидаемый результат этапа FastAPI-сервис выводит в stdout корректные JSON-логи с заданными полями.
Этап 2: Настройка Elasticsearch и передача логов (оценка времени: 1 час)
Действия
- Поднимите Elasticsearch 8.x через docker-compose:
version: '3' services: es: image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0 environment: - discovery.type=single-node - xpack.security.enabled=false - ES_JAVA_OPTS=-Xms512m -Xmx512m ports: - "9200:9200" volumes: - es_data:/usr/share/elasticsearch/data volumes: es_data: - Напишите асинхронный отправитель логов (log handler), который буферизирует логи и отправляет их в Elasticsearch через bulk API.
- Используйте
elasticsearch_asyncилиaiohttp. - Настройте размер батча: 100 записей или интервал 5 секунд.
- Индекс:
llm-logs-2025.03.15(с суффиксом даты). - Маппинг должен быть динамическим, но для полей
latencyукажите"type": "float", дляtrace_id—"keyword", дляtimestamp—"date".
- Используйте
- Интегрируйте log handler в
structlog: переопределитеlogger_factoryили используйте стандартныйloggingс кастомным handler. - Запустите генератор нагрузки (100 запросов) и убедитесь, что данные появились в ES:
curl -X GET "localhost:9200/llm-logs-*/_count"
Ожидаемый результат этапа Все логи успешно индексируются в Elasticsearch, можно выполнить поиск по любому полю через Dev Tools.
Этап 3: Оптимизация поиска и измерение latency (оценка времени: 1.5 часа)
Действия
- Создайте индекс с настроенным анализом (через Kibana Dev Tools):
- Поле
prompt_lengthсделайте"type": "integer". - Поле
user_id—"keyword". - Добавьте
index.sort.fieldдля сортировки по@timestamp(при загрузке данных). - Включите
"index.mapping.total_fields.limit": 200(на будущее).
- Поле
- Напишите скрипт проверки latency поиска для каждого из полей (
trace_id,user_id,model,latency) выполните_searchс точным совпадением (term query) и замерьтеtook(в миллисекундах) из ответа Elasticsearch.import time, json from elasticsearch import Elasticsearch es = Elasticsearch("http://localhost:9200") queries = [ {"term": {"user_id": "u42"}}, {"term": {"model": "gpt-4"}}, {"range": {"latency": {"gte": 2.0}}}, {"term": {"trace_id": "abc-123"}} ] for q in queries: start = time.time() res = es.search(index="llm-logs-*", query=q, size=1) elapsed = (time.time() - start) * 1000 print(f"{q} -> {elapsed:.1f}ms, total hits: {res['hits']['total']['value']}") - Добейтесь, чтобы все запросы выполнялись < 200 мс (на 100k записях). Если медленно — добавьте явные mapping для полей, увеличьте количество шардов (для теста 1-2), отключите
_source(если не нужно). - Увеличьте нагрузку до 500k–1M записей (запустите скрипт повторно с большим числом запросов) и повторно выполните замеры. Убедитесь, что поиск по любому полю занимает < 2 секунд.
Ожидаемый результат этапа Документация (в комментариях кода или README) с результатами замеров latency до и после оптимизации.
Этап 4: Настройка визуализации и алертинга (оценка времени: 1 час)
Действия
- Создайте дашборд в Kibana
- Настройте простой алерт например, если средняя latency за последние 5 минут > 4 секунд, отправить уведомление в Telegram/Slack (можно через webhook). Используйте Kibana Alerting (требуется лицензия basic) или ELK Stack с ElastAlert.
- Проверьте откройте дашборд, выполните ручной поиск
user_id: u42— результат должен появиться мгновенно.
Ожидаемый результат этапа Рабочий дашборд Kibana с фильтрацией по полям и алерт (хотя бы базовый логгер предупреждений).
Этап 5: Документирование и воспроизводимость (оценка времени: 0.5 часа)
Действия
- Напишите README проекта, включающий:
- Как развернуть:
docker-compose up -d+pip install -r requirements.txt. - Примеры structured логов.
- Команды для нагрузочного теста (например,
python load_test.py --count 1000 --concurrency 10). - Скриншоты дашборда.
- Как развернуть:
- Упакуйте код в репозиторий (Git), убедитесь, что
docker-compose.yml,requirements.txt, конфиги структурированы. - Создайте Makefile или
justfileдля типовых команд:make up,make test,make logs.
Ожидаемый результат этапа README с инструкциями, репозиторий готов к передаче.
5. Критерии приемки (Definition of Done)
- Сервис логирует каждый LLM-запрос в формате JSON c полями
trace_id,user_id,model,latency,timestamp,status_code. - Логи асинхронно отправляются в Elasticsearch (или аналог) без блокировки основного потока.
- Поиск по любому из полей через Elasticsearch API (term/range) возвращает результат за < 2 секунд при 1M записей.
- Дашборд Kibana позволяет фильтровать по
user_id,model, диапазонуlatency— обновление данных не дольше 5 секунд после записи. - Настроен алерт на аномально высокую latency (>95 перцентиль).
- Репозиторий содержит полную документацию по развёртыванию в 1 команду (
docker-compose up). - Код покрыт хотя бы юнит-тестами на формат лога (проверка JSON schema).
- Нагрузочное тестирование (1000 запросов) не приводит к потере логов (count в ES равен числу отправленных).
6. Ожидаемый результат
Основной артефакт
- Папка проекта с файлами:
app/main.py— FastAPI-сервисapp/logging_setup.py— настройка structlog + Elasticsearch handlerdocker-compose.yml— ES + Kibanaload_test.py— скрипт генерации нагрузкиtest_logging.py— юнит-тестыREADME.md— документация
Дополнительно
- Дашборд Kibana экспортированный в NDJSON (для импорта).
- Конфигурация алерта (например,
elastalert_rule.yaml). - Отчёт о замерах latency поиска (таблица).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Elasticsearch не принимает логи из-за несовместимости маппинга | Удалить старый индекс (DELETE llm-logs-*) и пересоздать с правильным mapping (через Kibana Dev Tools). |
structlog конфликтует с logging из FastAPI | Использовать structlog как единственный логгер, через configure(processors=[...], wrapper_class=BoundLogger) и передать в uvicorn настройку log_config=None или кастомный dict. |
Поиск по latency медленный из-за текстового поля | Убедиться, что в mapping latency указан как float (не text). Для диапазонов использовать range запрос. |
| Потеря логов при высокой нагрузке | Увеличить размер буфера (до 500 записей) и использовать пакетную отправку с интервалом 1 секунда. Включить retry с exponential backoff. |
| Медленный поиск при больших объёмах (>10M) | Создать индекс с сортировкой по @timestamp (при старте). Добавить index.routing.allocation.total_shards_per_node и увеличить число шардов до 3–5. Рассмотреть использование time-based индексов. |
| Нет возможности использовать Elasticsearch | Заменить на ClickHouse (движок MergeTree с ORDER BY (timestamp, user_id)) — быстрый поиск по ключевым полям. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Интеграция structured logging в LLM-сервис | 1.5 часа |
| Этап 2: Настройка Elasticsearch и передача логов | 1 час |
| Этап 3: Оптимизация поиска и измерение latency | 1.5 часа |
| Этап 4: Настройка визуализации и алертинга | 1 час |
| Этап 5: Документирование и воспроизводимость | 0.5 часа |
| Итого | 5.5 часов |
Примечание При первом выполнении задачи возможны дополнительные 1–2 часа на изучение structlog и Elasticsearch. Если используется заглушка (симуляция) — время уменьшается на 0.5–1 час.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 3 | Что такое structured logging и чем он отличается от unstructured? |
| 7 | Как передать контекст (trace_id) между микросервисами? |
| 15 | Какие поля обязательны в логах для observability LLM-систем? |
| 22 | Как организовать асинхронную отправку логов в Elasticsearch? |
| 28 | Как измерять latency LLM-вызова (включая время сети)? |
| 31 | Какие индексы и маппинги оптимальны для поиска по keyword-полям? |
| 42 | Как настроить дашборд в Kibana для мониторинга LLM сервиса? |
| 55 | Как реализовать алерт на основе перцентилей latency? |
| 89 | Как проводить нагрузочное тестирование LLM-endpoint? |
| 101 | Какие метрики качества retrieval могут быть продублированы в логах? |
10. Чек-лист самопроверки
- Я проверил, что логи на этапе 1 выводятся в stdout в формате JSON с нужными полями.
- Я убедился, что после отправки 100 запросов все логи попали в ES (count совпадает).
- Я выполнил замеры поиска по каждому из полей и получил значения < 200 мс (до оптимизации) и < 2 с (после оптимизации на 1M записей).
- Я создал дашборд в Kibana и проверил, что фильтры по
user_id,model,latencyработают мгновенно. - Я написал хотя бы один unit-тест, который проверяет структуру лога (наличие trace_id, корректный тип latency).
- Я задокументировал процесс запуска в README так, чтобы его мог воспроизвести другой инженер.