Реализовать circuit breaker на вызовы агента

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать circuit breaker на вызовы агента

1. Цель задачи

Разработать и внедрить паттерн Circuit Breaker для асинхронных вызовов между агентами в распределённой AI-системе. При превышении доли ошибок (>50%) цепь размыкается, предотвращая каскадные отказы и давая время на восстановление зависимых сервисов.

Ключевой результат Работающий механизм, который зафиксирован в коде, покрыт тестами и демонстрирует отсутствие каскадных отказов при искусственной эмуляции сбоев.


2. Исходные данные

Что нужноОткуда взять
Система с несколькими агентами (2+), общающимися по HTTP/gRPCПет-проект (например, multi-agent на FastAPI + asyncio) или готовый прототип
Сценарий вызова (синхронный или асинхронный) с возможностью эмулировать ошибкиСобственный код или тестовый стенд
Инструмент для эмуляции сбоев (chaos)chaostoolkit, toxiproxy, или простая заглушка с вероятностью отказа
Метрики: количество запросов, успешные/ошибочные, latencyPrometheus + Grafana (или In-memory для MVP)

Если нет реальной multi-agent системы — симулируем:

  1. Создать два микросервиса на FastAPI: AgentA (клиент) и AgentB (сервер).
  2. В AgentB добавить эндпоинт /process, который с вероятностью 60% возвращает 500 ошибку.
  3. В AgentA реализовать вызов AgentB через httpx.AsyncClient.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.11+Реализация логики circuit breaker
Фреймворк для агентовFastAPI + asyncio / aiohttpАсинхронные вызовы
Реализация circuit breakerСамописная или pybreaker + адаптацияУправление состояниями CLOSED / OPEN / HALF_OPEN
Тестированиеpytest, pytest-asyncio, unittest.mockUnit и интеграционные тесты
Эмуляция сбоевchaostoolkit или toxiproxyКонтролируемое внесение ошибок
Метрики и логиPrometheus client, structlog / loggingСбор статистики
Оркестрация тестовDocker Compose (опционально)Подъём нескольких агентов

4. Этапы выполнения

Этап 1: Проектирование и подготовка структуры (1 час)

Действия

  1. Определить интерфейс вызова агента (асинхронная функция, принимающая URL и возвращающая ответ).
  2. Спроектировать класс CircuitBreaker, хранящий:
    • состояние (CLOSED, OPEN, HALF_OPEN)
    • счётчики успехов/ошибок
    • порог ошибок (доля >0.5)
    • время восстановления (timeout для перехода из OPEN в HALF_OPEN)
    • количество запросов в окне (скользящее окно)
  3. Составить диаграмму состояний (можно текстом).
    Ожидаемый результат этапа Файл design.md с описанием API и состояний.

Этап 2: Реализация core-логики (3 часа)

Действия

  1. Реализовать класс CircuitBreaker в circuit_breaker.py:
    • Метод __call__(self, func, *args, **kwargs) — обёртка над вызовом.
    • Внутренние счётчики: _success_counter, _failure_counter, _total_calls.
    • Проверка доли ошибок при каждом вызове; если >50% и вызовов >= min_requests (например, 10) → перейти в OPEN.
    • В состоянии OPEN: немедленный вызов callback (без запроса к агенту), после timeout → HALF_OPEN.
    • В HALF_OPEN: пропустить один запрос; если он успешен → CLOSED, если ошибка → OPEN.
  2. Добавить потокобезопасность через asyncio.Lock.
  3. Экспортировать декоратор @circuit_breaker для обёртки функций вызовов.
# Пример структуры
import asyncio
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = 1
    OPEN = 2
    HALF_OPEN = 3

class CircuitBreaker:
    def __init__(self, failure_threshold=0.5, recovery_timeout=30, min_requests=10):
        ...
    
    async def __call__(self, func, *args, **kwargs):
        ...

Ожидаемый результат этапа Рабочий класс, покрытый unit-тестами (pytest-asyncio) для каждого состояния и переходов.

Этап 3: Интеграция с вызовом агента (1.5 часа)

Действия

  1. Создать два FastAPI-приложения: agent_a (содержит клиент) и agent_b (содержит эндпоинт).
  2. В agent_a обернуть вызов к agent_b через экземпляр CircuitBreaker.
  3. Настроить middleware для сбора метрик (через Prometheus client):
    • circuit_breaker_state{gauge} — текущее состояние (0/1/2)
    • circuit_breaker_calls_total{counter, state, result} — количество вызовов
    • circuit_breaker_open_duration_seconds — время в OPEN
  4. Поднять оба сервиса, проверить базовое взаимодействие (без сбоев).

Ожидаемый результат этапа Клиент-серверная связка, метрики экспортируются на /metrics.

Этап 4: Тестирование с эмуляцией сбоев (2 часа)

Действия

  1. Запустить AgentB с вероятностью ошибки 60% (через переменную окружения FAIL_RATE).
  2. Направить 100 запросов от AgentA к AgentB.
  3. Проверить:
    • После ~10-20 запросов цепь переходит в OPEN (доля ошибок >50%).
    • Последующие вызовы не доходят до AgentB (счётчик ошибок не растёт).
    • Через recovery_timeout цепь переходит в HALF_OPEN, один запрос доходит до сервера.
    • Если сервер всё ещё ошибается → снова OPEN; если исправился → CLOSED.
  4. Написать автоматический тест на toxiproxy или с помощью unittest.mock, имитируя временные сбои.

Ожидаемый результат этапа Скрипт chaos_test.py, который подтверждает, что каскадных отказов нет.

Этап 5: Документация и финальная проверка (1 час)

Действия

  1. Написать README с описанием архитектуры, инструкцией по запуску.
  2. Оформить code review: проверить обработку крайних случаев (0 вызовов, переполнение счётчиков, конкурентные запросы).
  3. Убедиться, что метрики доступны; добавить алерт (опционально) на circuit_breaker_state != CLOSED > 5 мин.

Ожидаемый результат этапа Репозиторий с полным кодом, тестами и документацией.


5. Критерии приемки (Definition of Done)

  • Паттерн реализован как отдельный модуль circuit_breaker.py.
  • Реализованы три состояния: CLOSED, OPEN, HALF_OPEN с корректными переходами.
  • Порог ошибок конфигурируется и по умолчанию равен 50%.
  • В состоянии OPEN запросы мгновенно отклоняются (без вызова внешнего сервиса).
  • В состоянии HALF_OPEN пропускается ровно один запрос.
  • Счётчики ошибок успеваемости сбрасываются после перехода в CLOSED.
  • Все асинхронные вызовы потокобезопасны (используется asyncio.Lock).
  • Написаны unit-тесты, покрывающие основные сценарии (не менее 80% строк).
  • Интеграционный тест симулирует >50% ошибок и проверяет отсутствие каскада.
  • Метрики (состояние, количество вызовов) доступны через HTTP-эндпоинт /metrics.

6. Ожидаемый результат

Основной артефакт git-репозиторий, содержащий:

  • circuit_breaker.py — реализация класса.
  • agent_a/main.py — клиент с circuit breaker.
  • agent_b/main.py — сервер с эндпоинтом /process.
  • tests/ — unit-тесты (pytest-asyncio) и интеграционный тест (test_chaos.py).
  • docker-compose.yml (опционально) — для поднятия обоих агентов.
  • README.md — описание, как запустить и проверить.

Дополнительно (опционально):

  • График метрик из Prometheus (скриншот) или дашборд в Grafana.
  • Скрипт для эмуляции сбоев (chaos.sh).

7. Возможные сложности и их решение

СложностьРешение
Гонка состояний при конкурентных запросахИспользовать asyncio.Lock для всех изменений состояния; тестировать с asyncio.gather.
Выбор размера скользящего окнаСделать окно по количеству запросов (фиксированный буфер), а не временное окно — проще с асинхронностью.
Неверное определение доли ошибок при малом числе запросовВвести min_requests — минимальное количество вызовов для принятия решения (по умолчанию 10).
Сервис восстанавливается, но circuit breaker долго ждётНастроить recovery_timeout разумно (30–60 секунд) и дать возможность ручного сброса через API.
Отсутствие метрикПодключить prometheus_client и обновлять gauge/counter внутри методов on_success, on_failure.

8. Бюджет времени (оценка)

ЭтапВремя (часы)
Проектирование1
Реализация core-логики3
Интеграция с агентами1.5
Тестирование с эмуляцией сбоев2
Документация и финальная проверка1
Итого8.5

Примечание Для первого раза рекомендуется заложить +20% буфера (≈10 часов) на отладку и неожиданные проблемы с асинхронностью.


9. Связанные вопросы из базы знаний

ВопросТема
42Паттерн Circuit Breaker: состояния и переходы
87Асинхронные вызовы между микросервисами (aiohttp / httpx)
123Prometheus метрики для resilience паттернов
204Тестирование распределённых систем: моки и chaostoolkit
315Обработка ошибок в asyncio: таймауты, retry, circuit breaker
456Проектирование межсервисного взаимодействия: графы агентов
567Мониторинг каскадных отказов в AI-системах
678Алгоритмы скользящего окна для подсчёта ошибок
789Практика HALF_OPEN: как безопасно проверять восстановление
891Интеграция circuit breaker с существующими HTTP-клиентами

10. Чек-лист самопроверки

  • Я реализовал все три состояния с корректными переходами.
  • Я проверил, что при >50% ошибок цепь размыкается и вызовы не доходят до зависимого сервиса.
  • Я написал минимум три unit-теста для каждого состояния (CLOSED, OPEN, HALF_OPEN).
  • Я протестировал конкурентные вызовы через asyncio.gather и убедился в отсутствии гонок.
  • Я проверил, что после восстановления сервиса цепь закрывается автоматически.
  • Я убедился, что метрики экспортируются и отражают реальное состояние.