English translation is not available yet. Showing Russian content.

Реализовать cache stampede защиту с singleflight

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать cache stampede защиту с singleflight

1. Цель задачи

Научиться защищать кэширующий слой от эффекта stampede (лавинного перестроения), когда множество одновременных запросов к одному ключу приводят к N-кратному обращению к источнику (БД) вместо одного. Реализовать механизм singleflight, при котором при истечении кэша только один запрос попадает в базу данных, остальные ждут его результат и используют перестроенный кэш.

Ключевой результат При перестроении кэша для любого популярного ключа ровно 1 запрос идёт в БД, остальные — получают ответ после перестроения (или из кэша, если он уже записан).

2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Python-проект с REST API (FastAPI/Flask)Собственный проект или шаблон
Кэширующий слой (Redis / in-memory dict)Установленный Redis (локально или Docker) или стандартный functools.lru_cache
База данных (PostgreSQL / SQLite / mock)Локальная БД или in-memory SQLite для тестов
Инструмент нагрузочного тестированияlocust, wrk, ab или httpx с asyncio
Мониторинг/логированиеlogging или дашборд (опционально)

Если нет реального инструмента — симулируем:

  1. Redis — поднимите Docker-контейнер:
    docker run -d -p 6379:6379 --name redis-stampede redis:7-alpine
    
  2. БД — используйте SQLite (встроенный модуль) с 1000 записей (например, таблица users) или любой ORM (SQLAlchemy).
  3. Нагрузочный тест — напишите Python-скрипт с asyncio и httpx.AsyncClient, который одновременно шлёт 20+ запросов к одному эндпоинту.
  4. Локальный кэш — если Redis недоступен, используйте dict с TTL и блокировкой asyncio.Lock.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.11+Реализация логики
Веб-фреймворкFastAPI (предпочтительно) или Flask с asyncioREST API
КэшRedis (через redis-py или aioredis) / in-memory dictХранение временных данных
База данныхPostgreSQL / SQLite / mock dictИсточник данных (дорогой запрос)
SingleflightСамописная реализация или библиотека aiolibs / sync/asyncКоординация конкурентных запросов
Нагрузочное тестированиеlocust / httpx / asyncio.gatherСимуляция stampede
Мониторингlogging + счётчики (statsd опционально)Отслеживание числа запросов в БД

4. Этапы выполнения

Этап 1: Подготовка окружения и базовая архитектура (30 мин)

Действия

  1. Создайте новый проект Python: mkdir cache-stampede && cd cache-stampede && python -m venv venv
  2. Установите зависимости: fastapi uvicorn redis httpx aiofiles (или только uvicorn + fastapi + json).
  3. Создайте структуру файлов:
    .
    ├── main.py              # FastAPI приложение
    ├── cache.py             # Абстракция кэша (get/set/delete)
    ├── db.py                # Симуляция БД (дорогой запрос)
    ├── singleflight.py      # Реализация singleflight
    └── test_stampede.py     # Нагрузочный тест
    
  4. В db.py реализуйте функцию get_data_from_db(key: str) -> dict с искусственной задержкой (например, await asyncio.sleep(0.5)) — имитация медленного запроса.
  5. В cache.py реализуйте класс Cache, который поддерживает TTL и методы async def get(key), async def set(key, value, ttl). Используйте Redis или in-memory.

Ожидаемый результат этапа Запускаемый FastAPI-сервер с эндпоинтом GET /data/{key}, который сначала проверяет кэш, при промахе идёт в БД и кладёт результат в кэш (без singleflight).


Этап 2: Воспроизведение stampede (20 мин)

Действия

  1. Напишите нагрузочный тест в test_stampede.py:
import asyncio
import httpx
import time

async def send_request(client, key):
    start = time.perf_counter()
    r = await client.get(f"http://localhost:8000/data/{key}")
    elapsed = time.perf_counter() - start
    return r.status_code, elapsed

async def main():
    key = "popular:1001"
    # дождаться истечения кэша (например, установить TTL=1 секунда)
    await asyncio.sleep(1.2)
    async with httpx.AsyncClient() as client:
        tasks = [send_request(client, key) for _ in range(20)]
        results = await asyncio.gather(*tasks)
    successes = [r for r in results if r[0] == 200]
    print(f"Успешных запросов: {len(successes)}")
    # оценить число обращений к БД по времени ответа (все ли > 0.5с?)

asyncio.run(main())
  1. Запустите сервер (uvicorn main:app --port 8000) и выполните тест.
  2. В логах или счётчиках убедитесь, что для одного ключа было 20 запросов в БД (все параллельно ждут 0.5с каждый → суммарное время ~0.5с, но это 20 вызовов БД).

Ожидаемый результат этапа Подтверждённый stampede: при отсутствии кэша все 20 запросов пошли в БД.


Этап 3: Реализация singleflight (1.5 часа)

Действия

  1. Создайте класс SingleFlight в singleflight.py:
import asyncio
from typing import Awaitable, Callable, Dict, Optional

class SingleFlight:
    def __init__(self):
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def do(self, key: str, func: Callable[[], Awaitable]) -> tuple:
        # 1. Проверить, есть ли уже выполняемая задача для key
        if key in self._in_flight:
            # 2. Если есть — подождать результат этой задачи
            return await asyncio.shield(self._in_flight[key]), False
        # 3. Если нет — создать Future, положить в словарь, выполнить func
        future = asyncio.get_event_loop().create_future()
        self._in_flight[key] = future
        try:
            result = await func()
            future.set_result(result)
            return result, True   # True = этот запрос первый
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            # 4. Удалить из словаря после завершения
            self._in_flight.pop(key, None)
  1. Интегрируйте SingleFlight в эндпоинт GET /data/{key}:
from fastapi import FastAPI
from cache import Cache
from db import get_data_from_db
from singleflight import SingleFlight

app = FastAPI()
cache = Cache(ttl=1)
flight = SingleFlight()
db_call_counter = 0  # для демонстрации

@app.get("/data/{key}")
async def get_data(key: str):
    global db_call_counter
    # 1. Попытка прочитать из кэша
    cached = await cache.get(key)
    if cached is not None:
        return {"source": "cache", "data": cached}
    # 2. Если промах — передать функцию получения данных в singleflight
    async def fetch_from_db():
        global db_call_counter
        db_call_counter += 1
        data = await get_data_from_db(key)
        await cache.set(key, data, ttl=1)
        return data
    result, was_first = await flight.do(key, fetch_from_db)
    return {"source": "db", "first": was_first, "data": result}
  1. Добавьте эндпоинт /metrics для вывода счётчика обращений к БД.
  2. Перезапустите сервер и повторно выполните нагрузочный тест.
  3. Проверьте счётчик — теперь он должен быть равен 1 (или очень маленькому числу, если у вас не синхронизировано время).

Ожидаемый результат этапа При нагрузке в 20 параллельных запросов к одному ключу только 1 запрос достигает БД; остальные получают данные либо из singleflight (дождались), либо из кэша (если кэш уже записан к моменту их попадания в эндпоинт).


Этап 4: Расширение тестов и граничные случаи (30 мин)

Действия

  1. Добавьте тест на случай ошибки в БД: убедитесь, что singleflight корректно пробрасывает исключение всем ожидающим.
  2. Добавьте тест на разные ключи (популярные и не очень) — проверьте, что singleflight работает изолированно.
  3. Добавьте тест с TTL кэша и истечением — убедитесь, что при повторном запросе после истечения снова срабатывает singleflight.
  4. Напишите unit-тесты на класс SingleFlight (мокая asyncio.sleep):
import pytest
import asyncio
from singleflight import SingleFlight

@pytest.mark.asyncio
async def test_singleflight_blocks_duplicates():
    flight = SingleFlight()
    call_count = 0
    async def slow_func():
        nonlocal call_count
        call_count += 1
        await asyncio.sleep(0.2)
        return "result"
    results = await asyncio.gather(
        flight.do("key", slow_func),
        flight.do("key", slow_func),
        flight.do("key", slow_func),
    )
    assert call_count == 1
    assert results[0] == results[1] == results[2]

Ожидаемый результат этапа Все тесты проходят, singleflight корректно работает при ошибках, истечении кэша и на разных ключах.


Этап 5: Мониторинг и улучшения (20 мин)

Действия

  1. Добавьте логирование каждого вызова singleflight: номер ключа, был ли запрос первым, время ожидания.
  2. Реализуйте таймаут для singleflight (если БД падает — не держать ожидающие запросы бесконечно):
    • Используйте asyncio.wait_for(future, timeout=5.0).
  3. Добавьте метрики Prometheus (опционально): счётчик db_queries_total, гистограмма singleflight_wait_seconds.
  4. Проверьте поведение под настоящей нагрузкой (locust с несколькими пользователями).

Ожидаемый результат этапа Singleflight-защита включена в production-контур с логированием и, при желании, мониторингом.

5. Критерии приемки (Definition of Done)

  • Реализован эндпоинт GET /data/{key}, который возвращает данные из кэша или из БД.
  • При параллельных запросах к одному ключу (20+ одновременно) в БД выполняется ровно 1 запрос.
  • Все ожидающие запросы (не первый) получают актуальные данные (либо дожидаются первого, либо читают из кэша).
  • Кэш имеет TTL; после истечения механизм singleflight снова срабатывает.
  • Реализована обработка ошибок: если функция БД падает, все зависшие запросы получают ту же ошибку.
  • Написаны как минимум 3 unit-теста (на блокировку, на ошибку, на разные ключи).
  • Нагрузочный тест демонстрирует, что число вызовов БД равно 1 (или очень близко к 1).
  • Код покрыт комментариями, а класс SingleFlight имеет docstring.

6. Ожидаемый результат

Основной артефакт Репозиторий с кодом (Python), включающий:

  • main.pyFastAPI приложение с эндпоинтом /data/{key} и /metrics.
  • cache.py — абстракция кэша (Redis или in-memory).
  • db.py — симулятор медленного запроса.
  • singleflight.py — реализация singleflight с асинхронной блокировкой.
  • test_stampede.py — скрипт нагрузочного теста.
  • tests/ — папка с unit-тестами.

Дополнительные результаты (по желанию):

  • Docker-compose для быстрого запуска (Redis + FastAPI).
  • Prometheus-метрики и дашборд Grafana.
  • Документация в README с примером использования и графиком нагрузочного теста.

7. Возможные сложности и их решение

СложностьРешение
Гонка при записи в кэш (первый запрос уже записал кэш, но второй не успел проверить)Использовать атомарные операции Redis (SETNX + TTL) или блокировку на уровне приложения (singleflight уже решает).
Утечка памяти в _in_flight (если ключей очень много)Добавить ограничение по размеру словаря (LRU) или удалять ключи по таймауту.
Future не удаляется при исключении (ошибка в пользовательской функции)Гарантировать finally в do (реализовано выше).
Таймаут ожидания (БД зависла — все запросы ждут бесконечно)Использовать asyncio.wait_for(future, timeout) и удалять future из словаря по таймауту.
Синхронная БД (не asyncio)Обернуть вызов в loop.run_in_executor; singleflight всё равно работает на уровне корутин.
Кэш с коротким TTL и сильной нагрузкойКомбинировать singleflight с "мягким TTL" (stale-while-revalidate) — ещё один паттерн, но выходит за рамки задачи.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка окружения и базовая архитектура30 мин
Этап 2: Воспроизведение stampede20 мин
Этап 3: Реализация singleflight1 ч 30 мин
Этап 4: Расширение тестов и граничные случаи30 мин
Этап 5: Мониторинг и улучшения20 мин
Итого~3 часа 10 мин

Примечание: Для первого раза может потребоваться до 4–5 часов из-за отладки асинхронности и Redis.

9. Связанные вопросы из базы знаний

ВопросТема
1Принципы работы кэширования (Cache-Aside, Write-Through)
2TTL и стратегии инвалидации кэша
3Асинхронное программирование в Python (asyncio, Future)
4Паттерн "Singleflight" (Google Groupcache)
5Эффект лавинного перестроения (Cache Stampede)
6Redis: атомарные операции и блокировки
7Prometheus metrcis и мониторинг производительности
8Unit-тестирование асинхронного кода (pytest-asyncio)
9Работа с Docker (локальный Redis)
10Нагрузочное тестирование (Locust, wrk)

10. Чек-лист самопроверки

  • Я написал нагрузочный тест и убедился, что без singleflight было ≥20 вызовов БД, а после — 1.
  • В singleflight корректно обрабатываются исключения (вызывающая сторона получает ту же ошибку).
  • Я проверил, что singleflight работает изолированно для разных ключей.
  • Кэш имеет TTL, и после его истечения singleflight снова срабатывает.
  • Я написал минимум 3 unit-теста для класса SingleFlight.
  • Код покрыт комментариями на русском или английском (понятно коллеге).
  • В проекте есть README с инструкцией по запуску (как поднять Redis, как запустить тест).
  • Я добавил логирование для каждого вызова singleflight (видно в консоли, кто первый).