Реализовать cache stampede защиту с singleflight
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать cache stampede защиту с singleflight
1. Цель задачи
Научиться защищать кэширующий слой от эффекта stampede (лавинного перестроения), когда множество одновременных запросов к одному ключу приводят к N-кратному обращению к источнику (БД) вместо одного. Реализовать механизм singleflight, при котором при истечении кэша только один запрос попадает в базу данных, остальные ждут его результат и используют перестроенный кэш.
Ключевой результат При перестроении кэша для любого популярного ключа ровно 1 запрос идёт в БД, остальные — получают ответ после перестроения (или из кэша, если он уже записан).
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Python-проект с REST API (FastAPI/Flask) | Собственный проект или шаблон |
| Кэширующий слой (Redis / in-memory dict) | Установленный Redis (локально или Docker) или стандартный functools.lru_cache |
| База данных (PostgreSQL / SQLite / mock) | Локальная БД или in-memory SQLite для тестов |
| Инструмент нагрузочного тестирования | locust, wrk, ab или httpx с asyncio |
| Мониторинг/логирование | logging или дашборд (опционально) |
Если нет реального инструмента — симулируем:
- Redis — поднимите Docker-контейнер:
docker run -d -p 6379:6379 --name redis-stampede redis:7-alpine - БД — используйте SQLite (встроенный модуль) с 1000 записей (например, таблица
users) или любой ORM (SQLAlchemy). - Нагрузочный тест — напишите Python-скрипт с asyncio и httpx.AsyncClient, который одновременно шлёт 20+ запросов к одному эндпоинту.
- Локальный кэш — если Redis недоступен, используйте
dictс TTL и блокировкой asyncio.Lock.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.11+ | Реализация логики |
| Веб-фреймворк | FastAPI (предпочтительно) или Flask с asyncio | REST API |
| Кэш | Redis (через redis-py или aioredis) / in-memory dict | Хранение временных данных |
| База данных | PostgreSQL / SQLite / mock dict | Источник данных (дорогой запрос) |
| Singleflight | Самописная реализация или библиотека aiolibs / sync/async | Координация конкурентных запросов |
| Нагрузочное тестирование | locust / httpx / asyncio.gather | Симуляция stampede |
| Мониторинг | logging + счётчики (statsd опционально) | Отслеживание числа запросов в БД |
4. Этапы выполнения
Этап 1: Подготовка окружения и базовая архитектура (30 мин)
Действия
- Создайте новый проект Python: mkdir cache-stampede && cd cache-stampede && python -m venv venv
- Установите зависимости: fastapi uvicorn redis httpx aiofiles (или только uvicorn + fastapi + json).
- Создайте структуру файлов:
. ├── main.py # FastAPI приложение ├── cache.py # Абстракция кэша (get/set/delete) ├── db.py # Симуляция БД (дорогой запрос) ├── singleflight.py # Реализация singleflight └── test_stampede.py # Нагрузочный тест - В
db.pyреализуйте функцию get_data_from_db(key: str) -> dict с искусственной задержкой (например, await asyncio.sleep(0.5)) — имитация медленного запроса. - В cache.py реализуйте класс Cache, который поддерживает TTL и методы async def get(key), async def set(key, value, ttl). Используйте Redis или in-memory.
Ожидаемый результат этапа Запускаемый FastAPI-сервер с эндпоинтом GET /data/{key}, который сначала проверяет кэш, при промахе идёт в БД и кладёт результат в кэш (без singleflight).
Этап 2: Воспроизведение stampede (20 мин)
Действия
- Напишите нагрузочный тест в
test_stampede.py:
import asyncio
import httpx
import time
async def send_request(client, key):
start = time.perf_counter()
r = await client.get(f"http://localhost:8000/data/{key}")
elapsed = time.perf_counter() - start
return r.status_code, elapsed
async def main():
key = "popular:1001"
# дождаться истечения кэша (например, установить TTL=1 секунда)
await asyncio.sleep(1.2)
async with httpx.AsyncClient() as client:
tasks = [send_request(client, key) for _ in range(20)]
results = await asyncio.gather(*tasks)
successes = [r for r in results if r[0] == 200]
print(f"Успешных запросов: {len(successes)}")
# оценить число обращений к БД по времени ответа (все ли > 0.5с?)
asyncio.run(main())
- Запустите сервер (uvicorn main:app --port 8000) и выполните тест.
- В логах или счётчиках убедитесь, что для одного ключа было 20 запросов в БД (все параллельно ждут 0.5с каждый → суммарное время ~0.5с, но это 20 вызовов БД).
Ожидаемый результат этапа Подтверждённый stampede: при отсутствии кэша все 20 запросов пошли в БД.
Этап 3: Реализация singleflight (1.5 часа)
Действия
- Создайте класс SingleFlight в singleflight.py:
import asyncio
from typing import Awaitable, Callable, Dict, Optional
class SingleFlight:
def __init__(self):
self._in_flight: Dict[str, asyncio.Future] = {}
async def do(self, key: str, func: Callable[[], Awaitable]) -> tuple:
# 1. Проверить, есть ли уже выполняемая задача для key
if key in self._in_flight:
# 2. Если есть — подождать результат этой задачи
return await asyncio.shield(self._in_flight[key]), False
# 3. Если нет — создать Future, положить в словарь, выполнить func
future = asyncio.get_event_loop().create_future()
self._in_flight[key] = future
try:
result = await func()
future.set_result(result)
return result, True # True = этот запрос первый
except Exception as e:
future.set_exception(e)
raise
finally:
# 4. Удалить из словаря после завершения
self._in_flight.pop(key, None)
- Интегрируйте SingleFlight в эндпоинт GET /data/{key}:
from fastapi import FastAPI
from cache import Cache
from db import get_data_from_db
from singleflight import SingleFlight
app = FastAPI()
cache = Cache(ttl=1)
flight = SingleFlight()
db_call_counter = 0 # для демонстрации
@app.get("/data/{key}")
async def get_data(key: str):
global db_call_counter
# 1. Попытка прочитать из кэша
cached = await cache.get(key)
if cached is not None:
return {"source": "cache", "data": cached}
# 2. Если промах — передать функцию получения данных в singleflight
async def fetch_from_db():
global db_call_counter
db_call_counter += 1
data = await get_data_from_db(key)
await cache.set(key, data, ttl=1)
return data
result, was_first = await flight.do(key, fetch_from_db)
return {"source": "db", "first": was_first, "data": result}
- Добавьте эндпоинт /metrics для вывода счётчика обращений к БД.
- Перезапустите сервер и повторно выполните нагрузочный тест.
- Проверьте счётчик — теперь он должен быть равен 1 (или очень маленькому числу, если у вас не синхронизировано время).
Ожидаемый результат этапа При нагрузке в 20 параллельных запросов к одному ключу только 1 запрос достигает БД; остальные получают данные либо из singleflight (дождались), либо из кэша (если кэш уже записан к моменту их попадания в эндпоинт).
Этап 4: Расширение тестов и граничные случаи (30 мин)
Действия
- Добавьте тест на случай ошибки в БД: убедитесь, что singleflight корректно пробрасывает исключение всем ожидающим.
- Добавьте тест на разные ключи (популярные и не очень) — проверьте, что singleflight работает изолированно.
- Добавьте тест с TTL кэша и истечением — убедитесь, что при повторном запросе после истечения снова срабатывает singleflight.
- Напишите unit-тесты на класс SingleFlight (мокая asyncio.sleep):
import pytest
import asyncio
from singleflight import SingleFlight
@pytest.mark.asyncio
async def test_singleflight_blocks_duplicates():
flight = SingleFlight()
call_count = 0
async def slow_func():
nonlocal call_count
call_count += 1
await asyncio.sleep(0.2)
return "result"
results = await asyncio.gather(
flight.do("key", slow_func),
flight.do("key", slow_func),
flight.do("key", slow_func),
)
assert call_count == 1
assert results[0] == results[1] == results[2]
Ожидаемый результат этапа Все тесты проходят, singleflight корректно работает при ошибках, истечении кэша и на разных ключах.
Этап 5: Мониторинг и улучшения (20 мин)
Действия
- Добавьте логирование каждого вызова singleflight: номер ключа, был ли запрос первым, время ожидания.
- Реализуйте таймаут для singleflight (если БД падает — не держать ожидающие запросы бесконечно):
- Используйте
asyncio.wait_for(future, timeout=5.0).
- Используйте
- Добавьте метрики Prometheus (опционально): счётчик
db_queries_total, гистограммаsingleflight_wait_seconds. - Проверьте поведение под настоящей нагрузкой (
locustс несколькими пользователями).
Ожидаемый результат этапа Singleflight-защита включена в production-контур с логированием и, при желании, мониторингом.
5. Критерии приемки (Definition of Done)
- Реализован эндпоинт
GET /data/{key}, который возвращает данные из кэша или из БД. - При параллельных запросах к одному ключу (20+ одновременно) в БД выполняется ровно 1 запрос.
- Все ожидающие запросы (не первый) получают актуальные данные (либо дожидаются первого, либо читают из кэша).
- Кэш имеет TTL; после истечения механизм singleflight снова срабатывает.
- Реализована обработка ошибок: если функция БД падает, все зависшие запросы получают ту же ошибку.
- Написаны как минимум 3 unit-теста (на блокировку, на ошибку, на разные ключи).
- Нагрузочный тест демонстрирует, что число вызовов БД равно 1 (или очень близко к 1).
- Код покрыт комментариями, а класс SingleFlight имеет docstring.
6. Ожидаемый результат
Основной артефакт Репозиторий с кодом (Python), включающий:
main.py— FastAPI приложение с эндпоинтом/data/{key}и/metrics.cache.py— абстракция кэша (Redis или in-memory).db.py— симулятор медленного запроса.singleflight.py— реализация singleflight с асинхронной блокировкой.test_stampede.py— скрипт нагрузочного теста.tests/— папка с unit-тестами.
Дополнительные результаты (по желанию):
- Docker-compose для быстрого запуска (Redis + FastAPI).
- Prometheus-метрики и дашборд Grafana.
- Документация в README с примером использования и графиком нагрузочного теста.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Гонка при записи в кэш (первый запрос уже записал кэш, но второй не успел проверить) | Использовать атомарные операции Redis (SETNX + TTL) или блокировку на уровне приложения (singleflight уже решает). |
Утечка памяти в _in_flight (если ключей очень много) | Добавить ограничение по размеру словаря (LRU) или удалять ключи по таймауту. |
| Future не удаляется при исключении (ошибка в пользовательской функции) | Гарантировать finally в do (реализовано выше). |
| Таймаут ожидания (БД зависла — все запросы ждут бесконечно) | Использовать asyncio.wait_for(future, timeout) и удалять future из словаря по таймауту. |
| Синхронная БД (не asyncio) | Обернуть вызов в loop.run_in_executor; singleflight всё равно работает на уровне корутин. |
| Кэш с коротким TTL и сильной нагрузкой | Комбинировать singleflight с "мягким TTL" (stale-while-revalidate) — ещё один паттерн, но выходит за рамки задачи. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка окружения и базовая архитектура | 30 мин |
| Этап 2: Воспроизведение stampede | 20 мин |
| Этап 3: Реализация singleflight | 1 ч 30 мин |
| Этап 4: Расширение тестов и граничные случаи | 30 мин |
| Этап 5: Мониторинг и улучшения | 20 мин |
| Итого | ~3 часа 10 мин |
Примечание: Для первого раза может потребоваться до 4–5 часов из-за отладки асинхронности и Redis.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 1 | Принципы работы кэширования (Cache-Aside, Write-Through) |
| 2 | TTL и стратегии инвалидации кэша |
| 3 | Асинхронное программирование в Python (asyncio, Future) |
| 4 | Паттерн "Singleflight" (Google Groupcache) |
| 5 | Эффект лавинного перестроения (Cache Stampede) |
| 6 | Redis: атомарные операции и блокировки |
| 7 | Prometheus metrcis и мониторинг производительности |
| 8 | Unit-тестирование асинхронного кода (pytest-asyncio) |
| 9 | Работа с Docker (локальный Redis) |
| 10 | Нагрузочное тестирование (Locust, wrk) |
10. Чек-лист самопроверки
- Я написал нагрузочный тест и убедился, что без singleflight было ≥20 вызовов БД, а после — 1.
- В singleflight корректно обрабатываются исключения (вызывающая сторона получает ту же ошибку).
- Я проверил, что singleflight работает изолированно для разных ключей.
- Кэш имеет TTL, и после его истечения singleflight снова срабатывает.
- Я написал минимум 3 unit-теста для класса SingleFlight.
- Код покрыт комментариями на русском или английском (понятно коллеге).
- В проекте есть README с инструкцией по запуску (как поднять Redis, как запустить тест).
- Я добавил логирование для каждого вызова singleflight (видно в консоли, кто первый).