Настройка orchestration с помощью Temporal для 5 агентов с компенсацией

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настройка orchestration с помощью Temporal для 5 агентов с компенсацией

1. Цель задачи

Разработать отказоустойчивый workflow на платформе Temporal, координирующий работу пяти агентов и реализующий механизм compensation (отката) при сбоях любого из шагов. Ключевой результат при возникновении ошибки в любом агенте система выполняет компенсацию всех ранее завершённых агентов в обратном порядке, не оставляя систему в неконсистентном состоянии.

2. Исходные данные

Что нужноОткуда взять
Спецификация Temporal Workflow и ActivityДокументация Temporal (раздел Developer's guide / Python SDK)
Примеры Saga-паттерна с компенсациейTemporal Saga Example
Локальный Temporal ServerTemporalite (single‑binary dev server) или Docker Compose
Python SDK для Temporalpip install temporalio
Реализация агентов (заглушки)Создаётся самостоятельно на основе описания в этапах

Если нет реального инструмента — симулируем:

  1. Загрузить и запустить Temporalite: temporalite start --namespace default.
  2. Либо создать docker-compose.yml с сервисами Temporal (см. официальный репозиторий) и выполнить docker compose up -d.
  3. Установить Temporal CLI (tctl) для наблюдения за workflow.

3. Технологический стек

КомпонентИнструментыНазначение
Temporal ServerTemporalite / docker‑compose (Temporal Server v1.21+)Выполнение и оркестрация workflow
Клиентский SDKPython 3.10+, temporalio v1.7+Разработка workflow и активностей
КонтейнеризацияDocker (опционально)Изоляция сервера и worker'ов
Мониторингtctl, Temporal Web UI (порт 8233)Визуализация выполнения, проверка rollback
Версионирование кодаGitФиксация изменений

4. Этапы выполнения

Этап 1: Подготовка окружения и Temporal Server (30 минут)

Действия

  1. Установить Python 3.10+ и создать виртуальное окружение:
    python -m venv venv && source venv/bin/activate
  2. Установить SDK: pip install temporalio
  3. Скачать и запустить Temporalite (или поднять Docker Compose):
    # Temporalite (Linux/Mac)
    curl -sSf https://github.com/temporalio/temporalite/releases/latest/download/temporalite_linux_amd64.tar.gz | tar -xz
    ./temporalite start --namespace default
    
  4. Убедиться, что сервер работает: tctl cluster health или открыть http://localhost:8233
  5. Создать директорию проекта: agents_orchestrator/ с файлами worker.py, workflows.py, activities.py.

Ожидаемый результат этапа
Локальный Temporal Server запущен и отвечает, проект инициализирован.

Этап 2: Реализация активностей для пяти агентов (1 час)

Действия

  1. В activities.py создать класс Acivities (декораторы @activity.defn).
  2. Определить пять активностей, каждая имитирует уникальное действие с возможностью отката:
    • agent1_reserve_resource – резервирует ресурс (например, запись в словарь). Реализация: resource = db.reserve(). Compensation: db.release().
    • agent2_send_notification – отправляет уведомление. Compensation: отменить уведомление (запись в лог «compensation: уведомление отменено»).
    • agent3_update_database – изменяет запись в БД (in‑memory dict). Compensation: откатить изменение.
    • agent4_call_external_api – имитирует вызов внешнего API (ожидание 1 с). Compensation: отправить запрос отмены (лог).
    • agent5_log_completion – логирует успех. Compensation: удалить запись из лога (или пометить ошибкой).
  3. Для каждого агента добавить метод‑компенсацию с той же сигнатурой (или отдельную activity для компенсации).
  4. Зарегистрировать активности в worker (см. этап 4).
  5. Написать простые assert‑тесты (например, что вызов agent1_reserve_resource возвращает идентификатор).

Пример кода (activities.py):

from temporalio import activity

@activity.defn(name="agent1_reserve")
async def agent1_reserve_resource(order_id: str) -> str:
    reservation_id = f"res-{order_id}"
    # симуляция записи
    activity.logger.info(f"Resource reserved: {reservation_id}")
    return reservation_id

@activity.defn(name="agent1_compensate_reserve")
async def compensate_reserve(reservation_id: str) -> None:
    activity.logger.info(f"Compensation: release {reservation_id}")

Ожидаемый результат этапа
Все пять пар активностей (основная + компенсация) реализованы и протестированы изолированно.

Этап 3: Разработка Workflow с компенсацией (1.5 часа)

Действия

  1. В workflows.py объявить класс AgentWorkflow с декоратором @workflow.defn.
  2. В методе async def run(self, order_id: str) -> str:
    • Реализовать последовательный вызов активностей агентов через workflow.execute_activity(...).
    • Использовать try/except для перехвата ошибок.
    • При ошибке выполнить компенсации в обратном порядке для уже успешных шагов.
    • Для изоляции компенсаций от основной ошибки использовать собственный try/except с игнорированием ошибок компенсации (чтобы не потерять первичную причину).
  3. Использовать паттерн Saga: внутри except вызывать asyncio.gather(*[compensate_activity(...) for a in reversed(success_steps)]).
  4. После успешного выполнения всех шагов вернуть строку "SUCCESS: Order {order_id} processed".
  5. Обработать случай, когда компенсация тоже падает – логировать и продолжать (чтобы не блокировать остальные компенсации).

Пример каркаса workflow

from temporalio import workflow
from activities import agent1_reserve_resource, compensate_reserve, ...

@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        steps = [
            ("agent1", agent1_reserve_resource, compensate_reserve),
            ("agent2", agent2_send_notification, compensate_notification),
            ("agent3", agent3_update_database, compensate_db),
            ("agent4", agent4_call_external_api, compensate_api),
            ("agent5", agent5_log_completion, compensate_log)
        ]
        executed = []
        for name, action, compensate in steps:
            try:
                result = await workflow.execute_activity(action, order_id, start_to_close_timeout=10)
                executed.append((name, result, compensate))
            except Exception as e:
                # Rollback в обратном порядке
                for _, res, comp in reversed(executed):
                    try:
                        await workflow.execute_activity(comp, res, start_to_close_timeout=5)
                    except Exception as ce:
                        workflow.logger.error(f"Compensation for {name} failed: {ce}")
                raise # или вернуть специальный результат
        return "SUCCESS: Order {order_id} processed"

Ожидаемый результат этапа
Workflow собран, компилируется, содержит логику отката.

Этап 4: Запуск worker и тестирование rollback (1 час)

Действия

  1. В worker.py создать асинхронный раннер, который запускает worker с одним Task Queue agent-queue.
  2. Запустить worker: python worker.py.
  3. В отдельном терминале запустить Python скрипт client.py, который инициирует workflow с параметром order_id=TEST.
  4. Для тестирования rollback модифицировать один из агентов (например, агент 3) так, чтобы он выбрасывал исключение после успешного завершения первых двух.
  5. Запустить workflow заново и проверить через Temporal Web UI или tctl:
    • tctl workflow observe --workflow-id <id> – увидеть, что workflow завершился с ошибкой, а компенсации выполнены.
    • В логах worker увидеть строчки компенсации.
  6. Написать юнит‑тест (с использованием temporalio.testing), проверяющий корректную последовательность компенсаций.

Ожидаемый результат этапа
При ошибке агента 3 workflow прерывается, а для агентов 1 и 2 корректно вызываются компенсации. Состояние системы (in‑memory dict) возвращается в исходное.

Этап 5: Документация и финализация (30 минут)

Действия

  1. Написать README.md с описанием архитектуры, инструкцией по запуску и проверке rollback.
  2. Добавить requirements.txt с версиями зависимостей.
  3. Оформить код в соответствии с PEP8, добавить docstring.
  4. Создать docker-compose.yml (опционально) для полного окружения, если ранее использовался Temporalite.
  5. Закоммитить всё в репозиторий.

Ожидаемый результат этапа
Полный проект с документацией, воспроизводимый на любой машине.

5. Критерии приемки (Definition of Done)

  • Workflow содержит ровно 5 агентов, каждый из которых имеет основную activity и activity компенсации.
  • При возникновении исключения в любом агенте (кроме последнего) выполняются компенсации всех ранее успешно завершённых агентов в обратном порядке.
  • Компенсации не выбрасывают необработанных исключений (catch внутри workflow).
  • Можно запустить локально Temporal Server (через Temporalite или Docker), worker и клиент.
  • В Temporal Web UI отображаются выполненные шаги и ошибка с корректной цепочкой.
  • Код имеет хотя бы один автоматизированный тест (unit‑test или integration), проверяющий rollback.

6. Ожидаемый результат

  • Основной артефакт директория agents_orchestrator/ с файлами:
    • workflows.py — код workflow (class AgentWorkflow)
    • activities.py — 10 activities (5 основных + 5 compensation)
    • worker.py — раннер worker
    • client.py — инициализатор workflow
    • requirements.txt
    • README.md с инструкцией
  • Содержимое полностью рабочий код, при запуске которого:
    • Без ошибок: workflow завершается успешно, все 5 агентов выполнены.
    • С ошибкой: workflow завершается ошибкой, но все компенсации записаны в лог, и состояние не нарушено.
  • Дополнительно (опционально):
    • Docker‑compose для сервиса Temporal
    • Юнит‑тест с использованием temporalio.testing (опционально)

7. Возможные сложности и их решение

СложностьРешение
Компенсация может сама упасть (сбой сети, неидемпотентность)Использовать try/catch внутри компенсации и логировать ошибку. Продолжить выполнение остальных компенсаций.
Компенсации должны быть идемпотентнымиРазрабатывать компенсации так, чтобы повторный вызов не приводил к некорректному состоянию (например, удаление уже удалённого ресурса – безопасно).
Трудность с отладкой длинных workflowИспользовать Temporal Web UI для просмотра каждого шага; добавить детальное логирование в активности.
Порядок компенсаций при вложенных workflowДля простоты использовать линейный сценарий; при необходимости расширить до дерева компенсаций через Saga.
Конфликт версий Temporal SDKИспользовать версии, указанные в requirements.txt, и проверять совместимость с запущенным сервером.

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Подготовка окружения30 мин
Этап 2: Реализация активностей60 мин
Этап 3: Разработка workflow90 мин
Этап 4: Запуск и тестирование rollback60 мин
Этап 5: Документация и финализация30 мин
Итого4 ч 30 мин

Примечание для первого раза Время может увеличиться на 1–2 часа из-за первого знакомства с Temporal, особенно с его концепциями (Task Queue, Workers, Activity heartbeats).

9. Связанные вопросы из базы знаний

ВопросТема
12Saga‑паттерн и распределённые транзакции
45Compensation в микросервисной архитектуре
78Temporal SDK для Python
103Orchestration vs Choreography
155Механизмы retry и timeout в workflow
221Обработка ошибок при координации агентов
290Проектирование multi‑agent системы для отказоустойчивости
401Тестирование распределённых workflow
510Идемпотентность в компенсациях
678Локальная среда Temporal (Temporalite, Docker)

10. Чек-лист самопроверки

  • Я запустил Temporal Server и убедился, что он доступен.
  • Я реализовал все 10 активностей и зарегистрировал их в worker.
  • Мой workflow использует try/except и вызывает компенсации в обратном порядке.
  • Я проверил сценарий, в котором агент 3 падает, и убедился, что компенсации для агентов 1 и 2 выполнены.
  • Я написал тест (хотя бы ручной сценарий) и зафиксировал результат.
  • Документация содержит шаги для воспроизведения на чистой машине.