Настройка orchestration с помощью Temporal для 5 агентов с компенсацией
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Настройка orchestration с помощью Temporal для 5 агентов с компенсацией
1. Цель задачи
Разработать отказоустойчивый workflow на платформе Temporal, координирующий работу пяти агентов и реализующий механизм compensation (отката) при сбоях любого из шагов. Ключевой результат при возникновении ошибки в любом агенте система выполняет компенсацию всех ранее завершённых агентов в обратном порядке, не оставляя систему в неконсистентном состоянии.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Спецификация Temporal Workflow и Activity | Документация Temporal (раздел Developer's guide / Python SDK) |
| Примеры Saga-паттерна с компенсацией | Temporal Saga Example |
| Локальный Temporal Server | Temporalite (single‑binary dev server) или Docker Compose |
| Python SDK для Temporal | pip install temporalio |
| Реализация агентов (заглушки) | Создаётся самостоятельно на основе описания в этапах |
Если нет реального инструмента — симулируем:
- Загрузить и запустить Temporalite: temporalite start --namespace default.
- Либо создать docker-compose.yml с сервисами Temporal (см. официальный репозиторий) и выполнить docker compose up -d.
- Установить Temporal CLI (tctl) для наблюдения за workflow.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Temporal Server | Temporalite / docker‑compose (Temporal Server v1.21+) | Выполнение и оркестрация workflow |
| Клиентский SDK | Python 3.10+, temporalio v1.7+ | Разработка workflow и активностей |
| Контейнеризация | Docker (опционально) | Изоляция сервера и worker'ов |
| Мониторинг | tctl, Temporal Web UI (порт 8233) | Визуализация выполнения, проверка rollback |
| Версионирование кода | Git | Фиксация изменений |
4. Этапы выполнения
Этап 1: Подготовка окружения и Temporal Server (30 минут)
Действия
- Установить Python 3.10+ и создать виртуальное окружение:
python -m venv venv && source venv/bin/activate - Установить SDK: pip install temporalio
- Скачать и запустить Temporalite (или поднять Docker Compose):
# Temporalite (Linux/Mac) curl -sSf https://github.com/temporalio/temporalite/releases/latest/download/temporalite_linux_amd64.tar.gz | tar -xz ./temporalite start --namespace default - Убедиться, что сервер работает: tctl cluster health или открыть http://localhost:8233
- Создать директорию проекта:
agents_orchestrator/с файлами worker.py,workflows.py,activities.py.
Ожидаемый результат этапа
Локальный Temporal Server запущен и отвечает, проект инициализирован.
Этап 2: Реализация активностей для пяти агентов (1 час)
Действия
- В
activities.pyсоздать классAcivities(декораторы @activity.defn). - Определить пять активностей, каждая имитирует уникальное действие с возможностью отката:
agent1_reserve_resource– резервирует ресурс (например, запись в словарь). Реализация:resource = db.reserve(). Compensation:db.release().agent2_send_notification– отправляет уведомление. Compensation: отменить уведомление (запись в лог «compensation: уведомление отменено»).agent3_update_database– изменяет запись в БД (in‑memory dict). Compensation: откатить изменение.agent4_call_external_api– имитирует вызов внешнего API (ожидание 1 с). Compensation: отправить запрос отмены (лог).agent5_log_completion– логирует успех. Compensation: удалить запись из лога (или пометить ошибкой).
- Для каждого агента добавить метод‑компенсацию с той же сигнатурой (или отдельную activity для компенсации).
- Зарегистрировать активности в worker (см. этап 4).
- Написать простые assert‑тесты (например, что вызов
agent1_reserve_resourceвозвращает идентификатор).
Пример кода (activities.py):
from temporalio import activity
@activity.defn(name="agent1_reserve")
async def agent1_reserve_resource(order_id: str) -> str:
reservation_id = f"res-{order_id}"
# симуляция записи
activity.logger.info(f"Resource reserved: {reservation_id}")
return reservation_id
@activity.defn(name="agent1_compensate_reserve")
async def compensate_reserve(reservation_id: str) -> None:
activity.logger.info(f"Compensation: release {reservation_id}")
Ожидаемый результат этапа
Все пять пар активностей (основная + компенсация) реализованы и протестированы изолированно.
Этап 3: Разработка Workflow с компенсацией (1.5 часа)
Действия
- В
workflows.pyобъявить классAgentWorkflowс декоратором @workflow.defn. - В методе async def run(self, order_id: str) -> str:
- Реализовать последовательный вызов активностей агентов через workflow.execute_activity(...).
- Использовать
try/exceptдля перехвата ошибок. - При ошибке выполнить компенсации в обратном порядке для уже успешных шагов.
- Для изоляции компенсаций от основной ошибки использовать собственный try/except с игнорированием ошибок компенсации (чтобы не потерять первичную причину).
- Использовать паттерн Saga: внутри
exceptвызывать asyncio.gather(*[compensate_activity(...) for a in reversed(success_steps)]). - После успешного выполнения всех шагов вернуть строку
"SUCCESS: Order {order_id} processed". - Обработать случай, когда компенсация тоже падает – логировать и продолжать (чтобы не блокировать остальные компенсации).
Пример каркаса workflow
from temporalio import workflow
from activities import agent1_reserve_resource, compensate_reserve, ...
@workflow.defn
class AgentWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
steps = [
("agent1", agent1_reserve_resource, compensate_reserve),
("agent2", agent2_send_notification, compensate_notification),
("agent3", agent3_update_database, compensate_db),
("agent4", agent4_call_external_api, compensate_api),
("agent5", agent5_log_completion, compensate_log)
]
executed = []
for name, action, compensate in steps:
try:
result = await workflow.execute_activity(action, order_id, start_to_close_timeout=10)
executed.append((name, result, compensate))
except Exception as e:
# Rollback в обратном порядке
for _, res, comp in reversed(executed):
try:
await workflow.execute_activity(comp, res, start_to_close_timeout=5)
except Exception as ce:
workflow.logger.error(f"Compensation for {name} failed: {ce}")
raise # или вернуть специальный результат
return "SUCCESS: Order {order_id} processed"
Ожидаемый результат этапа
Workflow собран, компилируется, содержит логику отката.
Этап 4: Запуск worker и тестирование rollback (1 час)
Действия
- В
worker.pyсоздать асинхронный раннер, который запускает worker с одним Task Queueagent-queue. - Запустить worker:
python worker.py. - В отдельном терминале запустить Python скрипт
client.py, который инициирует workflow с параметромorder_id=TEST. - Для тестирования rollback модифицировать один из агентов (например, агент 3) так, чтобы он выбрасывал исключение после успешного завершения первых двух.
- Запустить workflow заново и проверить через Temporal Web UI или tctl:
- Написать юнит‑тест (с использованием
temporalio.testing), проверяющий корректную последовательность компенсаций.
Ожидаемый результат этапа
При ошибке агента 3 workflow прерывается, а для агентов 1 и 2 корректно вызываются компенсации. Состояние системы (in‑memory dict) возвращается в исходное.
Этап 5: Документация и финализация (30 минут)
Действия
- Написать
README.mdс описанием архитектуры, инструкцией по запуску и проверке rollback. - Добавить
requirements.txtс версиями зависимостей. - Оформить код в соответствии с PEP8, добавить docstring.
- Создать
docker-compose.yml(опционально) для полного окружения, если ранее использовался Temporalite. - Закоммитить всё в репозиторий.
Ожидаемый результат этапа
Полный проект с документацией, воспроизводимый на любой машине.
5. Критерии приемки (Definition of Done)
- Workflow содержит ровно 5 агентов, каждый из которых имеет основную activity и activity компенсации.
- При возникновении исключения в любом агенте (кроме последнего) выполняются компенсации всех ранее успешно завершённых агентов в обратном порядке.
- Компенсации не выбрасывают необработанных исключений (catch внутри workflow).
- Можно запустить локально Temporal Server (через Temporalite или Docker), worker и клиент.
- В Temporal Web UI отображаются выполненные шаги и ошибка с корректной цепочкой.
- Код имеет хотя бы один автоматизированный тест (unit‑test или integration), проверяющий rollback.
6. Ожидаемый результат
- Основной артефакт директория
agents_orchestrator/с файлами: - Содержимое полностью рабочий код, при запуске которого:
- Дополнительно (опционально):
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Компенсация может сама упасть (сбой сети, неидемпотентность) | Использовать try/catch внутри компенсации и логировать ошибку. Продолжить выполнение остальных компенсаций. |
| Компенсации должны быть идемпотентными | Разрабатывать компенсации так, чтобы повторный вызов не приводил к некорректному состоянию (например, удаление уже удалённого ресурса – безопасно). |
| Трудность с отладкой длинных workflow | Использовать Temporal Web UI для просмотра каждого шага; добавить детальное логирование в активности. |
| Порядок компенсаций при вложенных workflow | Для простоты использовать линейный сценарий; при необходимости расширить до дерева компенсаций через Saga. |
| Конфликт версий Temporal SDK | Использовать версии, указанные в requirements.txt, и проверять совместимость с запущенным сервером. |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Подготовка окружения | 30 мин |
| Этап 2: Реализация активностей | 60 мин |
| Этап 3: Разработка workflow | 90 мин |
| Этап 4: Запуск и тестирование rollback | 60 мин |
| Этап 5: Документация и финализация | 30 мин |
| Итого | 4 ч 30 мин |
Примечание для первого раза Время может увеличиться на 1–2 часа из-за первого знакомства с Temporal, особенно с его концепциями (Task Queue, Workers, Activity heartbeats).
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 12 | Saga‑паттерн и распределённые транзакции |
| 45 | Compensation в микросервисной архитектуре |
| 78 | Temporal SDK для Python |
| 103 | Orchestration vs Choreography |
| 155 | Механизмы retry и timeout в workflow |
| 221 | Обработка ошибок при координации агентов |
| 290 | Проектирование multi‑agent системы для отказоустойчивости |
| 401 | Тестирование распределённых workflow |
| 510 | Идемпотентность в компенсациях |
| 678 | Локальная среда Temporal (Temporalite, Docker) |
10. Чек-лист самопроверки
- Я запустил Temporal Server и убедился, что он доступен.
- Я реализовал все 10 активностей и зарегистрировал их в worker.
- Мой workflow использует
try/exceptи вызывает компенсации в обратном порядке. - Я проверил сценарий, в котором агент 3 падает, и убедился, что компенсации для агентов 1 и 2 выполнены.
- Я написал тест (хотя бы ручной сценарий) и зафиксировал результат.
- Документация содержит шаги для воспроизведения на чистой машине.