Реализовать write-through cache для RAG
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать write-through cache для RAG
1. Цель задачи
Разработать механизм write-through cache для RAG-системы, при котором любое изменение документа в источнике (добавление, обновление, удаление) немедленно приводит к обновлению соответствующей записи в кэше эмбеддингов или других кэшируемых данных. Это устраняет проблему stale data, обеспечивая консистентность между источником и кэшем без дополнительных задержек при запросах.
Ключевой результат Рабочий модуль, который при обновлении документа в primary storage синхронно пересчитывает и записывает новое значение в кэш, а также инвалидирует старые записи.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| RAG-система (минимальная) | Пет-проект или написать с нуля: простой retriever + embedding модель |
| Источник документов | SQLite / PostgreSQL (таблица documents) или файлы в папке |
| Сервер кэша | Redis (локально через Docker) или in-memory dict (для симуляции) |
| Средство обнаружения изменений | CDC-триггер / webhook / polling с флагом updated_at |
| Эмбеддинг-модель | sentence-transformers/all-MiniLM-L6-v2 (бесплатно, Hugging Face) |
| Инструменты для тестов | pytest, fake data (Faker) |
Если нет реального инструмента — симулируем:
- Установите Redis через Docker: docker run -d -p 6379:6379 redis:7
- Либо используйте Python
dictкак in-memory cache (для отладки без Redis) - Создайте имитацию документа: JSON с полями id, title, content, updated_at
- Смоделируйте событие обновления: функция
update_document(id, new_content), которая будет вызываться вручную
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык и среда | Python 3.10+, virtualenv | Разработка и тестирование |
| Источник данных | SQLite (встроенная библиотека sqlite3) | Хранение документов |
| Кэш (основной) | Redis (через redis-py) | Кэширование эмбеддингов / результатов retrieval |
| Эмбеддинги | sentence-transformers, torch | Генерация векторов при записи и чтении |
| Модель данных | Pydantic / dataclass | Структура документа |
| Тестирование | pytest, unittest.mock | Юнит-тесты и интеграционные тесты |
| Логирование | logging (stdout + file) | Отладка и мониторинг операций кэша |
4. Этапы выполнения
Этап 1: Подготовка окружения и минимальной RAG-системы (30 мин)
Действия
- Создайте виртуальное окружение и установите зависимости:
python -m venv venv && source venv/bin/activate pip install redis sentence-transformers sqlite3 pytest pydantic - Инициализируйте SQLite-базу с таблицей документов:
import sqlite3 conn = sqlite3.connect('rag.db') conn.execute('''CREATE TABLE IF NOT EXISTS documents (id INTEGER PRIMARY KEY, title TEXT, content TEXT, updated_at TEXT)''') conn.close() - Напишите класс
DocumentCacheс методами get(key) и set(key, value) (Redis-клиент). - Реализуйте простой retriever: принимает запрос, эмбеддит, ищет в кэше (на этом этапе — заглушка).
Ожидаемый результат этапа Рабочая база данных с тестовыми документами, Redis-клиент, возможность кэшировать и получать значение.
Этап 2: Реализация write-through логики (1.5 часа)
Действия
- Спроектируйте интерфейс
CacheBackend, который может быть Redis или in-memory dict. - Реализуйте класс
WriteThroughCache:- При вызове put(key, value) — сначала записать в primary storage (SQLite), затем пересчитать эмбеддинг и записать в кэш.
- При вызове delete(key) — удалить из primary storage и из кэша.
- Метод get(key) — читать из кэша (если промах — достать из БД, сгенерировать эмбеддинг, записать в кэш и вернуть).
- Добавьте функцию
update_document_batch(docs), которая атомарно применяет write-through к списку изменений. - Обработка коллизий: если при записи в кэш возникла ошибка — не прерывать запись в БД, но залогировать и установить флаг
needs_sync.
Ключевой код (пример):
class WriteThroughCache:
def __init__(self, primary_storage, cache_backend, embedder):
self.primary = primary_storage
self.cache = cache_backend
self.embedder = embedder
def put(self, doc_id, doc_data):
# 1. Пишем в primary storage
self.primary.upsert(doc_id, doc_data)
# 2. Генерируем эмбеддинг (если нужно кэшировать эмбеддинг)
embedding = self.embedder.encode(doc_data['content'])
# 3. Пишем в кэш
self.cache.set(f"doc:{doc_id}", embedding)
# 4. Логируем
logging.info(f"Write-through: document {doc_id} updated in cache")
Ожидаемый результат этапа Модуль, который синхронно обновляет и primary storage, и кэш.
Этап 3: Интеграция с событиями обновления документов (1 час)
Действия
- Создайте эмулятор внешних изменений: прослушивайте очередь Redis (Pub/Sub) на канале
doc_updatesили polling по полюupdated_at. - Напишите скрипт
simulate_update.py, который случайным образом выбирает документ, изменяет его содержимое и вызывает WriteThroughCache.put(). - Реализуйте fallback: если кэш недоступен, система должна читать из primary storage (возможно, с TTL-инвалидацией по таймеру).
- Проверьте, что после вызова put старые эмбеддинги больше не возвращаются: тест вызывает
getдо и после изменения и сравнивает векторы — они должны различаться.
def test_no_stale_data():
doc_id = 1
original_content = "Hello world"
updated_content = "Hello cache system"
cache.put(doc_id, {'content': original_content})
old_embedding = cache.get(doc_id)
cache.put(doc_id, {'content': updated_content})
new_embedding = cache.get(doc_id)
assert old_embedding != new_embedding, "Stale data detected!"
Ожидаемый результат этапа Автоматизированный скрипт, который при изменении документа в источнике обновляет кэш, и тест подтверждает отсутствие stale data.
Этап 4: Мониторинг, логирование и обработка ошибок (30 мин)
Действия
- Добавьте метрики: количество write-through операций, количество сбоев записи в кэш, hit/miss ratio.
- Настройте логирование с уровнем INFO для обычных операций и WARNING/ERROR для ошибок кэша.
- Реализуйте повтор при ошибке кэша с экспоненциальной задержкой (retry 3 раза).
- Напишите скрипт стресс-теста: 100 одновременных обновлений, проверьте, что кэш не рассинхронизирован.
Ожидаемый результат этапа Логи и метрики, доступные для анализа; обработка ошибок не приводит к потере данных.
Этап 5: Документирование и финальное тестирование (30 мин)
Действия
- Напишите README с описанием архитектуры, методов класса
WriteThroughCache, примерами запуска. - Добавьте docstrings ко всем публичным методам.
- Запустите полный набор тестов:
pytest test/ -v. - Убедитесь, что код покрывает все граничные случаи: пустой content, дубликаты, массовые обновления.
Ожидаемый результат этапа Пакет с кодом, документацией и тестами готов к использованию.
5. Критерии приемки (Definition of Done)
- Реализован класс
WriteThroughCacheс методамиput,get,delete. - При вызове
putдокумент сначала записывается в primary storage, затем в кэш. - При вызове
getвозвращается актуальное значение; после изменения документа старые данные не возвращаются. - Обработаны ошибки кэша: при недоступности Redis система корректно переключается на чтение из primary storage.
- Написаны не менее 3 интеграционных тестов (один на stale data, один на ошибку кэша, один на массовое обновление).
- Код покрыт логами с уровнями INFO и ERROR.
- Создан README с инструкцией по запуску и примерами.
- Все тесты проходят успешно (pytest exit code 0).
6. Ожидаемый результат
- Файл/артефакт Python-пакет
write_through_cacheс модулями: - Содержание Полностью рабочий механизм write-through кэширования для RAG.
- Дополнительные результаты
- Лог-файл с примерами операций.
- Метрики hit/miss (если есть Grafana/Prometheus — дашборд, но не обязательно).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Race condition при параллельных записях и чтениях | Использовать блокировку (Redis Lock / threading.Lock) на ключ документа. |
| Отказ Redis -> потеря кэша | Реализовать fallback на primary storage; при восстановлении Redis можно прогреть кэш. |
| Высокая latency при пересчёте эмбеддинга (большие документы) | Кэшировать только эмбеддинги, а не сырые данные; вычислять асинхронно (background task) с подтверждением записи в primary. |
| Инвалидация связанных кэшей (например, full-text search) | Добавить listener на события и при изменении одного документа инвалидировать все связанные ключи. |
| Ошибка сети при записи в кэш после успешной записи в БД | Логировать и отмечать запись как dirty для последующей репликации. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка окружения | 30 мин |
| Этап 2: Реализация write-through логики | 1 час 30 мин |
| Этап 3: Интеграция с событиями | 1 час |
| Этап 4: Мониторинг и обработка ошибок | 30 мин |
| Этап 5: Документирование и тестирование | 30 мин |
| Итого | 4 часа |
Примечание Для первого раза может потребоваться до 6 часов из-за отладки Redis и тестирования граничных случаев.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Основы кэширования (cache hit/miss, TTL) |
| 45 | Инвалидация кэша при обновлении данных |
| 102 | RAG pipeline: embedding, retrieval, generation |
| 201 | Write-through vs write-back стратегии |
| 304 | Консистентность данных в распределённых системах |
| 410 | Обработка ошибок в production (retry, fallback) |
| 518 | Использование Redis в AI-системах |
| 607 | Тестирование кэш-стратегий (unit, integration) |
| 720 | Мониторинг и метрики для кэшей |
| 888 | Batch update и atomic операции |
10. Чек-лист самопроверки
- Я написал и запустил тесты, и они не показывают stale data после обновления документа.
- Я проверил сценарий, когда кэш-сервер (Redis) временно недоступен — система не падает, а читает из primary storage.
- Я добавил логирование ключевых операций (put, get, delete) с разными уровнями.
- Я написал README, который может прочитать другой разработчик и сразу запустить систему без дополнительных вопросов.
- Я протестировал массовое обновление (50+ документов) и убедился, что кэш остаётся консистентным.