Реализовать query drift детекцию

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать query drift детекцию

1. Цель задачи

Разработать систему детекции изменения распределения retrieval результатов (query drift) в RAG-пайплайне. Система должна отслеживать распределение эмбеддингов запросов или метрик retrieval (например, hit rate, MRR) и сигнализировать при значительном (>20%) отклонении от исторического baseline.

Ключевой результат Рабочий компонент, который каждые N минут (настраиваемо) вычисляет статистическую меру расхождения (KL divergence / Wasserstein distance) между текущим окном запросов и референтным распределением, и отправляет алерт при превышении порога 20%.

2. Исходные данные

Что нужноОткуда взять
RAG-система с логированием запросовРабочий или тестовый пет-проект (например, из задачи Pet 221 или собственный)
Логи запросов и retrieval результатовClickHouse, Loki, или CSV-файл с колонками: timestamp, query_text, embedding (list of floats), retrieved_doc_ids, hit (0/1)
Базовое (референтное) распределение эмбеддинговСобрать за период 1-4 недели до начала работы детекции (задать как baseline)
Инструмент для алертингаPrometheus Alertmanager, Telegram bot, или stdout в CI

Если нет реального лога — симулируем:

  1. Сгенерировать 2 набора синтетических запросов: «нормальные» (например, вопросы по документации продукта) и «дрифтовые» (вопросы на не связанную тему, например, рецепты или спорт)
  2. Для каждого набора получить эмбеддинги через sentence-transformers (например, all-MiniLM-L6-v2)
  3. Сохранить в CSV с указанием метки is_drift (0/1)
  4. Использовать нормальные запросы как baseline, а смесь 80% нормальных + 20% дрифтовых как тестовое окно

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Всё кроме инфраструктуры
Векторизация текстаsentence-transformers (all-MiniLM-L6-v2 / intfloat/e5-small-v2)Получение эмбеддингов запросов
Обработка данныхpandas, numpy, scipyСтатистические тесты, KL divergence
Хранилище распределенийPickle / Parquet / Redis (опционально)Сохранение baseline и окон
Алертингprometheus_client + Alertmanager / простой TelegramBot / logging + send_emailУведомление
CI/тестированиеpytestАвтотесты на синтетических данных
Оркестрацияcron / schedule / Airflow (опционально)Периодический запуск детекции

4. Этапы выполнения

Этап 1: Подготовка данных и baseline (1 час)

Действия

  1. Собрать исторические запросы (минимум 1000) или сгенерировать синтетические «нормальные» запросы.
  2. Вычислить эмбеддинги для каждого запроса (768-мерный вектор).
  3. Вычислить средний эмбеддинг (центроид) и ковариационную матрицу распределения.
  4. Сохранить baseline как baseline.pkl:
    baseline = {
        'centroid': np.mean(embeddings, axis=0),
        'cov': np.cov(embeddings, rowvar=False),
        'n_samples': len(embeddings),
        'timestamp': datetime.now()
    }
    
  5. Подготовить тестовое окно (например, последние 100 запросов). В симуляции — окно из 80% нормы + 20% дрифта.

Ожидаемый результат этапа Файл baseline.npz / baseline.pkl и скрипт prepare_baseline.py, который можно перезапускать.

Этап 2: Реализация детектора (2 часа)

Действия

  1. Реализовать функцию compute_drift(reference_embeddings, window_embeddings) -> float:
    • Вариант A KL divergence между двумя многомерными гауссианами (оценить mean и cov окон, затем KL).
    • Вариант B Wasserstein distance (Earth Mover’s Distance) — более робастная, но медленнее.
    • Вариант C Отклонение центроида (cosine distance от baseline центроида) — быстро, но менее чувствительно к форме.
  2. Выбрать вариант A как основной, B как запасной.
  3. Реализовать пороговое правило:
    def is_drift(drift_score, threshold=0.2):
        return drift_score > threshold
    
    Порог 0.2 интерпретируется как относительное изменение (если KL/Wasserstein нормированы на baseline variance).
  4. Написать класс QueryDriftDetector:
    class QueryDriftDetector:
        def __init__(self, baseline_path: str, threshold: float = 0.2):
            ...
        def add_query(self, query: str):
            # вычислить эмбеддинг и добавить в текущее окно
        def check_drift(self) -> tuple[bool, float]:
            # сравнить окно с baseline
    

Ожидаемый результат этапа Скрипт detector.py с детектором и функцией compute_drift.

Этап 3: Интеграция алертинга (1.5 часа)

Действия

  1. Выбрать способ алерта:
    • Telegram создать бота (через BotFather), написать функцию send_telegram_alert(message, token, chat_id)
    • Prometheus + Alertmanager экспортировать метрику query_drift_score (Gauge) и порог через prometheus_client
    • Простой лог logging.warning(f"Drift detected: score={score:.3f}, threshold={threshold}") + e-mail (опционально)
  2. Реализовать обёртку alert_if_drift(drift_score, threshold, alert_backend).
  3. Написать скрипт run_detection.py, который:
    • Загружает baseline и накапливает окно запросов (из Kafka / CSV / stdin)
    • Каждые N запросов запускает check_drift() и при превышении отправляет алерт
  4. Протестировать на симуляции: убедиться, что при дрифте >20% алерт срабатывает, при <10% — молчит (настроить ROC).

Ожидаемый результат этапа Рабочий алерт при симуляции с >20% дрифта.

Этап 4: Тестирование и валидация (1 час)

Действия

  1. Написать pytest тесты:
    • test_no_drift: два окна нормальных запросов → drift_score ≈ 0 (<0.05)
    • test_drift_detected: baseline vs окно с 30% дрифтов → score > 0.2
    • test_edge_case: пустое окно → exception или score=0
    • test_persist_reload: сохранить baseline, загрузить, убедиться, что центроиды совпадают.
  2. Проанализировать ROC-кривую на синтетических данных (подобрать оптимальный threshold, если 0.2 неоптимален).
  3. Документировать метрики качества (precision, recall при пороге 0.2).

Ожидаемый результат этапа pytest зелёный, отчёт о метриках в README.

Этап 5: Документация и деплой (0.5 часа)

Действия

  1. Написать README.md с:
    • Описанием архитектуры
    • Инструкцией по запуску (pip install, запуск детектора)
    • Параметрами конфигурации
    • Примером алерта
  2. Добавить requirements.txt и setup.py (или pyproject.toml).
  3. (Опционально) Docker-образ для запуска как микросервис.

Ожидаемый результат этапа Репозиторий с кодом, готовый к интеграции в CI/CD.

5. Критерии приемки (Definition of Done)

  • Детектор реализован (скрипт detector.py с классом QueryDriftDetector)
  • Функция compute_drift вычисляет KL divergence между baseline и окном
  • Baseline сохраняется и загружается (формат pickle/parquet)
  • Алерт (Telegram / Prometheus / лог) срабатывает при drift_score > threshold (0.2)
  • Пройдены unit-тесты (pytest):
    • no_drift: score < 0.05
    • drift_30%: score > 0.2
    • пустое окно обрабатывается без падения
  • Порог алерта настраивается (аргумент конструктора или переменные окружения)
  • README с инструкцией по запуску и примером
  • Весь код залит в git-репозиторий с коммитами

6. Ожидаемый результат

Основной артефакт Папка проекта query-drift-detector/ с:

  • detector.py — реализация детектора
  • prepare_baseline.py — скрипт подготовки baseline
  • run_detection.py — скрипт для периодического запуска
  • tests/ — тесты
  • README.md — документация
  • requirements.txt

Содержимое Детектор может быть использован как модуль или запущен как сервис. При появлении дрифта генерируется алерт. В симуляции показывает precision > 0.9 при пороге 0.2.

Опционально Grafana-дашборд с метрикой query_drift_score, если используется Prometheus.

7. Возможные сложности и их решение

СложностьРешение
Многомерные распределения (768d) — KL divergence может быть неустойчивойИспользовать Wasserstein distance или PCA-редукцию до 50-100 компонент перед вычислением KL
Baseline устаревает (через месяц распределение законно изменилось)Implement экспоненциальное скользящее среднее (EWMA) для baseline; обновлять раз в день
Вычислительная нагрузка (эмбеддинги для каждого запроса)Кэшировать эмбеддинги, использовать batch-обработку, асинхронные вызовы
Threshold 0.2 не универсаленДобавить автокалибровку: собрать данные за 2 недели без дрифта, вычислить 99-й перцентиль как порог
Дрифт может быть медленным (а не резким)Добавить детекцию тренда (скользящее среднее drift_score > порога 3 периода подряд)
Нет реальных логовГенерация синтетики (см. Этап 1) с контролируемым дрифтом

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка данных и baseline1 час
Этап 2: Реализация детектора2 часа
Этап 3: Интеграция алертинга1.5 часа
Этап 4: Тестирование и валидация1 час
Этап 5: Документация и деплой0.5 часа
Итого6 часов

Примечание Для неопытного инженера время может увеличиться до 10-12 часов из-за необходимости освоения библиотек (sentence-transformers, scipy).

9. Связанные вопросы из базы знаний

ВопросТема
12Построение baseline для метрик retrieval
34KL divergence и Wasserstein distance
47(Текущая задача)
89Экспорт метрик в Prometheus
105Разработка алертов на основе перцентилей
201PCA для уменьшения размерности эмбеддингов
312Логирование запросов RAG
405ROC-анализ для подбора порога детекции
518Автоматическое обновление baseline (EWMA)
623Симуляция data drift для тестирования

10. Чек-лист самопроверки

  • Я реализовал класс QueryDriftDetector с методами add_query и check_drift.
  • Я протестировал детектор на синтетических данных: при дрифте >20% получаю score >0.2.
  • Алерт приходит на выбранный канал (Telegram / лог / Prometheus).
  • Порог алерта легко меняется через параметр конструктора.
  • Код покрыт pytest-тестами (минимум 4 теста).
  • README содержит инструкцию по запуску и пример вызова.
  • Все зависимости зафиксированы в requirements.txt.
  • Репозиторий чистый, без лишних файлов (включая .gitignore).