Спроектировать structured logging для LLM

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Спроектировать structured logging для LLM

1. Цель задачи

Научиться проектировать и внедрять structured logging (структурированное логирование) для сервиса, использующего большие языковые модели (LLM). Создать систему генерации JSON-логов с обязательными полями (trace_id, user_id, model, latency) и организовать их хранение/поиск таким образом, чтобы любой запрос по произвольному полю выполнялся за секунды.

Ключевой результат Рабочий пайплайн structured logging, который позволяет в реальном времени фильтровать логи по любому из полей и получать результат за <2 секунд при объёме до 1 млн записей.

2. Исходные данные

Что нужноОткуда взять
Python-сервис с LLM (например, инференс через OpenAI API или локальную модель)Собственный pet-проект или заглушка (см. симуляцию)
Инфраструктура для сбора логов (Elasticsearch / Loki + S3 / ClickHouse / и т.п.)Локальный Docker-compose или облачный сервис (бесплатный tier)
Генератор нагрузки (fake LLM-запросов)locust или k6, либо простой скрипт на asyncio
Инструмент анализа логовkibana / grafana / python + pandas для проверки

Если нет реального инструмента — симулируем:

  1. Сервис-заглушка Напишите простой FastAPI-сервер с эндпоинтом /chat, который имитирует LLM (задержка 0.5–5 с random, возвращает фиктивный ответ).
  2. Хранилище логов Поднимите Elasticsearch 8.x через docker-compose (одна нода) — это покрывает 99% потребностей для учебной задачи.
  3. Нагрузка Скрипт на Python, который отправляет 100–500 запросов с разными user_id и model к вашему сервису.

3. Технологический стек

КомпонентИнструментыНазначение
LLM-сервисFastAPI / Flask + OpenAI / локальная модель (заглушка)Генерация логов
Structured loggingPython structlog + python-json-loggerФормирование JSON-логов
ХранилищеElasticsearch 8.x (или OpenSearch)Индексация и поиск по полям
ВизуализацияKibana (встроена в ES)Дашборды и ad-hoc поиск
Нагрузочное тестированиеLocust / k6 / скрипт concurrent.futuresСимуляция трафика
Мониторинг скорости поискаKibana Dev Tools / Elasticsearch _count, _search API + таймингИзмерение latency поиска

4. Этапы выполнения

Этап 1: Интеграция structured logging в LLM-сервис (оценка времени: 1.5 часа)

Действия

  1. Установите зависимости
    # requirements.txt
    fastapi
    uvicorn
    structlog
    python-json-logger
    elasticsearch (или opensearch-py)
    
  2. Настройте structlog в файле logging_config.py:
    • Определите процессоры: добавить timestamp, сгладить исключения, сериализовать в JSON.
    • Установите формат вывода: json_logs (через structlog.stdlib.JSONRenderer).
  3. Добавьте middleware в FastAPI, которая:
    • В начале запроса генерирует trace_id (uuid4), читает user_id из заголовка (или токена), model из тела запроса.
    • Передаёт эти поля в контекст через structlog.threadlocal.bind_context().
    • Замеряет время выполнения LLM-вызова и записывает latency.
    • При завершении запроса вызывает structlog.get_logger() и логирует событие "llm.request" с полями: trace_id, user_id, model, latency, status_code, prompt_length.
    • Очищает контекст через structlog.threadlocal.clear_context().
  4. Проверьте Запустите сервис, отправьте тестовый запрос. Лог должен появиться в stdout как JSON-строка:
    {"event": "llm.request", "timestamp": "2025-03-15T10:00:00Z", "logger": "app", "level": "info", "trace_id": "abc-123", "user_id": "u42", "model": "gpt-4", "latency": 2.34, "status_code": 200, "prompt_length": 150}
    

Ожидаемый результат этапа FastAPI-сервис выводит в stdout корректные JSON-логи с заданными полями.

Этап 2: Настройка Elasticsearch и передача логов (оценка времени: 1 час)

Действия

  1. Поднимите Elasticsearch 8.x через docker-compose:
    version: '3'
    services:
      es:
        image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0
        environment:
          - discovery.type=single-node
          - xpack.security.enabled=false
          - ES_JAVA_OPTS=-Xms512m -Xmx512m
        ports:
          - "9200:9200"
        volumes:
          - es_data:/usr/share/elasticsearch/data
    volumes:
      es_data:
    
  2. Напишите асинхронный отправитель логов (log handler), который буферизирует логи и отправляет их в Elasticsearch через bulk API.
    • Используйте elasticsearch_async или aiohttp.
    • Настройте размер батча: 100 записей или интервал 5 секунд.
    • Индекс: llm-logs-2025.03.15 (с суффиксом даты).
    • Маппинг должен быть динамическим, но для полей latency укажите "type": "float", для trace_id"keyword", для timestamp"date".
  3. Интегрируйте log handler в structlog: переопределите logger_factory или используйте стандартный logging с кастомным handler.
  4. Запустите генератор нагрузки (100 запросов) и убедитесь, что данные появились в ES:
    curl -X GET "localhost:9200/llm-logs-*/_count"
    

Ожидаемый результат этапа Все логи успешно индексируются в Elasticsearch, можно выполнить поиск по любому полю через Dev Tools.

Этап 3: Оптимизация поиска и измерение latency (оценка времени: 1.5 часа)

Действия

  1. Создайте индекс с настроенным анализом (через Kibana Dev Tools):
    • Поле prompt_length сделайте "type": "integer".
    • Поле user_id"keyword".
    • Добавьте index.sort.field для сортировки по @timestamp (при загрузке данных).
    • Включите "index.mapping.total_fields.limit": 200 (на будущее).
  2. Напишите скрипт проверки latency поиска для каждого из полей (trace_id, user_id, model, latency) выполните _search с точным совпадением (term query) и замерьте took (в миллисекундах) из ответа Elasticsearch.
    import time, json
    from elasticsearch import Elasticsearch
    es = Elasticsearch("http://localhost:9200")
    queries = [
        {"term": {"user_id": "u42"}},
        {"term": {"model": "gpt-4"}},
        {"range": {"latency": {"gte": 2.0}}},
        {"term": {"trace_id": "abc-123"}}
    ]
    for q in queries:
        start = time.time()
        res = es.search(index="llm-logs-*", query=q, size=1)
        elapsed = (time.time() - start) * 1000
        print(f"{q} -> {elapsed:.1f}ms, total hits: {res['hits']['total']['value']}")
    
  3. Добейтесь, чтобы все запросы выполнялись < 200 мс (на 100k записях). Если медленно — добавьте явные mapping для полей, увеличьте количество шардов (для теста 1-2), отключите _source (если не нужно).
  4. Увеличьте нагрузку до 500k–1M записей (запустите скрипт повторно с большим числом запросов) и повторно выполните замеры. Убедитесь, что поиск по любому полю занимает < 2 секунд.

Ожидаемый результат этапа Документация (в комментариях кода или README) с результатами замеров latency до и после оптимизации.

Этап 4: Настройка визуализации и алертинга (оценка времени: 1 час)

Действия

  1. Создайте дашборд в Kibana
    • Добавьте Data View на индекс llm-logs-*.
    • Виджеты:
      • Latency distribution (гистограмма)
      • Top models by request count (data table)
      • Requests over time (line chart, разбивка по model)
      • Slow requests filter (поиск за latency > 5 с цветовой маркировкой).
  2. Настройте простой алерт например, если средняя latency за последние 5 минут > 4 секунд, отправить уведомление в Telegram/Slack (можно через webhook). Используйте Kibana Alerting (требуется лицензия basic) или ELK Stack с ElastAlert.
  3. Проверьте откройте дашборд, выполните ручной поиск user_id: u42 — результат должен появиться мгновенно.

Ожидаемый результат этапа Рабочий дашборд Kibana с фильтрацией по полям и алерт (хотя бы базовый логгер предупреждений).

Этап 5: Документирование и воспроизводимость (оценка времени: 0.5 часа)

Действия

  1. Напишите README проекта, включающий:
    • Как развернуть: docker-compose up -d + pip install -r requirements.txt.
    • Примеры structured логов.
    • Команды для нагрузочного теста (например, python load_test.py --count 1000 --concurrency 10).
    • Скриншоты дашборда.
  2. Упакуйте код в репозиторий (Git), убедитесь, что docker-compose.yml, requirements.txt, конфиги структурированы.
  3. Создайте Makefile или justfile для типовых команд: make up, make test, make logs.

Ожидаемый результат этапа README с инструкциями, репозиторий готов к передаче.

5. Критерии приемки (Definition of Done)

  • Сервис логирует каждый LLM-запрос в формате JSON c полями trace_id, user_id, model, latency, timestamp, status_code.
  • Логи асинхронно отправляются в Elasticsearch (или аналог) без блокировки основного потока.
  • Поиск по любому из полей через Elasticsearch API (term/range) возвращает результат за < 2 секунд при 1M записей.
  • Дашборд Kibana позволяет фильтровать по user_id, model, диапазону latency — обновление данных не дольше 5 секунд после записи.
  • Настроен алерт на аномально высокую latency (>95 перцентиль).
  • Репозиторий содержит полную документацию по развёртыванию в 1 команду (docker-compose up).
  • Код покрыт хотя бы юнит-тестами на формат лога (проверка JSON schema).
  • Нагрузочное тестирование (1000 запросов) не приводит к потере логов (count в ES равен числу отправленных).

6. Ожидаемый результат

Основной артефакт

  • Папка проекта с файлами:
    • app/main.pyFastAPI-сервис
    • app/logging_setup.py — настройка structlog + Elasticsearch handler
    • docker-compose.yml — ES + Kibana
    • load_test.py — скрипт генерации нагрузки
    • test_logging.py — юнит-тесты
    • README.md — документация

Дополнительно

  • Дашборд Kibana экспортированный в NDJSON (для импорта).
  • Конфигурация алерта (например, elastalert_rule.yaml).
  • Отчёт о замерах latency поиска (таблица).

7. Возможные сложности и их решение

СложностьРешение
Elasticsearch не принимает логи из-за несовместимости маппингаУдалить старый индекс (DELETE llm-logs-*) и пересоздать с правильным mapping (через Kibana Dev Tools).
structlog конфликтует с logging из FastAPIИспользовать structlog как единственный логгер, через configure(processors=[...], wrapper_class=BoundLogger) и передать в uvicorn настройку log_config=None или кастомный dict.
Поиск по latency медленный из-за текстового поляУбедиться, что в mapping latency указан как float (не text). Для диапазонов использовать range запрос.
Потеря логов при высокой нагрузкеУвеличить размер буфера (до 500 записей) и использовать пакетную отправку с интервалом 1 секунда. Включить retry с exponential backoff.
Медленный поиск при больших объёмах (>10M)Создать индекс с сортировкой по @timestamp (при старте). Добавить index.routing.allocation.total_shards_per_node и увеличить число шардов до 3–5. Рассмотреть использование time-based индексов.
Нет возможности использовать ElasticsearchЗаменить на ClickHouse (движок MergeTree с ORDER BY (timestamp, user_id)) — быстрый поиск по ключевым полям.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Интеграция structured logging в LLM-сервис1.5 часа
Этап 2: Настройка Elasticsearch и передача логов1 час
Этап 3: Оптимизация поиска и измерение latency1.5 часа
Этап 4: Настройка визуализации и алертинга1 час
Этап 5: Документирование и воспроизводимость0.5 часа
Итого5.5 часов

Примечание При первом выполнении задачи возможны дополнительные 1–2 часа на изучение structlog и Elasticsearch. Если используется заглушка (симуляция) — время уменьшается на 0.5–1 час.

9. Связанные вопросы из базы знаний

ВопросТема
3Что такое structured logging и чем он отличается от unstructured?
7Как передать контекст (trace_id) между микросервисами?
15Какие поля обязательны в логах для observability LLM-систем?
22Как организовать асинхронную отправку логов в Elasticsearch?
28Как измерять latency LLM-вызова (включая время сети)?
31Какие индексы и маппинги оптимальны для поиска по keyword-полям?
42Как настроить дашборд в Kibana для мониторинга LLM сервиса?
55Как реализовать алерт на основе перцентилей latency?
89Как проводить нагрузочное тестирование LLM-endpoint?
101Какие метрики качества retrieval могут быть продублированы в логах?

10. Чек-лист самопроверки

  • Я проверил, что логи на этапе 1 выводятся в stdout в формате JSON с нужными полями.
  • Я убедился, что после отправки 100 запросов все логи попали в ES (count совпадает).
  • Я выполнил замеры поиска по каждому из полей и получил значения < 200 мс (до оптимизации) и < 2 с (после оптимизации на 1M записей).
  • Я создал дашборд в Kibana и проверил, что фильтры по user_id, model, latency работают мгновенно.
  • Я написал хотя бы один unit-тест, который проверяет структуру лога (наличие trace_id, корректный тип latency).
  • Я задокументировал процесс запуска в README так, чтобы его мог воспроизвести другой инженер.