Реализовать write-through cache для RAG

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать write-through cache для RAG

1. Цель задачи

Разработать механизм write-through cache для RAG-системы, при котором любое изменение документа в источнике (добавление, обновление, удаление) немедленно приводит к обновлению соответствующей записи в кэше эмбеддингов или других кэшируемых данных. Это устраняет проблему stale data, обеспечивая консистентность между источником и кэшем без дополнительных задержек при запросах.

Ключевой результат Рабочий модуль, который при обновлении документа в primary storage синхронно пересчитывает и записывает новое значение в кэш, а также инвалидирует старые записи.


2. Исходные данные

Что нужноОткуда взять
RAG-система (минимальная)Пет-проект или написать с нуля: простой retriever + embedding модель
Источник документовSQLite / PostgreSQL (таблица documents) или файлы в папке
Сервер кэшаRedis (локально через Docker) или in-memory dict (для симуляции)
Средство обнаружения измененийCDC-триггер / webhook / polling с флагом updated_at
Эмбеддинг-модельsentence-transformers/all-MiniLM-L6-v2 (бесплатно, Hugging Face)
Инструменты для тестовpytest, fake data (Faker)

Если нет реального инструмента — симулируем:

  1. Установите Redis через Docker: docker run -d -p 6379:6379 redis:7
  2. Либо используйте Python dict как in-memory cache (для отладки без Redis)
  3. Создайте имитацию документа: JSON с полями id, title, content, updated_at
  4. Смоделируйте событие обновления: функция update_document(id, new_content), которая будет вызываться вручную

3. Технологический стек

КомпонентИнструментыНазначение
Язык и средаPython 3.10+, virtualenvРазработка и тестирование
Источник данныхSQLite (встроенная библиотека sqlite3)Хранение документов
Кэш (основной)Redis (через redis-py)Кэширование эмбеддингов / результатов retrieval
Эмбеддингиsentence-transformers, torchГенерация векторов при записи и чтении
Модель данныхPydantic / dataclassСтруктура документа
Тестированиеpytest, unittest.mockЮнит-тесты и интеграционные тесты
Логированиеlogging (stdout + file)Отладка и мониторинг операций кэша

4. Этапы выполнения

Этап 1: Подготовка окружения и минимальной RAG-системы (30 мин)

Действия

  1. Создайте виртуальное окружение и установите зависимости:
    python -m venv venv && source venv/bin/activate
    pip install redis sentence-transformers sqlite3 pytest pydantic
    
  2. Инициализируйте SQLite-базу с таблицей документов:
    import sqlite3
    conn = sqlite3.connect('rag.db')
    conn.execute('''CREATE TABLE IF NOT EXISTS documents
                    (id INTEGER PRIMARY KEY, title TEXT, content TEXT, updated_at TEXT)''')
    conn.close()
    
  3. Напишите класс DocumentCache с методами get(key) и set(key, value) (Redis-клиент).
  4. Реализуйте простой retriever: принимает запрос, эмбеддит, ищет в кэше (на этом этапе — заглушка).

Ожидаемый результат этапа Рабочая база данных с тестовыми документами, Redis-клиент, возможность кэшировать и получать значение.

Этап 2: Реализация write-through логики (1.5 часа)

Действия

  1. Спроектируйте интерфейс CacheBackend, который может быть Redis или in-memory dict.
  2. Реализуйте класс WriteThroughCache:
    • При вызове put(key, value) — сначала записать в primary storage (SQLite), затем пересчитать эмбеддинг и записать в кэш.
    • При вызове delete(key) — удалить из primary storage и из кэша.
    • Метод get(key) — читать из кэша (если промах — достать из БД, сгенерировать эмбеддинг, записать в кэш и вернуть).
  3. Добавьте функцию update_document_batch(docs), которая атомарно применяет write-through к списку изменений.
  4. Обработка коллизий: если при записи в кэш возникла ошибка — не прерывать запись в БД, но залогировать и установить флаг needs_sync.

Ключевой код (пример):

class WriteThroughCache:
    def __init__(self, primary_storage, cache_backend, embedder):
        self.primary = primary_storage
        self.cache = cache_backend
        self.embedder = embedder

    def put(self, doc_id, doc_data):
        # 1. Пишем в primary storage
        self.primary.upsert(doc_id, doc_data)
        # 2. Генерируем эмбеддинг (если нужно кэшировать эмбеддинг)
        embedding = self.embedder.encode(doc_data['content'])
        # 3. Пишем в кэш
        self.cache.set(f"doc:{doc_id}", embedding)
        # 4. Логируем
        logging.info(f"Write-through: document {doc_id} updated in cache")

Ожидаемый результат этапа Модуль, который синхронно обновляет и primary storage, и кэш.

Этап 3: Интеграция с событиями обновления документов (1 час)

Действия

  1. Создайте эмулятор внешних изменений: прослушивайте очередь Redis (Pub/Sub) на канале doc_updates или polling по полю updated_at.
  2. Напишите скрипт simulate_update.py, который случайным образом выбирает документ, изменяет его содержимое и вызывает WriteThroughCache.put().
  3. Реализуйте fallback: если кэш недоступен, система должна читать из primary storage (возможно, с TTL-инвалидацией по таймеру).
  4. Проверьте, что после вызова put старые эмбеддинги больше не возвращаются: тест вызывает get до и после изменения и сравнивает векторы — они должны различаться.

Тестовый сценарий

def test_no_stale_data():
    doc_id = 1
    original_content = "Hello world"
    updated_content = "Hello cache system"
    cache.put(doc_id, {'content': original_content})
    old_embedding = cache.get(doc_id)
    cache.put(doc_id, {'content': updated_content})
    new_embedding = cache.get(doc_id)
    assert old_embedding != new_embedding, "Stale data detected!"

Ожидаемый результат этапа Автоматизированный скрипт, который при изменении документа в источнике обновляет кэш, и тест подтверждает отсутствие stale data.

Этап 4: Мониторинг, логирование и обработка ошибок (30 мин)

Действия

  1. Добавьте метрики: количество write-through операций, количество сбоев записи в кэш, hit/miss ratio.
  2. Настройте логирование с уровнем INFO для обычных операций и WARNING/ERROR для ошибок кэша.
  3. Реализуйте повтор при ошибке кэша с экспоненциальной задержкой (retry 3 раза).
  4. Напишите скрипт стресс-теста: 100 одновременных обновлений, проверьте, что кэш не рассинхронизирован.

Ожидаемый результат этапа Логи и метрики, доступные для анализа; обработка ошибок не приводит к потере данных.

Этап 5: Документирование и финальное тестирование (30 мин)

Действия

  1. Напишите README с описанием архитектуры, методов класса WriteThroughCache, примерами запуска.
  2. Добавьте docstrings ко всем публичным методам.
  3. Запустите полный набор тестов: pytest test/ -v.
  4. Убедитесь, что код покрывает все граничные случаи: пустой content, дубликаты, массовые обновления.

Ожидаемый результат этапа Пакет с кодом, документацией и тестами готов к использованию.


5. Критерии приемки (Definition of Done)

  • Реализован класс WriteThroughCache с методами put, get, delete.
  • При вызове put документ сначала записывается в primary storage, затем в кэш.
  • При вызове get возвращается актуальное значение; после изменения документа старые данные не возвращаются.
  • Обработаны ошибки кэша: при недоступности Redis система корректно переключается на чтение из primary storage.
  • Написаны не менее 3 интеграционных тестов (один на stale data, один на ошибку кэша, один на массовое обновление).
  • Код покрыт логами с уровнями INFO и ERROR.
  • Создан README с инструкцией по запуску и примерами.
  • Все тесты проходят успешно (pytest exit code 0).

6. Ожидаемый результат

  • Файл/артефакт Python-пакет write_through_cache с модулями:
    • cache_backend.py — абстракция для Redis и in-memory.
    • write_through.py — класс WriteThroughCache.
    • models.pyPydantic-модель документа.
    • storage.py — работа с SQLite.
    • simulate_update.py — скрипт-демонстрация.
  • Содержание Полностью рабочий механизм write-through кэширования для RAG.
  • Дополнительные результаты
    • Лог-файл с примерами операций.
    • Метрики hit/miss (если есть Grafana/Prometheus — дашборд, но не обязательно).

7. Возможные сложности и их решение

СложностьРешение
Race condition при параллельных записях и чтенияхИспользовать блокировку (Redis Lock / threading.Lock) на ключ документа.
Отказ Redis -> потеря кэшаРеализовать fallback на primary storage; при восстановлении Redis можно прогреть кэш.
Высокая latency при пересчёте эмбеддинга (большие документы)Кэшировать только эмбеддинги, а не сырые данные; вычислять асинхронно (background task) с подтверждением записи в primary.
Инвалидация связанных кэшей (например, full-text search)Добавить listener на события и при изменении одного документа инвалидировать все связанные ключи.
Ошибка сети при записи в кэш после успешной записи в БДЛогировать и отмечать запись как dirty для последующей репликации.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка окружения30 мин
Этап 2: Реализация write-through логики1 час 30 мин
Этап 3: Интеграция с событиями1 час
Этап 4: Мониторинг и обработка ошибок30 мин
Этап 5: Документирование и тестирование30 мин
Итого4 часа

Примечание Для первого раза может потребоваться до 6 часов из-за отладки Redis и тестирования граничных случаев.


9. Связанные вопросы из базы знаний

ВопросТема
12Основы кэширования (cache hit/miss, TTL)
45Инвалидация кэша при обновлении данных
102RAG pipeline: embedding, retrieval, generation
201Write-through vs write-back стратегии
304Консистентность данных в распределённых системах
410Обработка ошибок в production (retry, fallback)
518Использование Redis в AI-системах
607Тестирование кэш-стратегий (unit, integration)
720Мониторинг и метрики для кэшей
888Batch update и atomic операции

10. Чек-лист самопроверки

  • Я написал и запустил тесты, и они не показывают stale data после обновления документа.
  • Я проверил сценарий, когда кэш-сервер (Redis) временно недоступен — система не падает, а читает из primary storage.
  • Я добавил логирование ключевых операций (put, get, delete) с разными уровнями.
  • Я написал README, который может прочитать другой разработчик и сразу запустить систему без дополнительных вопросов.
  • Я протестировал массовое обновление (50+ документов) и убедился, что кэш остаётся консистентным.