Настроить drift detection для агента

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить drift detection для агента

1. Цель задачи

Реализовать систему детекции дрифта ответов LLM-агента на основе еженедельного сравнения распределений. Система должна автоматически вычислять метрики дрифта по категориям ответов (success, refusal, clarification, error) и выдавать алерт, если относительное изменение доли любой из категорий превышает 30% по сравнению с предыдущей неделей. Закрепить навыки работы с логами, статистическими тестами и настройкой мониторинга.

Ключевой результат Рабочий дрифт-детектор, который раз в неделю обрабатывает логи агента, строит распределение ответов, сравнивает с предыдущим периодом и отправляет алерт (в Telegram/email) при превышении порога 30% для любой категории.

2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Логи ответов агента за минимум 3 неделиСимулировать с помощью Python-скрипта (генерация синтетических данных)
Эталонное распределение за первую неделю (baseline)Из логов первой недели
Классификатор категорий ответов (success, refusal, clarification, error)Предобученный zero-shot классификатор или rule-based
Инструмент для отправки алертовTelegram Bot API / SMTP / Slack Webhook
Хранилище для метрик (например, CSV или PostgreSQL)Локальный SQLite / CSV

Если нет реального агента — симулируем:

  1. Напишите скрипт generate_logs.py, который генерирует 500–1000 записей в день в формате JSON:
    {"timestamp": "2025-03-10T12:00:00Z", "agent_response": "Конечно, вот информация...", "category": "success"}
    
  2. Для первых двух недель задайте стабильное распределение (например, success=80%, refusal=10%, clarification=8%, error=2%).
  3. Для третьей недели измените распределение: success=50%, refusal=30%, clarification=15%, error=5% (чтобы вызвать алерт).
  4. Сохраните данные в папке logs/week_{1,2,3}/responses.json.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Весь пайплайн детекции
Обработка данныхPandas, NumPyЧтение логов, агрегация, статистика
Классификация категорийtransformers (zero-shot-classification) или rule-based по ключевым словамОпределение категории ответа
Статистические тестыSciPy (chi-square, PSI)Сравнение распределений
Хранение метрикSQLite (через sqlite3 или Pandas)Сохранение недельных распределений
Отправка алертовpython-telegram-bot / requests (Slack Webhook)Уведомления
Оркестрацияcron / schedule (Python-библиотека)Еженедельный запуск
Визуализация (опционально)Matplotlib / PlotlyПостроение графиков дрифта

4. Этапы выполнения

Этап 1: Генерация логов и создание baseline (оценка 30 минут)

Действия

  1. Создайте структуру папок:

    drift_detection/
    ├── logs/
    │   ├── week_1/
    │   │   └── responses.json
    │   ├── week_2/
    │   │   └── responses.json
    │   └── week_3/
    │       └── responses.json
    ├── drift_detector.py
    ├── baseline.json
    └── metrics.db
    
  2. В generate_logs.py реализуйте функцию generate_week(week_num, probs), где probs — словарь вероятностей категорий. Пример:

    import json, random
    from datetime import datetime, timedelta
    
    def generate_week(week_num, probs, n_days=7, records_per_day=800):
        categories = list(probs.keys())
        data = []
        base_date = datetime(2025, 3, 3) + timedelta(weeks=week_num-1)
        for day in range(n_days):
            date = base_date + timedelta(days=day)
            for _ in range(records_per_day):
                category = random.choices(categories, weights=probs.values(), k=1)[0]
                data.append({
                    "timestamp": date.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "agent_response": f"Sample response for {category}",
                    "category": category
                })
        return data
    
  3. Сгенерируйте три недели данных:

    • week_1: probs = {"success":0.80, "refusal":0.10, "clarification":0.08, "error":0.02}
    • week_2: то же распределение (стабильно)
    • week_3: probs = {"success":0.50, "refusal":0.30, "clarification":0.15, "error":0.05}
  4. Сохраните распределение первой недели как baseline.json (словарь с количеством ответов по категориям и нормализованными долями).

Ожидаемый результат этапа Папка logs с тремя неделями логов, файл baseline.json.

Этап 2: Пайплайн классификации и агрегации ответов (оценка 1 час)

Действия

  1. В drift_detector.py напишите функцию load_week(week_path), которая читает JSON и возвращает Pandas DataFrame с колонками timestamp, category.

  2. Реализуйте classify_response(text) — пока просто возвращаем category из лога (т.к. у нас он уже есть). Для усложнения можно добавить zero-shot классификатор:

    from transformers import pipeline
    classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
    candidate_labels = ["success", "refusal", "clarification", "error"]
    def classify_response(text):
        result = classifier(text, candidate_labels)
        return result['labels'][0]
    
  3. Напишите функцию compute_distribution(df) — группировка по category, подсчёт количества и доли:

    def compute_distribution(df):
        dist = df['category'].value_counts(normalize=True)
        dist_abs = df['category'].value_counts()
        return dist.to_dict(), dist_abs.to_dict()
    
  4. Реализуйте функцию save_distribution(week_name, dist, db_path), которая сохраняет распределение в SQLite:

    import sqlite3
    def save_distribution(week_name, dist, db_path):
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        # таблица: week_name TEXT, category TEXT, count INTEGER, proportion REAL
        ...
    
  5. Проверьте на всех трёх неделях, что распределения корректно вычисляются и сохраняются.

Ожидаемый результат этапа SQLite база metrics.db с записями для трёх недель.

Этап 3: Сравнение распределений и детекция дрифта (оценка 1 час)

Действия

  1. Реализуйте функцию compare_distributions(dist_prev, dist_curr, threshold=0.30):

    • Для каждой категории вычислить относительное изменение доли:
      change = (dist_curr[cat] - dist_prev[cat]) / dist_prev[cat] if dist_prev[cat] != 0 else None
      
    • Если |change| > threshold — записать как дрифт.
    • Дополнительно вычислить Population Stability Index (PSI):
      import numpy as np
      def psi(expected, actual):
          psi_value = 0
          for cat in set(list(expected.keys()) + list(actual.keys())):
              exp = expected.get(cat, 1e-10)
              act = actual.get(cat, 1e-10)
              psi_value += (act - exp) * np.log(act / exp)
          return psi_value
      
    • Порог для PSI: >0.2 — значительный сдвиг.
  2. Для проверки статистической значимости используйте chi-square test:

    from scipy.stats import chi2_contingency
    # таблица сопряжённости: строки = недели, столбцы = категории
    table = [[dist_prev_abs['success'], dist_prev_abs['refusal'], ...],
             [dist_curr_abs['success'], dist_curr_abs['refusal'], ...]]
    chi2, p, dof, expected = chi2_contingency(table)
    # если p < 0.05 — распределения значимо различны
    
  3. Условие алерта: PSI > 0.2 ИЛИ (chi2 p < 0.05 И хотя бы одна категория изменилась >30%).

  4. Напишите функцию alert_if_drift(...), которая при выполнении условия отправляет уведомление.

Ожидаемый результат этапа Функция детекции, которая для week_2 выдаёт "нет дрифта", а для week_3 — алерт.

Этап 4: Отправка алерта и оркестрация (оценка 45 минут)

Действия

  1. Настройте Telegram-бота:

    • Создайте бота через @BotFather, получите токен.
    • Установите python-telegram-bot (v20+).
    • Напишите функцию send_telegram_alert(message):
      import asyncio
      from telegram import Bot
      async def send_alert(msg):
          bot = Bot(token="YOUR_TOKEN")
          await bot.send_message(chat_id="YOUR_CHAT_ID", text=msg)
      asyncio.run(send_alert("Дрифт обнаружен!"))
      
  2. Вставьте вызов алерта в основной пайплайн.

  3. Оберните весь пайплайн в функцию run_weekly_check(week_path), которая:

    • загружает текущую неделю,
    • загружает baseline из SQLite (последняя запись),
    • сравнивает,
    • при дрифте отправляет алерт,
    • сохраняет распределение текущей недели.
  4. Настройте автоматический запуск каждую неделю через cron (или используйте schedule для теста каждые 10 минут):

    0 10 * * 1 cd /path/to/project && python drift_detector.py --week_path logs/week_$(date +%V)
    
  5. Протестируйте на симулированных данных: запустите вручную для каждой недели.

Ожидаемый результат этапа При обработке week_3 получено сообщение в Telegram вида:
"⚠️ Drift detected! Week 3 vs Week 2. PSI=0.45. Categories with >30% change: refusal (+200%), clarification (+87.5%), error (+150%)."

Этап 5: Визуализация и дашборд (опционально, оценка 30 минут)

Действия

  1. Постройте график изменения долей категорий по неделям с помощью Matplotlib:

    import matplotlib.pyplot as plt
    df_metrics = pd.read_sql("SELECT * FROM distributions", conn)
    pivot = df_metrics.pivot(index='week_name', columns='category', values='proportion')
    pivot.plot(kind='bar', stacked=True)
    plt.savefig('drift_trend.png')
    
  2. Обновите алерт, чтобы он содержал ссылку на график (загрузите на Imgur или прикрепите файл).

Ожидаемый результат этапа Файл drift_trend.png с визуализацией дрифта.

5. Критерии приемки (Definition of Done)

  • Генератор логов создаёт данные с контролируемой вариацией распределений.
  • Пайплайн читает логи из указанной папки, классифицирует (или использует готовую категорию).
  • Рассчитываются относительные изменения по каждой категории и PSI / chi-square.
  • Для week_2 (стабильной) алерт не срабатывает.
  • Для week_3 (с дрифтом >30%) алерт срабатывает и содержит все детали.
  • Алерт отправляется в Telegram (или другой канал).
  • Все распределения сохраняются в SQLite с указанием номера недели.
  • Код покрыт хотя бы базовыми unit-тестами (pytest) для функций сравнения.
  • README с инструкцией по запуску (один pip install -r requirements.txt и python drift_detector.py --week week_3).
  • График дрифта строится (если реализован этап 5).

6. Ожидаемый результат

Основной артефакт — репозиторий проекта drift_detection/ со следующей структурой:

drift_detection/
├── README.md
├── requirements.txt                 # pandas, scipy, numpy, python-telegram-bot, transformers (опц.)
├── generate_logs.py                 # скрипт генерации синтетических логов
├── drift_detector.py                # основной пайплайн
├── tests/
│   ├── test_generation.py           # юнит-тесты для генерации
│   ├── test_drift_detection.py      # тесты сравнения распределений
├── logs/                            # сгенерированные данные
├── metrics.db                       # SQLite база с распределениями
├── drift_trend.png                  # график (если делали)
└── baseline.json                    # эталонная неделя (дублирование)

Содержание drift_detector.py: функции load_week, compute_distribution, compare_distributions, send_alert, run_weekly_check.
Точка входа — run_weekly_check(week_path), вызывает все этапы.

Дополнительные результаты Документация по настройке Telegram-бота, скрипт для cron, описание метрик.

7. Возможные сложности и их решение

СложностьРешение
Нет реального агента для логовИспользовать симуляцию generate_logs.py с контролируемым дрифтом
Токен Telegram-бота или chat_id неизвестенИспользовать заглушку: печать алерта в консоль; либо настроить Slack Webhook с публичным URL
Не все категории присутствуют в обеих неделях (divide by zero)Добавить сглаживание (Laplace smoothing): proportion = (count + 1) / (total + num_categories)
Статистические тесты могут быть нечувствительны при малом размере выборкиУвеличить число записей в симуляции до 1000 в день; использовать только PSI
Забыли запустить скрипт вовремяНастроить cron; добавить логирование запуска в файл run.log
Модель zero-shot классификации медленнаяЗакэшировать результаты; или использовать rule-based классификацию (ключевые слова: "извините" → refusal, "уточните" → clarification)
Порог 30% может давать ложные срабатывания на малых категориях (error)Ввести минимальный порог абсолютной доли (например, категория должна быть >1% от общего числа)

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Генерация логов и baseline30 мин
Этап 2: Пайплайн классификации и агрегации1 час
Этап 3: Сравнение распределений и детекция дрифта1 час
Этап 4: Отправка алерта и оркестрация45 мин
Этап 5: Визуализация (опционально)30 мин
Итого (с этапом 5)3 часа 45 минут

Примечание Для первого выполнения с детальным изучением документации и отладкой заложите +1 час. При наличии опыта с Pandas и SciPy можно уложиться в 2.5 часа.

9. Связанные вопросы из базы знаний

ВопросТема
43Как настроить мониторинг LLM-приложений
78Population Stability Index (PSI) для ML
145Интеграция Telegram Bot для уведомлений
211Chi-square test в SciPy
302Zero-shot классификация с Hugging Face
404Логирование и экспорт метрик в SQLite
567Автоматизация Python-скриптов через cron
639Сглаживание вероятностей (Laplace)
712Визуализация распределений с Matplotlib
891A/B тестирование для LLM-ответов

10. Чек-лист самопроверки

  • Я сгенерировал три недели синтетических логов с явно разными распределениями.
  • Пайплайн корректно считывает логи и вычисляет абсолютные и относительные доли.
  • Код сравнения правильно детектирует стабильную неделю (без алерта) и дрифтовую (с алертом).
  • Алерт содержит название недели, PSI, список категорий с превышением порога 30%.
  • Я протестировал запуск из командной строки с параметром --week для каждой недели.
  • В README описаны шаги установки зависимостей и запуска.
  • В проекте есть тесты хотя бы для ключевых функций (compute_distribution, compare_distributions).