Настроить write-through cache

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить write-through cache

1. Цель задачи

Научиться реализовывать паттерн write-through cache для синхронного обновления кэша при записи данных в основное хранилище. Вы разработаете API-эндпоинт для обновления документа, который при каждом успешном сохранении в БД немедленно обновляет соответствующую запись в Redis. Главный фокус — гарантия консистентности данных между кэшем и базой данных в режиме реального времени.

Ключевой результат Работающий write-through кэш, при котором после любого PUT /documents/:id данные в Redis и PostgreSQL идентичны, а latency чтения из кэша составляет <5 мс.


2. Исходные данные

Что нужноОткуда взять
Локальный Linux/macOS/WSLПерсональный компьютер
Docker & docker-composeОфициальный сайт Docker
PostgreSQL (локально или в контейнере)docker-compose.yml
Redis (локально или в контейнере)docker-compose.yml
Python 3.10+ с FastAPI/FlaskУстановить через pyenv / conda
Postman / curl / httpieЛюбой HTTP-клиент

Если нет реального инструмента — симулируем:

  1. Разверните локальный стенд с помощью docker-compose (PostgreSQL + Redis).
  2. Используйте библиотеку docker из Python (через subprocess), если Docker Desktop недоступен — запустите PostgreSQL и Redis напрямую через apt install / brew install.
  3. Для тестирования консистентности напишите небольшой скрипт на Python, который после каждой записи делает SELECT из БД и GET из Redis и сравнивает значения.

3. Технологический стек

КомпонентИнструментыНазначение
API-серверFastAPI (Python 3.11)Обработка HTTP-запросов
Основное хранилищеPostgreSQL 15Персистентное хранение документов
КэшRedis 7Быстрое in-memory хранение
ORM / DB-driverSQLAlchemy 2.0 + asyncpgАсинхронная работа с БД
Клиент Redisredis-py (aioredis)Чтение/запись в кэш
Контейнеризацияdocker-composeПодъём инфраструктуры
Тестированиеpytest + httpxЮнит- и интеграционные тесты
Мониторинг (опционально)Prometheus + GrafanaМетрики латентности

4. Этапы выполнения

Этап 1: Подготовка окружения и развёртывание инфраструктуры (45 минут)

Действия

  1. Создайте директорию проекта и файл docker-compose.yml со следующим содержимым:
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: docstore
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  pgdata:
  1. Запустите контейнеры docker-compose up -d

  2. Настройте виртуальное окружение Python и установите зависимости:

python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn sqlalchemy asyncpg redis httpx pytest pytest-asyncio
  1. Создайте базовую структуру проекта
project/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI приложение
│   ├── database.py      # Подключение к PostgreSQL
│   ├── models.py        # SQLAlchemy модель
│   ├── cache.py         # Работа с Redis
│   └── routers/
│       └── documents.py # Эндпоинты
├── tests/
│   └── test_write_through.py
├── docker-compose.yml
└── requirements.txt

Ожидаемый результат этапа
Контейнеры PostgreSQL и Redis запущены, FastAPI приложение стартует без ошибок.


Этап 2: Реализация модели данных и базового CRUD (1 час)

Действия

  1. Создайте модель документа в models.py:
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from app.database import Base

class Document(Base):
    __tablename__ = "documents"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), nullable=False)
    content = Column(Text)
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
  1. Настройте асинхронное подключение в database.py:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/docstore"

engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()

async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
  1. Реализуйте базовый CRUD-эндпоинт (routers/documents.py):
  • GET /documents/{id} — чтение из кэша (пока без кэша).
  • PUT /documents/{id} — обновление документа в БД (пока без кэша).
  1. Подключите роутер в main.py и добавьте startup-событие для инициализации БД.

Ожидаемый результат этапа
API работает: при PUT /documents/1 данные сохраняются в PostgreSQL, при GET /documents/1 возвращаются корректные данные.


Этап 3: Реализация write-through кэша (1.5 часа)

Действия

  1. Создайте модуль работы с Redis cache.py:
import redis.asyncio as redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

CACHE_PREFIX = "doc:"
CACHE_TTL = 3600  # 1 час

async def get_document_from_cache(doc_id: int):
    data = await redis_client.get(f"{CACHE_PREFIX}{doc_id}")
    if data:
        import json
        return json.loads(data)
    return None

async def set_document_to_cache(doc_id: int, data: dict):
    import json
    await redis_client.setex(f"{CACHE_PREFIX}{doc_id}", CACHE_TTL, json.dumps(data))
  1. Модифицируйте эндпоинт PUT /documents/{id} (write-through логика):
  • Получить тело запроса (title, content).
  • Обновить запись в PostgreSQL.
  • Сформировать словарь с актуальными данными (включая новое updated_at).
  • Синхронно (в той же корутине) вызвать set_document_to_cache.
  • Вернуть обновлённый документ.
  1. Обновите GET /documents/{id}:
  • Сначала проверить кэш.
  • Если нет — прочитать из БД и записать в кэш (lazy-loading, но это уже не write-through, просто оптимизация).
  1. Добавьте обработку ошибок: если Redis недоступен, операция записи не должна падать — только логировать, так как данные уже в БД.

Ожидаемый результат этапа
После PUT запроса данные появляются и в БД, и в Redis с одинаковым содержимым и временем обновления.


Этап 4: Написание тестов и верификация консистентности (1 час)

Действия

  1. Напишите интеграционный тест в test_write_through.py:
import pytest
from httpx import AsyncClient
from app.main import app
from app.database import async_session, init_db
from app.cache import redis_client

@pytest.fixture(autouse=True)
async def setup_db():
    await init_db()
    yield
    async with async_session() as session:
        await session.execute("TRUNCATE documents RESTART IDENTITY CASCADE")
        await session.commit()
    await redis_client.flushdb()

@pytest.mark.asyncio
async def test_write_through_consistency():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Создаём документ
        create_resp = await client.put("/documents/1", json={"title": "Test", "content": "Hello"})
        assert create_resp.status_code == 200

        # Получаем из кэша напрямую
        cached = await redis_client.get("doc:1")
        assert cached is not None, "Кэш должен содержать запись"

        # Проверяем идентичность
        import json
        cached_data = json.loads(cached)
        resp_data = create_resp.json()
        assert cached_data["title"] == resp_data["title"]
        assert cached_data["content"] == resp_data["content"]
        assert cached_data["updated_at"] == resp_data["updated_at"]

        # Обновляем
        update_resp = await client.put("/documents/1", json={"title": "Updated", "content": "World"})
        assert update_resp.status_code == 200

        # Проверяем, что кэш обновился
        cached_updated = json.loads(await redis_client.get("doc:1"))
        assert cached_updated["title"] == "Updated"
        assert cached_updated["content"] == "World"
        assert cached_updated["updated_at"] != cached_data["updated_at"]
  1. Запустите тесты pytest tests/ -v

  2. Дополнительно: проверьте поведение при недоступности Redis — закройте контейнер redis и выполните PUT. Убедитесь, что API возвращает успех, а данные сохранены в БД.

Ожидаемый результат этапа
Все тесты проходят. Консистентность между БД и кэшем подтверждена автоматически.


Этап 5: Измерение производительности и документирование (30 минут)

Действия

  1. Напишите небольшой load-тест с помощью locust или wrk, чтобы сравнить latency GET-запросов с кэшем и без него.

  2. Зафиксируйте метрики:

  • Среднее время ответа GET без кэша (через БД).
  • Среднее время ответа GET с кэшем (после write-through).
  • Latency самой записи (разница между PUT с кэшем и без него).
  1. Оформите результаты в виде комментария в README.md:
## Результаты производительности
- GET без кэша: ~10-15 ms
- GET с кэшем: ~1-2 ms
- PUT overhead (write-through): +0.5 ms в среднем

Ожидаемый результат этапа
Документация с цифрами, подтверждающая, что write-through незначительно увеличивает время записи (<1 мс) и кардинально ускоряет чтение.


5. Критерии приемки (Definition of Done)

  • PostgreSQL и Redis развёрнуты в контейнерах, приложение стартует без ошибок.
  • PUT /documents/{id} синхронно обновляет и БД, и кэш.
  • GET /documents/{id} сначала проверяет кэш, при попадании не ходит в БД.
  • Написаны и проходят минимум 3 интеграционных теста на консистентность.
  • При отказе Redis запись в БД завершается успешно, кэш-ошибка логируется.
  • В README описана архитектура write-through, результаты load-теста с цифрами.
  • Код соответствует PEP 8, используется async/await, типизация.

6. Ожидаемый результат

Артефакт Полноценный микросервис с write-through кэшем на FastAPI + Redis + PostgreSQL.

Содержание

  • Исходный код в директории app/.
  • docker-compose.yml для инфраструктуры.
  • tests/ с интеграционными тестами.
  • README.md с описанием, инструкцией по запуску и результатами производительности.

Опционально

  • Дашборд Grafana с метриками latency (если внедрён Prometheus).
  • Скрипт для бенчмарка (locustfile.py).

7. Возможные сложности и их решение

СложностьРешение
Redis не запущен или отвалился во время PUTИспользовать try/except вокруг записи в кэш, логировать ошибку, не прерывать запись в БД.
Race condition — два параллельных PUT в один документИспользовать оптимистичную блокировку (версия документа) или Redis Lock (SETNX) перед обновлением. В простой версии — последняя запись побеждает.
Сериализация/десериализация datetimeПреобразовывать updated_at в строку ISO-формата при записи в кэш.
Сброс кэша после рестарта RedisWrite-through гарантирует, что при следующем PUT кэш обновится; для существующих данных — lazy-loading при GET.
Асинхронность — забыл await в кэшеВсе вызовы Redis должны быть await. Использовать линтер (flake8-async) для выявления.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка окружения45 мин
Этап 2: Базовый CRUD1 ч
Этап 3: Write-through кэш1 ч 30 мин
Этап 4: Тестирование1 ч
Этап 5: Производительность и документация30 мин
Итого4 ч 45 мин

Примечание для первого раза Если вы впервые работаете с async Redis или FastAPI, заложите +1 час на отладку асинхронных вызовов.


9. Связанные вопросы из базы знаний

ВопросТема
12Cache invalidation strategies
15Write-behind vs write-through
22Redis data types used for caching
34Consistency models in distributed systems
47FastAPI dependency injection
81SQLAlchemy async sessions
103Docker Compose networking
210Performance testing of caching layer
245Handling cache failures gracefully
267TTL and cache eviction policies

10. Чек-лист самопроверки

  • Я развернул Docker-контейнеры и проверил, что приложение стартует.
  • Написал модель Document и создал таблицу в БД.
  • Реализовал write-through в PUT-эндпоинте: запись в БД, затем в Redis.
  • Добавил чтение из кэша в GET.
  • Написал хотя бы один тест, который проверяет, что данные в БД и Redis совпадают после PUT.
  • Симулировал падение Redis и убедился, что запись в БД не ломается.
  • Измерил latency и записал результаты в README.