Что такое Agent Loop и какие компоненты входят в production-ready loop?

Краткий тезис

Agent Loop (feedback|агентный цикл) — это центральный управляющий процесс в Agentic RAG, который повторяет: восприятие состояния, call|вызов LLM, выполнение инструментов, проверку ответов и обновление памяти. В production-ready версии цикл дополняется safety valves (защитными клапанами), middleware chains (цепочками промежуточных обработчиков), fallback adapters (адаптерами отказоустойчивости), SSE (Server-Sent Events) для потоковой передачи и output parsers (парсерами вывода). Такой цикл обеспечивает надёжность, масштабируемость и безопасность агентного поведения.


1. Термин: Agent Loop – ядро слоя Harness

В архитектуре RAG|Agentic RAG Harness — это слой оркестрации, который управляет взаимодействием между пользователем, LLM, инструментами и хранилищем данных. Agent Loop (цикл агента) — его основной исполнитель. Цикл последовательно проходит этапы:

  1. Получение текущего состояния (сообщения истории, память, сессия).
  2. call|Вызов LLM с системным промптом и списком доступных инструментов.
  3. Анализ ответа LLM:
    • Если LLM решила вызвать tool (инструмент), то выполнить его с валидацией и rate limiting.
    • Если LLM сгенерировала финальный ответ, то проверить его через guardrails.
  4. Обновление памяти, сохранение состояния сессии, логирование trace.
  5. Повтор шагов 2–4 до тех пор, пока не будет получен финальный ответ или не сработает условие остановки.

Production-ready означает, что каждый шаг защищён от сбоев, перегрузок и небезопасного поведения.


2. Компоненты production-ready Agent Loop

Ниже — таблица основных компонентов с назначением:

КомпонентНазначениеПримеры
State ManagerХранит историю сообщений, переменные сессии, контекстПамять в RAM, Redis, LangChain BaseStore
LLM InvokerВызов модели с системным промптом и списком инструментовOpenAI, Anthropic, локальная модель через Ollama
Tool ExecutorЗапуск инструментов (API-запросы, выполнение кода) с валидацией входов/выходовPython-функции, REST API, SQL-запросы
GuardrailsПроверка финального ответа на безопасность, релевантность, фактыGuardrails AI, Nemo Guardrails, Azure Content Safety
Memory UpdaterОбновляет краткосрочную (диалог) и долгосрочную (векторная БД) памятьBufferWindowMemory, SummarizerMemory, Postgres
Output ParserПреобразует текстовый ответ LLM в структурированный формат (JSON, Pydantic)LangChain PydanticOutputParser, Instructor
Middleware ChainЦепочка промежуточных обработчиков (логирование, модификация промпта, кэширование)Chain-of-thought, фильтры токсичности, добавление метаданных
Rate LimiterОграничивает частоту вызовов LLM и инструментовна стороне API или на уровне приложения (token bucket)
Safety ValveМеханизм аварийной остановки при превышении лимита шагов, времени или стоимостиMaxSteps, Timeout, BudgetWatcher
Fallback AdapterОбработка ошибок вызова инструмента или LLM: повтор, альтернативный сервисretry with backoff, замена на кэш, переход на резервную модель
Streaming (SSE)Потоковая передача результатов выполнения (промежуточных мыслей, финального ответа) пользователюServer-Sent Events, WebSocket

Каждый компонент должен быть асинхронным и неблокирующим, чтобы поддерживать высокий параллелизм.


3. Получение состояния: память, сессия, контекст

State Manager отвечает за сбор актуального контекста перед каждым вызовом LLM. В состояние входят:

  • Сообщения истории — все предыдущие сообщения пользователя и агента.
  • Session variables — идентификатор сессии, аутентификационные данные, настройки.
  • Memory — долгосрочная (суммированные факты, извлечённые из базы знаний) и краткосрочная (последние n сообщений).

Термин «Memory» в Agentic RAG обычно подразумевает не просто буфер сообщений, а структурированное хранилище фактов, обновляемое по мере работы цикла. Пример:

class StateManager:
    def get_state(self, session_id: str):
        history = self.load_history(session_id)
        memory = self.load_long_term_memory(session_id)
        return {
            "messages": history,
            "memory": memory,
            "session_vars": self.get_session_vars(session_id)
        }

4. Вызов LLM с инструментами

LLM Invoker конструирует полный промпт: системное сообщение с описанием доступных инструментов (часто в виде function calling или tool definition), историю диалога и текущий запрос. Пример для OpenAI:

import openai

tools = [
    {
        "type": "function",
        "function": {
            "name": "search_documents",
            "description": "Поиск релевантных чанков в векторной БД",
            "parameters": {"query": {"type": "string"}}
        }
    }
]

response = openai.ChatCompletion.create(
    model="gpt-4o",
    messages=state["messages"],
    tools=tools
)

LLM может вернуть либо финальный ответ (content), либо вызов инструмента (tool_calls). Цикл обрабатывает оба варианта.


5. Выполнение инструмента с валидацией и rate limiting

Tool Executor получает вызов инструмента (имя + аргументы) и выполняет его с проверками:

  • Валидация аргументов — по Pydantic схеме (например, если инструмент ожидает целое число, а передана строка — ошибка).
  • Rate Limiting — ограничение количества вызовов на единицу времени (например, не более 5 вызовов поиска в секунду). Реализуется через token bucket или semaphore.
import asyncio
from pydantic import BaseModel

class SearchArgs(BaseModel):
    query: str

class ToolExecutor:
    def __init__(self):
        self.semaphore = asyncio.Semaphore(10)  # максимум 10 одновременных

    async def execute(self, tool_name: str, args: dict):
        if tool_name == "search_documents":
            async with self.semaphore:
                validated = SearchArgs(**args)
                result = await self._search(validated.query)
                return result

6. Генерация ответа и guardrails

Когда LLM возвращает текстовый ответ, он проходит через Guardrails. Типы проверок:

  • Безопасность — отсутствие токсичных, дискриминационных или небезопасных выражений.
  • Фактическая точность — сверка с извлечёнными документами (faithfulness).
  • Соблюдение формата — ответ соответствует ожидаемой структуре (JSON, Markdown).

Пример интеграции Guardrails AI:

import guardrails as gd

guard = gd.Guard.from_pydantic(output_class=MyOutputSchema)
validated = guard.parse(response, llm_output=response_text)

Если проверка не пройдена, цикл может либо запросить у LLM переформулировку, либо вернуть сообщение об ошибке.


7. Обновление памяти и сессии

После выполнения инструментов или генерации ответа Memory Updater сохраняет новую информацию:

  • В краткосрочную память — добавляет текущий turn (сообщение агента и пользователя).
  • В долгосрочную память — может записать факты, извлечённые из ответа (например, "Пользователь хочет купить билет на 15 июня").

Обновление сессии включает сохранение стейта, чтобы при следующем запросе агент мог продолжить диалог.


8. Логирование и трассировка (trace)

Каждый шаг цикла должен быть залогирован с идентификатором traceId. Trace содержит:

  • Входные данные (промпт, инструменты).
  • Выходные данные LLM.
  • Результаты инструментов.
  • Время выполнения.
  • Решение guardrails.

Современные фреймворки (LangSmith, Weights & Biases, OpenTelemetry) позволяют визуализировать цепочки вызовов. Пример структуры трассы:

{
  "traceId": "abc123",
  "steps": [
    {"step": "llm_call", "input": "...", "output": "...", "duration_ms": 1200},
    {"step": "tool_call", "tool": "search_documents", "duration_ms": 300},
    {"step": "llm_final", "output": "Ответ пользователю", "duration_ms": 900}
  ]
}

9. Повтор цикла: условия завершения

Цикл выполняется, пока:

  • LLM не вернёт финальный ответ (без вызова инструментов).
  • Не будет превышен лимит шагов (max_steps).
  • Не истечёт таймаут (max_duration).
  • Не будет достигнута максимальная стоимость (budget).

Safety Valve контролирует эти лимиты и прерывает цикл с сообщением об ошибке, если лимит исчерпан.

class SafetyValve:
    def __init__(self, max_steps=10, max_duration=30.0):
        self.steps = 0
        self.start_time = time.time()
        self.max_steps = max_steps
        self.max_duration = max_duration

    def should_continue(self) -> bool:
        if self.steps >= self.max_steps:
            return False
        if time.time() - self.start_time >= self.max_duration:
            return False
        return True

10. Production enhancements: middleware, fallback, streaming

Middleware chains

Позволяют вставлять обработчики до/после LLM или инструментов. Например:

  • Логирование каждого вызова.
  • Кэширование одинаковых запросов к инструментам.
  • Обогащение контекста — добавление информации о пользователе перед вызовом LLM.

Fallback adapters

Если инструмент недоступен (сетевая ошибка, таймаут), адаптер может:

  • Повторить запрос с экспоненциальной задержкой.
  • Использовать альтернативный инструмент (например, другой поисковый API).
  • Вернуть заглушку и продолжить цикл.

SSE для streaming

Пользователь видит промежуточные шаги агента (например, "Ищу информацию...", "Анализирую..."). Реализуется через Server-Sent Events — агент отправляет события по мере выполнения шагов.


11. Сравнение с простым RAG

ХарактеристикаПростой RAGAgentic RAG с Agent Loop
Число LLM-вызовов1 (генерация)1..N (многократные вызовы)
ИнструментыТолько поискЛюбые: поиск, календарь, код, API
Управление ошибкамиНетGuardrails, Fallback
ПотоковостьТолько финальный ответПромежуточные мысли по SSE
ПамятьТолько контекстКраткосрочная + долгосрочная
БезопасностьБазоваяGuardrails, Safety Valves

12. Пример реализации (концептуальный код на asyncio)

async def agent_loop(session_id: str, user_message: str):
    state = await state_manager.get_state(session_id)
    safety = SafetyValve(max_steps=5, max_duration=20.0)

    while safety.should_continue():
        response = await llm_invoker.call(state, tools)
        safety.steps += 1

        if response.tool_calls:
            for tool_call in response.tool_calls:
                result = await tool_executor.execute(tool_call)
                state.messages.append({"role": "tool", "content": str(result)})
                # передаём результат обратно LLM
        else:
            validated = await guardrails.check(response.content)
            if validated.passed:
                await memory_updater.update(session_id, state)
                await trace_logger.log(safety, state, response)
                return validated.output
            else:
                state.messages.append({"role": "user", "content": "Пожалуйста, исправь ответ"})
                # цикл продолжается с дополнительным запросом

    return "Извините, ответ не получен за отведённое время."

Пет-проект для закрепления

Задача Создать агент-помощника, который ищет информацию в локальной базе знаний, выполняет простые вычисления и возвращает структурированный ответ с проверкой на галлюцинации.

Инструменты

  • Python, asyncio, aiohttp
  • LangChain (для вызова LLM и инструментов)
  • FAISS (векторное хранилище)
  • Guardrails AI (проверка фактов)
  • FastAPI + SSE (для демонстрации)

Шаги:

  1. Реализовать класс AgentLoop с методами run, _call_llm, _execute_tool, _guard_check.
  2. Определить два инструмента: search_knowledge(query) и calculate(expression).
  3. Настроить state_manager на сохранение истории в JSON-файле.
  4. Добавить safety_valve с лимитом 3 шага.
  5. Интегрировать вывод шагов через SSE-endpoint.
  6. Протестировать диалог: пользователь пишет «Найди статью про RAG и посчитай, сколько в ней слов».

Ожидаемый результат Агент за 2-3 шага находит чанк, отправляет его в calculate (подсчёт слов) и возвращает итог с проверкой, что ответ соответствует чанку.


Связь с другими вопросами

ВопросТема
743Архитектура слоёв Agentic RAG (место Harness)
745Что такое Harness и его компоненты
746Инструменты (tools) в Agentic RAG
747Guardrails и безопасность
748Управление памятью в агентах
749SSE и стриминг в агентах
5Оценка качества RAG (можно применить к циклу)
10Self-RAG (вызов рефлексии внутри цикла)
50Fine-tuning агентного поведения

Навигация