Настроить write-through cache
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настроить write-through cache
1. Цель задачи
Научиться реализовывать паттерн write-through cache для синхронного обновления кэша при записи данных в основное хранилище. Вы разработаете API-эндпоинт для обновления документа, который при каждом успешном сохранении в БД немедленно обновляет соответствующую запись в Redis. Главный фокус — гарантия консистентности данных между кэшем и базой данных в режиме реального времени.
Ключевой результат Работающий write-through кэш, при котором после любого PUT /documents/:id данные в Redis и PostgreSQL идентичны, а latency чтения из кэша составляет <5 мс.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Локальный Linux/macOS/WSL | Персональный компьютер |
| Docker & docker-compose | Официальный сайт Docker |
| PostgreSQL (локально или в контейнере) | docker-compose.yml |
| Redis (локально или в контейнере) | docker-compose.yml |
| Python 3.10+ с FastAPI/Flask | Установить через pyenv / conda |
| Postman / curl / httpie | Любой HTTP-клиент |
Если нет реального инструмента — симулируем:
- Разверните локальный стенд с помощью docker-compose (PostgreSQL + Redis).
- Используйте библиотеку docker из Python (через subprocess), если Docker Desktop недоступен — запустите PostgreSQL и Redis напрямую через
apt install/brew install. - Для тестирования консистентности напишите небольшой скрипт на Python, который после каждой записи делает
SELECTиз БД иGETиз Redis и сравнивает значения.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| API-сервер | FastAPI (Python 3.11) | Обработка HTTP-запросов |
| Основное хранилище | PostgreSQL 15 | Персистентное хранение документов |
| Кэш | Redis 7 | Быстрое in-memory хранение |
| ORM / DB-driver | SQLAlchemy 2.0 + asyncpg | Асинхронная работа с БД |
| Клиент Redis | redis-py (aioredis) | Чтение/запись в кэш |
| Контейнеризация | docker-compose | Подъём инфраструктуры |
| Тестирование | pytest + httpx | Юнит- и интеграционные тесты |
| Мониторинг (опционально) | Prometheus + Grafana | Метрики латентности |
4. Этапы выполнения
Этап 1: Подготовка окружения и развёртывание инфраструктуры (45 минут)
Действия
- Создайте директорию проекта и файл docker-compose.yml со следующим содержимым:
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: docstore
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
pgdata:
-
Запустите контейнеры docker-compose up -d
-
Настройте виртуальное окружение Python и установите зависимости:
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn sqlalchemy asyncpg redis httpx pytest pytest-asyncio
- Создайте базовую структуру проекта
project/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI приложение
│ ├── database.py # Подключение к PostgreSQL
│ ├── models.py # SQLAlchemy модель
│ ├── cache.py # Работа с Redis
│ └── routers/
│ └── documents.py # Эндпоинты
├── tests/
│ └── test_write_through.py
├── docker-compose.yml
└── requirements.txt
Ожидаемый результат этапа
Контейнеры PostgreSQL и Redis запущены, FastAPI приложение стартует без ошибок.
Этап 2: Реализация модели данных и базового CRUD (1 час)
Действия
- Создайте модель документа в
models.py:
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from app.database import Base
class Document(Base):
__tablename__ = "documents"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(255), nullable=False)
content = Column(Text)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
- Настройте асинхронное подключение в database.py:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/docstore"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
- Реализуйте базовый CRUD-эндпоинт (
routers/documents.py):
GET /documents/{id}— чтение из кэша (пока без кэша).- PUT /documents/{id} — обновление документа в БД (пока без кэша).
- Подключите роутер в
main.pyи добавьте startup-событие для инициализации БД.
Ожидаемый результат этапа
API работает: при PUT /documents/1 данные сохраняются в PostgreSQL, при GET /documents/1 возвращаются корректные данные.
Этап 3: Реализация write-through кэша (1.5 часа)
Действия
- Создайте модуль работы с Redis cache.py:
import redis.asyncio as redis
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
CACHE_PREFIX = "doc:"
CACHE_TTL = 3600 # 1 час
async def get_document_from_cache(doc_id: int):
data = await redis_client.get(f"{CACHE_PREFIX}{doc_id}")
if data:
import json
return json.loads(data)
return None
async def set_document_to_cache(doc_id: int, data: dict):
import json
await redis_client.setex(f"{CACHE_PREFIX}{doc_id}", CACHE_TTL, json.dumps(data))
- Модифицируйте эндпоинт PUT /documents/{id} (write-through логика):
- Получить тело запроса (title, content).
- Обновить запись в PostgreSQL.
- Сформировать словарь с актуальными данными (включая новое
updated_at). - Синхронно (в той же корутине) вызвать
set_document_to_cache. - Вернуть обновлённый документ.
- Обновите
GET /documents/{id}:
- Сначала проверить кэш.
- Если нет — прочитать из БД и записать в кэш (lazy-loading, но это уже не write-through, просто оптимизация).
- Добавьте обработку ошибок: если Redis недоступен, операция записи не должна падать — только логировать, так как данные уже в БД.
Ожидаемый результат этапа
После PUT запроса данные появляются и в БД, и в Redis с одинаковым содержимым и временем обновления.
Этап 4: Написание тестов и верификация консистентности (1 час)
Действия
- Напишите интеграционный тест в
test_write_through.py:
import pytest
from httpx import AsyncClient
from app.main import app
from app.database import async_session, init_db
from app.cache import redis_client
@pytest.fixture(autouse=True)
async def setup_db():
await init_db()
yield
async with async_session() as session:
await session.execute("TRUNCATE documents RESTART IDENTITY CASCADE")
await session.commit()
await redis_client.flushdb()
@pytest.mark.asyncio
async def test_write_through_consistency():
async with AsyncClient(app=app, base_url="http://test") as client:
# Создаём документ
create_resp = await client.put("/documents/1", json={"title": "Test", "content": "Hello"})
assert create_resp.status_code == 200
# Получаем из кэша напрямую
cached = await redis_client.get("doc:1")
assert cached is not None, "Кэш должен содержать запись"
# Проверяем идентичность
import json
cached_data = json.loads(cached)
resp_data = create_resp.json()
assert cached_data["title"] == resp_data["title"]
assert cached_data["content"] == resp_data["content"]
assert cached_data["updated_at"] == resp_data["updated_at"]
# Обновляем
update_resp = await client.put("/documents/1", json={"title": "Updated", "content": "World"})
assert update_resp.status_code == 200
# Проверяем, что кэш обновился
cached_updated = json.loads(await redis_client.get("doc:1"))
assert cached_updated["title"] == "Updated"
assert cached_updated["content"] == "World"
assert cached_updated["updated_at"] != cached_data["updated_at"]
-
Дополнительно: проверьте поведение при недоступности Redis — закройте контейнер redis и выполните PUT. Убедитесь, что API возвращает успех, а данные сохранены в БД.
Ожидаемый результат этапа
Все тесты проходят. Консистентность между БД и кэшем подтверждена автоматически.
Этап 5: Измерение производительности и документирование (30 минут)
Действия
-
Напишите небольшой load-тест с помощью locust или wrk, чтобы сравнить latency
GET-запросов с кэшем и без него. -
Зафиксируйте метрики:
- Среднее время ответа
GETбез кэша (через БД). - Среднее время ответа
GETс кэшем (после write-through). - Latency самой записи (разница между PUT с кэшем и без него).
- Оформите результаты в виде комментария в README.md:
## Результаты производительности
- GET без кэша: ~10-15 ms
- GET с кэшем: ~1-2 ms
- PUT overhead (write-through): +0.5 ms в среднем
Ожидаемый результат этапа
Документация с цифрами, подтверждающая, что write-through незначительно увеличивает время записи (<1 мс) и кардинально ускоряет чтение.
5. Критерии приемки (Definition of Done)
- PostgreSQL и Redis развёрнуты в контейнерах, приложение стартует без ошибок.
- PUT /documents/{id} синхронно обновляет и БД, и кэш.
-
GET /documents/{id}сначала проверяет кэш, при попадании не ходит в БД. - Написаны и проходят минимум 3 интеграционных теста на консистентность.
- При отказе Redis запись в БД завершается успешно, кэш-ошибка логируется.
- В README описана архитектура write-through, результаты load-теста с цифрами.
- Код соответствует PEP 8, используется
async/await, типизация.
6. Ожидаемый результат
Артефакт Полноценный микросервис с write-through кэшем на FastAPI + Redis + PostgreSQL.
Содержание
- Исходный код в директории
app/. docker-compose.ymlдля инфраструктуры.tests/с интеграционными тестами.README.mdс описанием, инструкцией по запуску и результатами производительности.
Опционально
- Дашборд Grafana с метриками latency (если внедрён Prometheus).
- Скрипт для бенчмарка (
locustfile.py).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Redis не запущен или отвалился во время PUT | Использовать try/except вокруг записи в кэш, логировать ошибку, не прерывать запись в БД. |
| Race condition — два параллельных PUT в один документ | Использовать оптимистичную блокировку (версия документа) или Redis Lock (SETNX) перед обновлением. В простой версии — последняя запись побеждает. |
| Сериализация/десериализация datetime | Преобразовывать updated_at в строку ISO-формата при записи в кэш. |
| Сброс кэша после рестарта Redis | Write-through гарантирует, что при следующем PUT кэш обновится; для существующих данных — lazy-loading при GET. |
Асинхронность — забыл await в кэше | Все вызовы Redis должны быть await. Использовать линтер (flake8-async) для выявления. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка окружения | 45 мин |
| Этап 2: Базовый CRUD | 1 ч |
| Этап 3: Write-through кэш | 1 ч 30 мин |
| Этап 4: Тестирование | 1 ч |
| Этап 5: Производительность и документация | 30 мин |
| Итого | 4 ч 45 мин |
Примечание для первого раза Если вы впервые работаете с async Redis или FastAPI, заложите +1 час на отладку асинхронных вызовов.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Cache invalidation strategies |
| 15 | Write-behind vs write-through |
| 22 | Redis data types used for caching |
| 34 | Consistency models in distributed systems |
| 47 | FastAPI dependency injection |
| 81 | SQLAlchemy async sessions |
| 103 | Docker Compose networking |
| 210 | Performance testing of caching layer |
| 245 | Handling cache failures gracefully |
| 267 | TTL and cache eviction policies |
10. Чек-лист самопроверки
- Я развернул Docker-контейнеры и проверил, что приложение стартует.
- Написал модель Document и создал таблицу в БД.
- Реализовал write-through в PUT-эндпоинте: запись в БД, затем в Redis.
- Добавил чтение из кэша в GET.
- Написал хотя бы один тест, который проверяет, что данные в БД и Redis совпадают после PUT.
- Симулировал падение Redis и убедился, что запись в БД не ломается.
- Измерил latency и записал результаты в README.