Реализовать иерархическое делегирование
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать иерархическое делегирование
1. Цель задачи
Научиться строить multi-agent систему с иерархической структурой, где один Supervisor агент декомпозирует поступившую задачу на подзадачи и делегирует их двум Worker агентам. Главный фокус — на механизме декомпозиции, передачи контекста и координации, чтобы вся задача выполнялась не более чем за 5 шагов (итераций общения Supervisor‑Worker). Вы получите готовый шаблон для масштабирования на большее число воркеров и типов задач.
Ключевой результат Работающий код multi-agent системы, которая за <5 шагов решает задачу (например, генерация сводки по продажам или ответ на комплексный вопрос), логирующая каждый шаг делегирования.
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Базовое понимание LangGraph / CrewAI (или другого фреймворка) | Документация, примеры из курса |
| LLM API ключ (OpenAI, Anthropic или локальная модель) | OpenAI / Ollama / Anthropic |
| Python 3.10+ с установленными зависимостями | pip install langgraph langchain-openai crewai (или аналоги) |
| Тестовая задача | Описана ниже в разделе «Если нет реального инструмента» |
Если нет реального инструмента — симулируем:
- Создайте две подзадачи, которые Worker’ы будут выполнять независимо (например, «Собрать данные о продажах за май» и «Рассчитать средний чек»).
- Напишите mock‑воркеров, которые вместо реального вызова LLM возвращают заранее подготовленные ответы — это позволит отладить логику делегирования без траты токенов.
- Используйте простой текстовый формат для передачи контекста (JSON или Markdown).
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Фреймворк агентов | LangGraph (рекомендуется) / CrewAI / AutoGen | Построение графа состояний, узлы Supervisor и Worker |
| LLM | OpenAI GPT‑4o / Claude 3.5 / Ollama (local) | Принятие решений, генерация ответов |
| Язык | Python 3.10+ | Реализация логики агентов |
| Конфигурация | YAML / .env | Хранение API‑ключей, параметров |
| Логирование | Python logging + JSON‑логи | Запись каждого шага делегирования |
| Тестирование | pytest, unittest | Проверка limit шагов и корректности результата |
4. Этапы выполнения
Этап 1: Проектирование архитектуры «Supervisor – 2 Worker» (30 минут)
Действия
-
Определите роли
-
Спроектируйте граф состояний (StateGraph):
- Узлы:
start → supervisor_plan → worker_A → worker_B → supervisor_aggregate → end - Допускается параллельное выполнение Worker’ов (если фреймворк поддерживает) или последовательное.
- Узлы:
-
Определите формат состояния (State):
from typing import TypedDict, List, Optional from langgraph.graph import StateGraph, END class AgentState(TypedDict): task: str plan: Optional[str] # план от supervisor worker_a_result: Optional[str] worker_b_result: Optional[str] final_answer: Optional[str] step_count: int max_steps: int = 5 logs: List[str]
Ожидаемый результат этапа Схема графа (можно текстом) и заготовка класса состояния.
Этап 2: Реализация Supervisor (1 час)
Действия
-
Напишите функцию supervisor_plan(state) через LLM или rule‑based:
def supervisor_plan(state: AgentState) -> AgentState: # Вызываем LLM с задачей prompt = f"Разбей задачу '{state['task']}' на 2 подзадачи: одна для сбора данных, вторая для анализа. Ответь JSON: {{'worker_a':..., 'worker_b':...}}" response = llm.invoke(prompt) plan = json.loads(response.content) state['plan'] = plan state['step_count'] += 1 state['logs'].append(f"Шаг {state['step_count']}: план составлен") return state -
Добавьте проверку step_count < max_steps — если превышение, переходить к
supervisor_aggregateс сообщением об ошибке. -
Протестируйте на простой задаче
Пример задачи: «Посчитать суммарную выручку по магазину #7 за апрель и сравнить с апрелем прошлого года».
Ожидаемый результат этапа Узел Supervisor в графе, который возвращает план в виде словаря.
Этап 3: Реализация Worker’ов (1 час)
Действия
-
Worker A — «сбор данных»:
def worker_a(state: AgentState) -> AgentState: sub_task = state['plan']['worker_a'] # симуляция: получаем данные из переменной окружения или локального файла state['worker_a_result'] = fetch_data(sub_task) state['step_count'] += 1 state['logs'].append(f"Шаг {state['step_count']}: Worker A выполнил") return state -
Worker B — «анализ»:
def worker_b(state: AgentState) -> AgentState: sub_task = state['plan']['worker_b'] state['worker_b_result'] = analyze_data(sub_task, state.get('worker_a_result','')) state['step_count'] += 1 state['logs'].append(f"Шаг {state['step_count']}: Worker B выполнил") return state -
Добавьте fallback — если Worker возвращает ошибку, Supervisor должен перепланировать.
Ожидаемый результат этапа Два рабочих узла, готовых к интеграции.
Этап 4: Интеграция и финальный узел Supervisor‑aggregate (30 минут)
Действия
-
Напишите supervisor_aggregate(state):
def supervisor_aggregate(state: AgentState) -> AgentState: if state['worker_a_result'] and state['worker_b_result']: final = combine_results(state['worker_a_result'], state['worker_b_result']) else: final = "Ошибка: не все подзадачи выполнены" state['final_answer'] = final state['step_count'] += 1 state['logs'].append(f"Шаг {state['step_count']}: финальный ответ сформирован") return state -
Постройте граф:
graph = StateGraph(AgentState) graph.add_node("supervisor_plan", supervisor_plan) graph.add_node("worker_a", worker_a) graph.add_node("worker_b", worker_b) graph.add_node("supervisor_aggregate", supervisor_aggregate) graph.set_entry_point("supervisor_plan") graph.add_conditional_edges( "supervisor_plan", lambda state: "worker_a" if state['step_count'] < state['max_steps'] else "supervisor_aggregate" ) graph.add_edge("worker_a", "worker_b") graph.add_edge("worker_b", "supervisor_aggregate") graph.add_edge("supervisor_aggregate", END)
Ожидаемый результат этапа Полностью скомпилированный граф, готовый к запуску.
Этап 5: Тестирование и оптимизация (45 минут)
Действия
-
Напишите тест, проверяющий, что задача решается за <5 шагов:
def test_delegation_limit(): initial_state = AgentState(task="Задача:..., max_steps=5) result = graph.invoke(initial_state) assert result['step_count'] < 5, f"Шагов: {result['step_count']}" assert result['final_answer'] is not None -
Проверьте логирование — каждый шаг должен быть записан в logs и выведен в stdout.
-
Эксперимент измените max_steps на 3 и убедитесь, что система справляется (возможно, с урезанным планом).
Ожидаемый результат этапа Тесты проходят, система стабильно решает задачу за 3–4 шага.
5. Критерии приемки (Definition of Done)
- Supervisor успешно декомпозирует задачу на две подзадачи и назначает их Worker’ам.
- Каждый Worker выполняет свою подзадачу и возвращает результат.
- Финальный ответ формируется путём объединения результатов Worker’ов.
- Общее количество шагов (итераций графа) меньше 5 для успешного выполнения.
- При превышении лимита шагов система корректно завершает работу с сообщением об ошибке.
- Логирование фиксирует каждый шаг с временной меткой и содержанием.
- Код покрыт минимум одним unit‑тестом на ограничение по шагам.
- Система может быть запущена локально без внешних зависимостей (кроме LLM API, если используется не mock).
6. Ожидаемый результат
- Основной артефакт Python‑файл
delegation_system.pyс реализацией графа LangGraph. - Пример лога выполнения
10:00:01 [SUPERVISOR] План: worker_a = "Собрать данные", worker_b = "Анализ" 10:00:02 [WORKER_A] Данные собраны (5 строк) 10:00:03 [WORKER_B] Анализ завершён, средний чек = 1234 руб. 10:00:04 [SUPERVISOR] Итог: ... - Дополнительно файл
test_delegation.pyс тестами; документация по расширению (добавление третьего Worker’а).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Worker не понимает подзадачу из‑за плохой формулировки | Включить в prompt для Worker’а контекст исходной задачи и ожидаемый формат ответа |
| Превышение лимита шагов из‑за рекурсии | Добавить строгую проверку step_count в начале каждого узла; при достижении max_steps - 1 переходить сразу к supervisor_aggregate |
| LLM генерирует невалидный JSON для плана | Использовать pydantic для валидации и повторный запрос с исправлением |
| Параллельное выполнение Worker’ов не поддерживается выбранным фреймворком | Перейти на последовательное выполнение (это не влияет на число шагов, но увеличивает время) |
| Необходимость реального LLM API для демонстрации | Использовать mock‑LLM (например, langchain FakeLLM) или локальную Ollama с маленькой моделью |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| 1. Проектирование архитектуры | 30 мин |
| 2. Реализация Supervisor | 1 ч |
| 3. Реализация Worker’ов | 1 ч |
| 4. Интеграция и финальный узел | 30 мин |
| 5. Тестирование и оптимизация | 45 мин |
| Итого | ~3 ч 45 мин |
Примечание: Для первого выполнения может потребоваться до 5 часов из‑за изучения фреймворка и отладки вызовов LLM.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 112 | Как декомпозировать задачу в агентной системе |
| 115 | Реализация Supervisor агента |
| 118 | Динамическое делегирование подзадач |
| 120 | Оценка количества шагов выполнения |
| 210 | Метрики эффективности multi‑agent |
| 212 | Балансировка нагрузки между worker’ами |
| 215 | Обработка ошибок при делегировании |
| 220 | Мониторинг и логирование шагов |
| 250 | Сравнение фреймворков агентов |
| 300 | Иерархическое планирование |
10. Чек-лист самопроверки
- Я определил состояние графа (AgentState) со всеми необходимыми полями.
- Supervisor на этапе планирования проверяет step_count < max_steps.
- Worker’ы принимают контекст из состояния и возвращают результат в определённые поля.
- У меня есть тест, который проверяет, что задача решается за <5 шагов.
- Логи записываются в поле logs и дублируются в stdout.
- При ошибке в одном из Worker’ов Supervisor переходит к
supervisor_aggregateс сообщением об ошибке (не падает).