中文翻译暂不可用,显示俄语原文。

Реализовать actor model для агентов

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать actor model для агентов

1. Цель задачи

Изучить и реализовать архитектурный паттерн Actor Model для построения асинхронного взаимодействия между AI-агентами. Каждый агент представляет собой независимый actor с собственным mailbox, обрабатывающий сообщения последовательно и асинхронно. В результате вы получите базовый фреймворк для мультиагентных систем, где агенты общаются исключительно через обмен сообщениями, что исключает разделяемое состояние и блокировки.

Ключевой результат Работающий прототип actor-системы на Python с asyncio, в котором два агента асинхронно обмениваются сообщениями, а время обработки демонстрирует неблокирующее выполнение.

2. Исходные данные

Что нужноОткуда взять
Python 3.9+ (с поддержкой asyncio)Установлен в системе или conda-окружение
Редактор кодаVS Code / PyCharm / Jupyter
Базовое понимание asyncioДокументация Python, предыдущие упражнения
Примеры actor model в Python (опционально)Библиотеки: pykka, thespian или статьи по Erlang/Elixir

Если нет реального инструмента — симулируем:

  1. Мы не будем подключать внешние библиотеки для actor model, а напишем собственную минимальную реализацию на asyncio и queue.Queue (или asyncio.Queue).
  2. Создадим абстрактный класс Actor, который каждый агент наследует.
  3. Вместо полноценного супервизора используем простой event loop на asyncio.gather.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.9+Реализация actor model
Асинхронный рантаймasyncioОбеспечение конкурентности
Очереди сообщенийasyncio.QueueРеализация mailbox для каждого actor
Логированиеlogging / structlogОтслеживание потока сообщений
Тестированиеpytest + pytest-asyncioВалидация асинхронной обработки
Визуализация (опционально)Mermaid / PlantUMLДиаграмма взаимодействия агентов

4. Этапы выполнения

Этап 1: Проектирование интерфейсов (1 час)

Действия

  1. Определить базовую структуру сообщения (Message):

    from typing import Any, Optional
    from dataclasses import dataclass
    from uuid import uuid4
    
    @dataclass
    class Message:
        sender_id: str
        receiver_id: str
        payload: Any
        msg_id: str = uuid4().hex[:8]
        reply_to: Optional[str] = None
    
  2. Спроектировать класс Actor:

    • Атрибуты: actor_id: str, mailbox: asyncio.Queue
    • Методы: send(receiver, message), receive(), process_message(msg), run()
    • run() – бесконечный цикл, читающий из mailbox и вызывающий process_message
  3. Создать интерфейс ActorSystem:

    • Реестр акторов: dict[str, [[Вики/agent|Actor
    • Метод register(actor)
    • Метод send_message(sender_id, receiver_id, payload)
    • Метод shutdown()

Ожидаемый результат этапа Файл interfaces.py с датаклассом Message, абстрактным классом Actor и классом ActorSystem.

Этап 2: Реализация базового Actor и Mailbox (2 часа)

Действия

  1. Написать реализацию ActorSystem с asyncio:

    import asyncio
    from typing import Dict
    
    class ActorSystem:
        def __init__(self):
            self._actors: Dict[str, 'Actor'] = {}
            self._tasks: Dict[str, asyncio.Task] = {}
    
        def register(self, actor: 'Actor'):
            actor._system = self
            self._actors[actor.actor_id] = actor
            self._tasks[actor.actor_id] = asyncio.create_task(actor.run())
    
        async def send_message(self, sender_id: str, receiver_id: str, payload) -> None:
            receiver = self._actors.get(receiver_id)
            if receiver is None:
                raise ValueError(f"Actor {receiver_id} not found")
            msg = Message(sender_id=sender_id, receiver_id=receiver_id, payload=payload)
            await receiver.mailbox.put(msg)
    
        async def shutdown(self):
            for task in self._tasks.values():
                task.cancel()
            await asyncio.gather(*self._tasks.values(), return_exceptions=True)
    
  2. Реализовать конкретный класс AsyncActor:

    class AsyncActor:
        def __init__(self, actor_id: str):
            self.actor_id = actor_id
            self.mailbox: asyncio.Queue = asyncio.Queue()
            self._system: Optional[ActorSystem] = None
            self._running = True
    
        async def send(self, receiver_id: str, payload):
            if self._system:
                await self._system.send_message(self.actor_id, receiver_id, payload)
    
        async def receive(self) -> Message:
            return await self.mailbox.get()
    
        async def process_message(self, msg: Message) -> None:
            # Default: just log (override in subclass)
            print(f"[{self.actor_id}] received: {msg.payload}")
    
        async def run(self):
            while self._running:
                try:
                    msg = await self.receive()
                    await self.process_message(msg)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    print(f"[{self.actor_id}] error: {e}")
    
        def stop(self):
            self._running = False
    
  3. Добавить логирование с logging.getLogger(self.actor_id).

Ожидаемый результат этапа Файл actor.py с полной реализацией ActorSystem и AsyncActor.

Этап 3: Создание двух специализированных агентов (1.5 часа)

Действия

  1. Создать агента WorkerAgent, который инвертирует строку:

    class WorkerAgent(AsyncActor):
        async def process_message(self, msg: Message) -> None:
            transformed = msg.payload[::-1]
            logging.info(f"Worker received '{msg.payload}', sending back reversed")
            await self.send(msg.sender_id, f"Reversed: {transformed}")
    
  2. Создать агента OrchestratorAgent, который отправляет задания и собирает ответы:

    class OrchestratorAgent(AsyncActor):
        async def process_message(self, msg: Message) -> None:
            logging.info(f"Orchestrator got reply: {msg.payload}")
    
  3. Написать сценарий запуска:

    async def main():
        system = ActorSystem()
        orchestrator = OrchestratorAgent("orchestrator")
        worker = WorkerAgent("worker")
        system.register(worker)
        system.register(orchestrator)
    
        # посылаем несколько сообщений подряд
        for text in ["hello", "world", "actor", "model"]:
            await orchestrator.send("worker", text)
    
        await asyncio.sleep(2)  # даём время на обработку
        await system.shutdown()
    
    asyncio.run(main())
    

Ожидаемый результат этапа Файл agents.py с двумя агентами и скрипт run.py, который запускает систему и демонстрирует асинхронную обработку.

Этап 4: Тестирование асинхронности и производительности (1.5 часа)

Действия

  1. Написать unit-тесты с pytest-asyncio:

    • Проверить, что сообщение доставляется агенту
    • Проверить, что агент обрабатывает сообщения последовательно (одно за другим)
    • Проверить, что несколько агентов работают параллельно
  2. Написать тест на асинхронность: отправить 10 сообщений, измерить общее время. Обработка каждого должна занимать ~0.1 сек (симулировать await asyncio.sleep(0.1) в process_message). Если общее время < 0.1*N, значит обработка действительно асинхронная.

  3. Тест на корректность mailbox: убедиться, что порядок сообщений сохраняется для каждого актора.

Код теста (фрагмент):

import pytest
@pytest.mark.asyncio
async def test_parallel_processing():
    system = ActorSystem()
    results = []
    class SlowActor(AsyncActor):
        async def process_message(self, msg):
            await asyncio.sleep(0.1)
            results.append(msg.payload)
    actor = SlowActor("slow")
    system.register(actor)
    start = time.time()
    for i in range(5):
        await asyncio.create_task(system.send_message("test", "slow", i))
    await asyncio.sleep(1)
    elapsed = time.time() - start
    assert elapsed < 0.3  # если бы последовательно, было бы 0.5
    assert len(results) == 5

Ожидаемый результат этапа Файл test_actor_model.py с набором тестов, подтверждающих асинхронное выполнение.

Этап 5: Документирование и демонстрация (1 час)

Действия

  1. Создать README.md с:

    • Архитектурной диаграммой (текстовой или Mermaid)
    • Инструкцией по запуску
    • Примером лога работы системы
  2. Дополнительно (опционально): реализовать механизм supervisor (перезапуск упавшего агента). Для этого добавить on_crash callback в ActorSystem.

Ожидаемый результат этапа Документ README.md и демонстрационный вывод в консоли.

5. Критерии приемки (Definition of Done)

  • Каждый агент имеет собственный asyncio.Queue в качестве mailbox.
  • Сообщения отправляются асинхронно и обрабатываются без блокировки main-потока.
  • Реализован механизм регистрации агентов в системе (ActorSystem).
  • Написаны тесты, доказывающие параллельную обработку сообщений.
  • Логирование показывает, что обработка разных сообщений перемежается (не последовательна).
  • Время выполнения пакета из N сообщений (с sleep(0.1)) меньше N * 0.1 сек (минимум в 2 раза).
  • Код покрыт type hints (аннотации типов).
  • README содержит инструкцию по запуску и пример вывода.

6. Ожидаемый результат

  • Основной файл папка actor_model/ с модулями:
    • interfaces.py
    • actor.py
    • agents.py
    • run.py
    • test_actor_model.py
    • README.md
  • Содержание рабочая реализация actor model на Python asyncio, демонстрация асинхронного обмена сообщениями между двумя агентами, юнит-тесты.
  • Дополнительно (опционально): supervisor для автоматического перезапуска упавшего агента; поддержка таймаутов при обработке.

7. Возможные сложности и их решение

СложностьРешение
Забыть вызвать asyncio.create_task(actor.run()) — актор не запущенВ register() сразу создавать таск; проверять, что таск не завершён
Деадлок из-за ожидания ответа от самого себяЗапретить отправку сообщения себе; в send_message проверять sender_id != receiver_id
Сообщения могут теряться при краше актораДобавить try-except в цикл run; в случае ошибки логировать и продолжать
Сложность с остановкой всех корутинИспользовать asyncio.gather с return_exceptions=True и отменять через task.cancel()
Тесты на асинхронность нестабильны (race condition)Использовать pytest-timeout и увеличить asyncio.sleep в тестах до 0.2

8. Бюджет времени (оценка)

ЭтапВремя
Проектирование интерфейсов1 час
Реализация базового Actor и Mailbox2 часа
Создание агентов1,5 часа
Тестирование1,5 часа
Документирование1 час
Итого7 часов

Примечание: При первом знакомстве с asyncio время может увеличиться до 10–12 часов. Рекомендуется пройти краткий туториал по asyncio перед началом.

9. Связанные вопросы из базы знаний

ВопросТема
45Что такое Actor Model и его преимущества
112Асинхронное программирование в Python: asyncio vs threading
203Очереди (Queue) для межпотокового взаимодействия
378Пул потоков и конкурентность в AI-агентах
511Шаблон "Mailbox" для обработки сообщений
644Обработка ошибок в асинхронных задачах
721Supervisor для перезапуска упавших агентов
835Тестирование асинхронного кода с pytest-asyncio

10. Чек-лист самопроверки

  • Я определил структуру Message с уникальным ID и метаданными.
  • Мой AsyncActor имеет mailbox как asyncio.Queue.
  • В ActorSystem я зарегистрировал акторов и создал для каждого asyncio-задачу.
  • Я написал как минимум два агента, которые обмениваются сообщениями.
  • Мои тесты показывают, что время обработки пакета сообщений меньше, чем суммарное время последовательной обработки.