Реализовать actor model для агентов
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать actor model для агентов
1. Цель задачи
Изучить и реализовать архитектурный паттерн Actor Model для построения асинхронного взаимодействия между AI-агентами. Каждый агент представляет собой независимый actor с собственным mailbox, обрабатывающий сообщения последовательно и асинхронно. В результате вы получите базовый фреймворк для мультиагентных систем, где агенты общаются исключительно через обмен сообщениями, что исключает разделяемое состояние и блокировки.
Ключевой результат Работающий прототип actor-системы на Python с asyncio, в котором два агента асинхронно обмениваются сообщениями, а время обработки демонстрирует неблокирующее выполнение.
2. Исходные данные
| Что нужно | Откуда взять |
|---|---|
| Python 3.9+ (с поддержкой asyncio) | Установлен в системе или conda-окружение |
| Редактор кода | VS Code / PyCharm / Jupyter |
| Базовое понимание asyncio | Документация Python, предыдущие упражнения |
| Примеры actor model в Python (опционально) | Библиотеки: pykka, thespian или статьи по Erlang/Elixir |
Если нет реального инструмента — симулируем:
- Мы не будем подключать внешние библиотеки для actor model, а напишем собственную минимальную реализацию на asyncio и queue.Queue (или asyncio.Queue).
- Создадим абстрактный класс Actor, который каждый агент наследует.
- Вместо полноценного супервизора используем простой event loop на asyncio.gather.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.9+ | Реализация actor model |
| Асинхронный рантайм | asyncio | Обеспечение конкурентности |
| Очереди сообщений | asyncio.Queue | Реализация mailbox для каждого actor |
| Логирование | logging / structlog | Отслеживание потока сообщений |
| Тестирование | pytest + pytest-asyncio | Валидация асинхронной обработки |
| Визуализация (опционально) | Mermaid / PlantUML | Диаграмма взаимодействия агентов |
4. Этапы выполнения
Этап 1: Проектирование интерфейсов (1 час)
Действия
-
Определить базовую структуру сообщения (Message):
from typing import Any, Optional from dataclasses import dataclass from uuid import uuid4 @dataclass class Message: sender_id: str receiver_id: str payload: Any msg_id: str = uuid4().hex[:8] reply_to: Optional[str] = None -
Спроектировать класс Actor:
- Атрибуты:
actor_id: str, mailbox: asyncio.Queue - Методы: send(receiver, message),
receive(),process_message(msg),run() run()– бесконечный цикл, читающий из mailbox и вызывающийprocess_message
- Атрибуты:
-
Создать интерфейс ActorSystem:
- Реестр акторов: dict[str, [[Вики/agent|Actor
- Метод register(actor)
- Метод send_message(sender_id, receiver_id, payload)
- Метод
shutdown()
Ожидаемый результат этапа Файл interfaces.py с датаклассом Message, абстрактным классом Actor и классом ActorSystem.
Этап 2: Реализация базового Actor и Mailbox (2 часа)
Действия
-
Написать реализацию ActorSystem с asyncio:
import asyncio from typing import Dict class ActorSystem: def __init__(self): self._actors: Dict[str, 'Actor'] = {} self._tasks: Dict[str, asyncio.Task] = {} def register(self, actor: 'Actor'): actor._system = self self._actors[actor.actor_id] = actor self._tasks[actor.actor_id] = asyncio.create_task(actor.run()) async def send_message(self, sender_id: str, receiver_id: str, payload) -> None: receiver = self._actors.get(receiver_id) if receiver is None: raise ValueError(f"Actor {receiver_id} not found") msg = Message(sender_id=sender_id, receiver_id=receiver_id, payload=payload) await receiver.mailbox.put(msg) async def shutdown(self): for task in self._tasks.values(): task.cancel() await asyncio.gather(*self._tasks.values(), return_exceptions=True) -
Реализовать конкретный класс
AsyncActor:class AsyncActor: def __init__(self, actor_id: str): self.actor_id = actor_id self.mailbox: asyncio.Queue = asyncio.Queue() self._system: Optional[ActorSystem] = None self._running = True async def send(self, receiver_id: str, payload): if self._system: await self._system.send_message(self.actor_id, receiver_id, payload) async def receive(self) -> Message: return await self.mailbox.get() async def process_message(self, msg: Message) -> None: # Default: just log (override in subclass) print(f"[{self.actor_id}] received: {msg.payload}") async def run(self): while self._running: try: msg = await self.receive() await self.process_message(msg) except asyncio.CancelledError: break except Exception as e: print(f"[{self.actor_id}] error: {e}") def stop(self): self._running = False -
Добавить логирование с logging.getLogger(self.actor_id).
Ожидаемый результат этапа Файл actor.py с полной реализацией ActorSystem и AsyncActor.
Этап 3: Создание двух специализированных агентов (1.5 часа)
Действия
-
Создать агента
WorkerAgent, который инвертирует строку:class WorkerAgent(AsyncActor): async def process_message(self, msg: Message) -> None: transformed = msg.payload[::-1] logging.info(f"Worker received '{msg.payload}', sending back reversed") await self.send(msg.sender_id, f"Reversed: {transformed}") -
Создать агента
OrchestratorAgent, который отправляет задания и собирает ответы:class OrchestratorAgent(AsyncActor): async def process_message(self, msg: Message) -> None: logging.info(f"Orchestrator got reply: {msg.payload}") -
Написать сценарий запуска:
async def main(): system = ActorSystem() orchestrator = OrchestratorAgent("orchestrator") worker = WorkerAgent("worker") system.register(worker) system.register(orchestrator) # посылаем несколько сообщений подряд for text in ["hello", "world", "actor", "model"]: await orchestrator.send("worker", text) await asyncio.sleep(2) # даём время на обработку await system.shutdown() asyncio.run(main())
Ожидаемый результат этапа Файл agents.py с двумя агентами и скрипт run.py, который запускает систему и демонстрирует асинхронную обработку.
Этап 4: Тестирование асинхронности и производительности (1.5 часа)
Действия
-
Написать unit-тесты с pytest-asyncio:
- Проверить, что сообщение доставляется агенту
- Проверить, что агент обрабатывает сообщения последовательно (одно за другим)
- Проверить, что несколько агентов работают параллельно
-
Написать тест на асинхронность: отправить 10 сообщений, измерить общее время. Обработка каждого должна занимать ~0.1 сек (симулировать await asyncio.sleep(0.1) в
process_message). Если общее время < 0.1*N, значит обработка действительно асинхронная. -
Тест на корректность mailbox: убедиться, что порядок сообщений сохраняется для каждого актора.
Код теста (фрагмент):
import pytest
@pytest.mark.asyncio
async def test_parallel_processing():
system = ActorSystem()
results = []
class SlowActor(AsyncActor):
async def process_message(self, msg):
await asyncio.sleep(0.1)
results.append(msg.payload)
actor = SlowActor("slow")
system.register(actor)
start = time.time()
for i in range(5):
await asyncio.create_task(system.send_message("test", "slow", i))
await asyncio.sleep(1)
elapsed = time.time() - start
assert elapsed < 0.3 # если бы последовательно, было бы 0.5
assert len(results) == 5
Ожидаемый результат этапа Файл test_actor_model.py с набором тестов, подтверждающих асинхронное выполнение.
Этап 5: Документирование и демонстрация (1 час)
Действия
-
Создать README.md с:
- Архитектурной диаграммой (текстовой или Mermaid)
- Инструкцией по запуску
- Примером лога работы системы
-
Дополнительно (опционально): реализовать механизм supervisor (перезапуск упавшего агента). Для этого добавить
on_crashcallback вActorSystem.
Ожидаемый результат этапа Документ README.md и демонстрационный вывод в консоли.
5. Критерии приемки (Definition of Done)
- Каждый агент имеет собственный
asyncio.Queueв качестве mailbox. - Сообщения отправляются асинхронно и обрабатываются без блокировки main-потока.
- Реализован механизм регистрации агентов в системе (ActorSystem).
- Написаны тесты, доказывающие параллельную обработку сообщений.
- Логирование показывает, что обработка разных сообщений перемежается (не последовательна).
- Время выполнения пакета из N сообщений (с
sleep(0.1)) меньше N * 0.1 сек (минимум в 2 раза). - Код покрыт type hints (аннотации типов).
- README содержит инструкцию по запуску и пример вывода.
6. Ожидаемый результат
- Основной файл папка
actor_model/с модулями:interfaces.pyactor.pyagents.pyrun.pytest_actor_model.pyREADME.md
- Содержание рабочая реализация actor model на Python asyncio, демонстрация асинхронного обмена сообщениями между двумя агентами, юнит-тесты.
- Дополнительно (опционально): supervisor для автоматического перезапуска упавшего агента; поддержка таймаутов при обработке.
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
Забыть вызвать asyncio.create_task(actor.run()) — актор не запущен | В register() сразу создавать таск; проверять, что таск не завершён |
| Деадлок из-за ожидания ответа от самого себя | Запретить отправку сообщения себе; в send_message проверять sender_id != receiver_id |
| Сообщения могут теряться при краше актора | Добавить try-except в цикл run; в случае ошибки логировать и продолжать |
| Сложность с остановкой всех корутин | Использовать asyncio.gather с return_exceptions=True и отменять через task.cancel() |
| Тесты на асинхронность нестабильны (race condition) | Использовать pytest-timeout и увеличить asyncio.sleep в тестах до 0.2 |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Проектирование интерфейсов | 1 час |
| Реализация базового Actor и Mailbox | 2 часа |
| Создание агентов | 1,5 часа |
| Тестирование | 1,5 часа |
| Документирование | 1 час |
| Итого | 7 часов |
Примечание: При первом знакомстве с asyncio время может увеличиться до 10–12 часов. Рекомендуется пройти краткий туториал по asyncio перед началом.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 45 | Что такое Actor Model и его преимущества |
| 112 | Асинхронное программирование в Python: asyncio vs threading |
| 203 | Очереди (Queue) для межпотокового взаимодействия |
| 378 | Пул потоков и конкурентность в AI-агентах |
| 511 | Шаблон "Mailbox" для обработки сообщений |
| 644 | Обработка ошибок в асинхронных задачах |
| 721 | Supervisor для перезапуска упавших агентов |
| 835 | Тестирование асинхронного кода с pytest-asyncio |
10. Чек-лист самопроверки
- Я определил структуру
Messageс уникальным ID и метаданными. - Мой
AsyncActorимеет mailbox какasyncio.Queue. - В
ActorSystemя зарегистрировал акторов и создал для каждого asyncio-задачу. - Я написал как минимум два агента, которые обмениваются сообщениями.
- Мои тесты показывают, что время обработки пакета сообщений меньше, чем суммарное время последовательной обработки.