English translation is not available yet. Showing Russian content.

Как вы делаете асинхронную обработку long-running (>30s) LLM задач?

Краткий тезис

Long-running LLM задачи (генерация длинных ответов, мультиагентные пайплайны, сложный RAG) требуют асинхронной обработки, чтобы не блокировать пользователя. Основной паттерн — возврат HTTP 202 Accepted с уникальным идентификатором задачи (task_id), после чего клиент либо опрашивает статус через polling (GET /tasks/{id}), либо получает уведомление через webhook. На бэкенде задачи ставятся в очередь (Celery, Kafka) и выполняются воркерами, а результаты сохраняются в хранилище (Redis/PostgreSQL) для последующего получения.


1. Термин: Long-running LLM задача

Long-running LLM задача — это любой call|вызов LLM, время выполнения которого превышает ожидаемый лимит синхронного ответа (обычно 10–30 секунд). Примеры:

  • Генерация текста с параметрами max_tokens=8000 на медленной модели.
  • Многошаговый RAG|Agentic RAG с цепочкой вызовов инструментов.
  • Batch-обработка нескольких промптов одним запросом.
  • Вызов внешних API внутри пайплайна (например, поиск в интернете, выполнение кода).

Если такая задача выполняется синхронно, клиент (браузер, мобильное приложение) рискует получить таймаут по HTTP и пользователь видит пустой экран.


2. Проблемы синхронного подхода

ПроблемаОписание
Блокировка соединенияHTTP-соединение держится открытым десятки секунд, расходуя ресурсы сервера.
Таймаут на стороне клиентаБраузеры и библиотеки (requests, axios) часто имеют лимит 30–60 с.
Плохой UXПользователь ждёт без обратной связи.
Сложность масштабированияКаждый запрос занимает поток/воркер на всё время выполнения.

Асинхронный подход решает эти проблемы, разделяя приём запроса и получение результата.


3. Паттерн: 202 Accepted + Task ID

Базовая идея: при получении запроса сервер не ждёт завершения задачи, а сразу возвращает HTTP 202 Accepted и JSON с идентификатором задачи (task_id).

// POST /async/completion
// Request: { "prompt": "...", "max_tokens": 5000 }

// Response (202)
{
  "task_id": "abc123-...",
  "status": "queued",
  "estimated_time_sec": 45
}

Преимущества

  • Клиент получает ответ мгновенно.
  • Сервер может поставить задачу в очередь и не держать поток.
  • Клиент может показывать пользователю прогресс (например, «Обрабатывается…»).

4. Polling (опрос) — клиент сам спрашивает статус

После получения task_id клиент периодически вызывает GET /tasks/{id для проверки статуса.

import requests, time

TASK_ENDPOINT = "https://api.example.com/tasks/"

def wait_for_task(task_id, poll_interval=2, timeout=120):
    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(f"{TASK_ENDPOINT}{task_id}")
        data = resp.json()
        if data["status"] == "completed":
            return data["result"]
        elif data["status"] == "failed":
            raise Exception(data.get("error", "Task failed"))
        time.sleep(poll_interval)
    raise TimeoutError("Task did not complete within timeout")

Возможные статусы

  • queued — в очереди
  • processing — выполняется
  • completed — готово
  • failed — ошибка

Недостатки polling

  • Лишние запросы, нагрузка на API.
  • Задержка между завершением и моментом узнавания (до poll_interval).
  • Клиент должен сам реализовать логику опроса.

Рекомендации

  • Использовать экспоненциальную задержку (сначала 1 с, потом 2, 4, 8… до 30 с).
  • Передавать в ответе estimated_remaining_sec, чтобы клиент адаптировал интервал.

5. Webhook — сервер сам уведомляет клиента

Вместо активного опроса клиент предоставляет callback URL (вебхук), на который сервер отправит результат после завершения.

// POST /async/completion
{
  "prompt": "...",
  "webhook_url": "https://myapp.com/callback"
}

// Ответ: { "task_id": "abc", "status": "queued" }

// Через 45 с сервер отправляет POST на webhook_url:
{
  "task_id": "abc",
  "status": "completed",
  "result": { "text": "..." }
}

Преимущества webhook

  • Мгновенное уведомление без опросов.
  • Меньше нагрузки на сервер.
  • Подходит для серверных приложений (например, боты, backend-to-backend).

Недостатки

  • Клиент должен быть доступен для приёма POST.
  • Нужно обеспечить retry при временных ошибках.
  • Требуется верификация webhook (подпись HMAC) для безопасности.
КритерийPollingWebhook
Сложность клиентаНизкаяСредняя (нужен endpoint)
Задержка уведомленияДо poll_intervalМгновенно
Нагрузка на серверВыше (частые запросы)Ниже (только один коллбэк)
Подходит для браузеровДа (fetch/setInterval)Только Service Worker / SSE
МасштабированиеНеограниченноЗависит от надёжности клиента

6. Фоновая обработка: очередь + Worker

На бэкенде Long-running задачи управляются через очередь сообщений (message queue). Популярные решения: Celery (брокер Redis/RabbitMQ) или Apache Kafka (для высоких нагрузок).

Архитектура

Client → API Gateway (202) → Task Queue (Celery/Kafka) → Worker pool (GPU pods) → Result Store (Redis/DB)

Шаги:

  1. API Gateway получает POST-запрос, создаёт запись в БД (task_id, status='queued', payload), отправляет сообщение в очередь.
  2. Один из воркеров подхватывает задачу, меняет статус на processing, вызывает LLM.
  3. После завершения воркер сохраняет результат в Result Store (например, Redis с TTL) и обновляет статус на completed.
  4. Клиент (через polling или webhook) получает результат.

Код Celery (Python):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def run_llm_generation(self, prompt: str):
    try:
        # Имитация долгого вызова LLM
        result = call_llm(prompt, max_tokens=5000)
        return {'status': 'completed', 'result': result}
    except Exception as e:
        self.retry(exc=e, countdown=30)

# В API:
task = run_llm_generation.delay(prompt)
return {"task_id": task.id}, 202

Kafka используется, когда требуется:

  • Гарантированная доставка и повторная обработка (exactly-once семантика).
  • Хранение истории сообщений.
  • Распределённая обработка между микросервисами.

7. Управление очередью: приоритеты, retry, dead letter

Чтобы система была надёжной, нужно:

  • Приоритеты — короткие задачи (например, чат-бот) должны выполняться раньше длинных. Celery поддерживает очереди с разной скоростью потребления.
  • Retry с exponential backoff — при временных ошибках (overloaded LLM, сетевые сбои) задача повторяется через увеличивающийся интервал.
  • Dead Letter Queue (DLQ) — задачи, исчерпавшие max_retries, попадают в DLQ для ручного разбора.
  • TTL (Time-To-Live) — если результат не будет востребован (например, прошло 10 минут), он автоматически удаляется.
@app.task(bind=True, max_retries=5, default_retry_delay=60, acks_late=True)
def long_llm_task(self, ...):
    # ...
    if something_fatal:
        self.request.chain = None  # предотвратить повторение
        raise

8. Хранение состояния задач

Для polling и webhook необходимо единое хранилище статусов и результатов. Варианты:

ХранилищеПлюсыМинусы
RedisВысокая скорость, TTL, атомарные операцииНет долговременного хранения, возможна потеря при сбое без AOF
PostgreSQLНадёжность, ACID, долговременное хранениеМедленнее, нужна миграция схемы
Mongo/NoSQLГибкая схема, горизонтальное масштабированиеНет транзакций, сложнее согласованность

Типичная схема в Redis

  • task:{id}:status → queued/processing/completed/failed
  • task:{id}:result → JSON (или ссылка на файл S3)
  • task:{id}:meta → время старта, количество ретраев, параметры

TTL устанавливается, например, на 1 час для автоматической очистки.


9. Мониторинг и observability

Для Long-running задач критично отслеживать:

  • Задержка в очереди (queue latency) — сколько задача ждёт воркера.
  • Время выполнения (task duration) — распределение по моделям, размерам промпта.
  • Процент ошибок (failure rate) — по типам ошибок: таймаут LLM, переполнение контекста, ошибки инструментов.
  • Количество активных воркеров — автоподъем при перегрузке.

Инструменты Prometheus + Grafana (метрики), Loki (логи), OpenTelemetry (трассировка).

Пример метрики для Celery (через prometheus-client):

from prometheus_client import Histogram, Counter

task_duration = Histogram('llm_task_duration_seconds', 'Time to complete LLM task', buckets=[1,5,10,30,60,120])
task_errors = Counter('llm_task_errors_total', 'Number of failed tasks', ['error_type'])

10. Обработка ошибок и таймаутов

Таймаут на стороне воркера — если LLM зависла (например, бесконечный цикл в эмбеддинге), необходимо прерывание:

import signal

class TimeoutException(Exception):
    pass

def handler(signum, frame):
    raise TimeoutException("LLM call timed out")

signal.signal(signal.SIGALRM, handler)
signal.alarm(60)  # 60 секунд

try:
    result = call_llm(prompt)
except TimeoutException:
    # завершить задачу с ошибкой
    self.update_state(state='FAILURE', meta={'exc': 'timeout'})

Грейсфул шатдаун воркера — при завершении процесса текущие задачи должны либо дообработаться, либо отложиться обратно (ack_on_failure=False).

Dead Letter Queue для задач, которые не удалось выполнить после всех ретраев.


11. Выбор архитектуры для Agentic RAG

В Agentic RAG LLM вызывается многократно (план → поиск → чтение → ответ). Каждый шаг может быть Long-running (>30s). Асинхронная обработка позволяет:

  • Параллельно запускать несколько шагов (например, одновременный поиск по нескольким источникам).
  • Сохранять промежуточное состояние (state) для возобновления после сбоя.
  • Использовать DAG задач (например, через Prefect, Temporal) для координации.

Пример Agentic RAG с очередью

  1. Пользователь отправляет запрос → менеджер ставит задачу в очередь.
  2. Воркер запускает агента, который порождает подзадачи (поиск, суммаризация).
  3. Каждая подзадача — отдельное сообщение в очереди с приоритетом.
  4. По завершении всех подзадач агент собирает ответ и сохраняет его.
  5. Клиент получает результат через polling или webhook.

Пет-проект для закрепления

Задача Реализовать асинхронный сервис для генерации текста через OpenAI API с поддержкой polling и webhook.

Инструменты FastAPI, Celery, Redis, Python, Docker Compose.

Шаги:

  1. Создать FastAPI endpoint POST /generate с параметрами prompt, max_tokens, webhook_url (опционально).
  2. При получении запроса — создать задачу в Celery (generate_text.delay(prompt, max_tokens)), вернуть 202 { task_id, status }.
  3. Написать задачу Celery, которая вызывает OpenAI API, обрабатывает таймауты (signal alarm), сохраняет результат в Redis (ключ task:{id}).
  4. Реализовать GET /tasks/{id} для polling.
  5. Если передан webhook_url, после завершения отправить POST на этот URL с результатом (подписать HMAC).
  6. Настроить мониторинг: добавить prometheus_client для длительности и ошибок.
  7. Написать клиентский скрипт, который делает запрос и опрашивает или ожидает webhook.

Ожидаемый результат

  • Клиент получает task_id мгновенно.
  • Polling работает с экспоненциальной задержкой.
  • Webhook доставляет результат в течение секунды после завершения.
  • Метрики видны в Grafana (например, гистограмма времени генерации).

Связь с другими вопросами

ВопросТема
252Архитектура Agentic RAG (многошаговые пайплайны)
254Управление состоянием в агентах (state management)
255Мониторинг и observability для LLM
260Вебхуки и события (event-driven architecture)
270Производительность LLM-инференса (latency, throughput)

Навигация