English translation is not available yet. Showing Russian content.
Реализовать query drift детекцию
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать query drift детекцию
1. Цель задачи
Разработать систему детекции изменения распределения retrieval результатов (query drift) в RAG-пайплайне. Система должна отслеживать распределение эмбеддингов запросов или метрик retrieval (например, hit rate, MRR) и сигнализировать при значительном (>20%) отклонении от исторического baseline.
Ключевой результат Рабочий компонент, который каждые N минут (настраиваемо) вычисляет статистическую меру расхождения (KL divergence / Wasserstein distance) между текущим окном запросов и референтным распределением, и отправляет алерт при превышении порога 20%.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| RAG-система с логированием запросов | Рабочий или тестовый пет-проект (например, из задачи Pet 221 или собственный) |
| Логи запросов и retrieval результатов | ClickHouse, Loki, или CSV-файл с колонками: timestamp, query_text, embedding (list of floats), retrieved_doc_ids, hit (0/1) |
| Базовое (референтное) распределение эмбеддингов | Собрать за период 1-4 недели до начала работы детекции (задать как baseline) |
| Инструмент для алертинга | Prometheus Alertmanager, Telegram bot, или stdout в CI |
Если нет реального лога — симулируем:
- Сгенерировать 2 набора синтетических запросов: «нормальные» (например, вопросы по документации продукта) и «дрифтовые» (вопросы на не связанную тему, например, рецепты или спорт)
- Для каждого набора получить эмбеддинги через sentence-transformers (например, all-MiniLM-L6-v2)
- Сохранить в CSV с указанием метки
is_drift(0/1) - Использовать нормальные запросы как baseline, а смесь 80% нормальных + 20% дрифтовых как тестовое окно
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.10+ | Всё кроме инфраструктуры |
| Векторизация текста | sentence-transformers (all-MiniLM-L6-v2 / intfloat/e5-small-v2) | Получение эмбеддингов запросов |
| Обработка данных | pandas, numpy, scipy | Статистические тесты, KL divergence |
| Хранилище распределений | Pickle / Parquet / Redis (опционально) | Сохранение baseline и окон |
| Алертинг | prometheus_client + Alertmanager / простой TelegramBot / logging + send_email | Уведомление |
| CI/тестирование | pytest | Автотесты на синтетических данных |
| Оркестрация | cron / schedule / Airflow (опционально) | Периодический запуск детекции |
4. Этапы выполнения
Этап 1: Подготовка данных и baseline (1 час)
Действия
- Собрать исторические запросы (минимум 1000) или сгенерировать синтетические «нормальные» запросы.
- Вычислить эмбеддинги для каждого запроса (768-мерный вектор).
- Вычислить средний эмбеддинг (центроид) и ковариационную матрицу распределения.
- Сохранить baseline как baseline.pkl:
baseline = { 'centroid': np.mean(embeddings, axis=0), 'cov': np.cov(embeddings, rowvar=False), 'n_samples': len(embeddings), 'timestamp': datetime.now() } - Подготовить тестовое окно (например, последние 100 запросов). В симуляции — окно из 80% нормы + 20% дрифта.
Ожидаемый результат этапа Файл baseline.npz / baseline.pkl и скрипт prepare_baseline.py, который можно перезапускать.
Этап 2: Реализация детектора (2 часа)
Действия
- Реализовать функцию
compute_drift(reference_embeddings, window_embeddings) -> float:- Вариант A KL divergence между двумя многомерными гауссианами (оценить mean и cov окон, затем KL).
- Вариант B Wasserstein distance (Earth Mover’s Distance) — более робастная, но медленнее.
- Вариант C Отклонение центроида (cosine distance от baseline центроида) — быстро, но менее чувствительно к форме.
- Выбрать вариант A как основной, B как запасной.
- Реализовать пороговое правило:
Порог 0.2 интерпретируется как относительное изменение (если KL/Wasserstein нормированы на baseline variance).def is_drift(drift_score, threshold=0.2): return drift_score > threshold - Написать класс
QueryDriftDetector:class QueryDriftDetector: def __init__(self, baseline_path: str, threshold: float = 0.2): ... def add_query(self, query: str): # вычислить эмбеддинг и добавить в текущее окно def check_drift(self) -> tuple[bool, float]: # сравнить окно с baseline
Ожидаемый результат этапа Скрипт detector.py с детектором и функцией compute_drift.
Этап 3: Интеграция алертинга (1.5 часа)
Действия
- Выбрать способ алерта:
- Telegram создать бота (через BotFather), написать функцию send_telegram_alert(message, token, chat_id)
- Prometheus + Alertmanager экспортировать метрику
query_drift_score(Gauge) и порог через prometheus_client - Простой лог logging.warning(f"Drift detected: score={score:.3f}, threshold={threshold}") + e-mail (опционально)
- Реализовать обёртку alert_if_drift(drift_score, threshold, alert_backend).
- Написать скрипт
run_detection.py, который: - Протестировать на симуляции: убедиться, что при дрифте >20% алерт срабатывает, при <10% — молчит (настроить ROC).
Ожидаемый результат этапа Рабочий алерт при симуляции с >20% дрифта.
Этап 4: Тестирование и валидация (1 час)
Действия
- Написать pytest тесты:
- Проанализировать ROC-кривую на синтетических данных (подобрать оптимальный threshold, если 0.2 неоптимален).
- Документировать метрики качества (precision, recall при пороге 0.2).
Ожидаемый результат этапа pytest зелёный, отчёт о метриках в README.
Этап 5: Документация и деплой (0.5 часа)
Действия
- Написать
README.mdс:- Описанием архитектуры
- Инструкцией по запуску (pip install, запуск детектора)
- Параметрами конфигурации
- Примером алерта
- Добавить
requirements.txtиsetup.py(илиpyproject.toml). - (Опционально) Docker-образ для запуска как микросервис.
Ожидаемый результат этапа Репозиторий с кодом, готовый к интеграции в CI/CD.
5. Критерии приемки (Definition of Done)
- Детектор реализован (скрипт
detector.pyс классомQueryDriftDetector) - Функция
compute_driftвычисляет KL divergence между baseline и окном - Baseline сохраняется и загружается (формат pickle/parquet)
- Алерт (Telegram / Prometheus / лог) срабатывает при drift_score > threshold (0.2)
- Пройдены unit-тесты (pytest):
- no_drift: score < 0.05
- drift_30%: score > 0.2
- пустое окно обрабатывается без падения
- Порог алерта настраивается (аргумент конструктора или переменные окружения)
- README с инструкцией по запуску и примером
- Весь код залит в git-репозиторий с коммитами
6. Ожидаемый результат
Основной артефакт Папка проекта query-drift-detector/ с:
detector.py— реализация детектораprepare_baseline.py— скрипт подготовки baselinerun_detection.py— скрипт для периодического запускаtests/— тестыREADME.md— документацияrequirements.txt
Содержимое Детектор может быть использован как модуль или запущен как сервис. При появлении дрифта генерируется алерт. В симуляции показывает precision > 0.9 при пороге 0.2.
Опционально Grafana-дашборд с метрикой query_drift_score, если используется Prometheus.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Многомерные распределения (768d) — KL divergence может быть неустойчивой | Использовать Wasserstein distance или PCA-редукцию до 50-100 компонент перед вычислением KL |
| Baseline устаревает (через месяц распределение законно изменилось) | Implement экспоненциальное скользящее среднее (EWMA) для baseline; обновлять раз в день |
| Вычислительная нагрузка (эмбеддинги для каждого запроса) | Кэшировать эмбеддинги, использовать batch-обработку, асинхронные вызовы |
| Threshold 0.2 не универсален | Добавить автокалибровку: собрать данные за 2 недели без дрифта, вычислить 99-й перцентиль как порог |
| Дрифт может быть медленным (а не резким) | Добавить детекцию тренда (скользящее среднее drift_score > порога 3 периода подряд) |
| Нет реальных логов | Генерация синтетики (см. Этап 1) с контролируемым дрифтом |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка данных и baseline | 1 час |
| Этап 2: Реализация детектора | 2 часа |
| Этап 3: Интеграция алертинга | 1.5 часа |
| Этап 4: Тестирование и валидация | 1 час |
| Этап 5: Документация и деплой | 0.5 часа |
| Итого | 6 часов |
Примечание Для неопытного инженера время может увеличиться до 10-12 часов из-за необходимости освоения библиотек (sentence-transformers, scipy).
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Построение baseline для метрик retrieval |
| 34 | KL divergence и Wasserstein distance |
| 47 | (Текущая задача) |
| 89 | Экспорт метрик в Prometheus |
| 105 | Разработка алертов на основе перцентилей |
| 201 | PCA для уменьшения размерности эмбеддингов |
| 312 | Логирование запросов RAG |
| 405 | ROC-анализ для подбора порога детекции |
| 518 | Автоматическое обновление baseline (EWMA) |
| 623 | Симуляция data drift для тестирования |
10. Чек-лист самопроверки
- Я реализовал класс
QueryDriftDetectorс методамиadd_queryиcheck_drift. - Я протестировал детектор на синтетических данных: при дрифте >20% получаю score >0.2.
- Алерт приходит на выбранный канал (Telegram / лог / Prometheus).
- Порог алерта легко меняется через параметр конструктора.
- Код покрыт pytest-тестами (минимум 4 теста).
- README содержит инструкцию по запуску и пример вызова.
- Все зависимости зафиксированы в
requirements.txt. - Репозиторий чистый, без лишних файлов (включая
.gitignore).