中文翻译暂不可用,显示俄语原文。

Как вы делаете асинхронную обработку long-running (>30s) LLM задач?

Краткий тезис

Для LLM-задач, выполняющихся дольше 30 секунд (например, генерация отчёта, batch-обработка документов, сложный multi-step reasoning), синхронное ожидание ответа неприемлемо — это блокирует клиента и нарушает UX. Решение — асинхронная обработка через шаблон Async Task: сервер сразу возвращает HTTP 202 Accepted с task_id, а клиент либо опрашивает endpoint (GET /tasks/{id}) с exponential backoff, либо получает результат через webhook. В фоне работают очередь (Kafka / Celery) и воркеры, результат хранится в Redis с TTL. Такой подход позволяет масштабировать обработку, не роняя API.


1. Проблема long-running LLM задач

Long-running задача — любой вызов LLM, занимающий более ~30 секунд. Типичные сценарии:

  • Генерация длинного документа (10+ страниц) с помощью RAG + summarization.
  • Multi-step reasoning (ReAct, Plan-and-Execute) с несколькими итерациями инструментов.
  • Batch-транскрибация аудио (Whisper + перевод).
  • Обработка большого набора чанков (например, 500+ фрагментов) через LLM.

Если держать HTTP-соединение открытым, клиент:

  • рискует получить тайм-аут по timeout (обычно 30–60 сек);
  • тратит ресурсы на ожидание;
  • не может продолжить работу (разве что при многопоточности).

Решение — асинхронный task-based паттерн: запрос = задача (task), статус и результат получаются отдельно.


2. Базовая архитектура асинхронной обработки

Client  ──POST /tasks (LLM prompt)──→  API
                                          │
                                          ├→ task_id (202 Accepted)
                                          │
                   ┌───── очередь (Kafka/Celery/Redis Streams) ───► Worker 1
                   │                                                  │
                   │                                                  ├→ LLM call (40s)
                   │                                                  │
                   │                                                  └→ Write result→ Redis (TTL 24h)
                   │
                   └── Изменение статуса: pending → processing → completed/failed
  • API Gateway — принимает запрос, валидирует, генерирует UUID, сохраняет начальный статус в Redis, отправляет задачу в очередь, возвращает task_id.
  • Очередь сообщений — гарантирует доставку, порядок (необязательно), позволяет масштабировать воркеры.
  • Worker (воркер) — подписчик очереди, выполняет LLM-вызов, записывает результат в persistent store (Redis с TTL или база данных).
  • Хранилище результатов — обычно Redis с TTL 24 часа (или БД для долгого хранения). Клиент забирает результат по GET /tasks/{id}.

3. Варианты получения результата клиентом

3.1 Polling (опрос) с exponential backoff

Клиент периодически вызывает GET /tasks/{id} и проверяет статус.

  • Статусы: pending, processing, completed, failed.
  • Exponential backoff: начинать с 1 сек, удваивать до максимума (например, 30 сек), чтобы не перегружать сервер.
import time
import requests

def poll_task(base_url, task_id, max_attempts=20):
    for i in range(max_attempts):
        resp = requests.get(f"{base_url}/tasks/{task_id}")
        data = resp.json()
        if data["status"] == "completed":
            return data["result"]
        if data["status"] == "failed":
            raise Exception(f"Task failed: {data['error']}")
        wait = min(2 ** i, 30)   # exponential backoff, cap 30s
        time.sleep(wait)
    raise TimeoutError("Task did not finish")
ПараметрРекомендация
Начальная задержка0.5–1 с
Фактор роста2 (экспонента)
Максимальная задержка30–60 с (зависит от SLA)
Количество попытокдо 50–100 (настраивается)

Когда использовать polling: публичные API, нет возможности зарегистрировать callback, клиент — простой скрипт.


3.2 Webhook (callback)

Клиент при создании задачи передаёт URL, на который сервер отправит POST-запрос с результатом.

  • Преимущества: мгновенное уведомление, не нужно опрашивать.
  • Требования: клиент должен иметь публичный endpoint, принимать POST, отвечать 200 OK.
  • Retry-механизм: сервер повторяет отправку 3–5 раз с exponential backoff, если клиент не ответил 200.

Пример запроса создания задачи с webhook:

POST /tasks
{
  "prompt": "Напиши статью о ML...",
  "webhook_url": "https://my-service.com/callback",
  "max_retries": 3
}

Когда использовать webhook: сервер-серверное взаимодействие, микросервисы, где клиент может принимать входящие запросы.


4. Очередь сообщений и воркеры

4.1 Выбор очереди

ТехнологияПлюсыМинусы
Celery + Redis/RabbitMQВстроенная поддержка задач, retry, ETA, мониторинг (Flower)Тяжеловат для простых сценариев, нужен брокер
KafkaВысокая пропускная способность, долгое хранение сообщений, партиционированиеСложнее настройка, не встроен scheduler
Redis StreamsЛёгкий, не требует внешнего брокера, встроен TTLМеньше гарантий, чем Kafka
Amazon SQS / Google Pub/SubСерверный, масштабируемый, плата за использованиеVendor lock, latency

Для LLM-задач часто используют Celery (простота) или Kafka (если много воркеров и нужна гарантия at-least-once).

4.2 Пример воркера на Celery

from celery import Celery
import openai

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_report(self, prompt: str):
    try:
        result = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            timeout=60
        )
        return result.choices[0].message.content
    except Exception as e:
        raise self.retry(exc=e)

4.3 Как API создаёт задачу (псевдокод)

from uuid import uuid4
from flask import Flask, request, jsonify
import redis
from celery import Celery

app = Flask(__name__)
r = redis.Redis()
celery = Celery(...)

@app.route('/tasks', methods=['POST'])
def create_task():
    task_id = str(uuid4())
    prompt = request.json['prompt']
    webhook_url = request.json.get('webhook_url')
    
    # сохраняем начальный статус
    r.hset(f"task:{task_id}", "status", "pending")
    r.hset(f"task:{task_id}", "prompt", prompt)
    if webhook_url:
        r.hset(f"task:{task_id}", "webhook", webhook_url)
    r.expire(f"task:{task_id}", 86400)  # TTL 24h
    
    # отправляем в Celery
    generate_report.delay(task_id, prompt)
    
    return jsonify({"task_id": task_id, "status": "pending"}), 202

@app.route('/tasks/<task_id>', methods=['GET'])
def get_task(task_id):
    data = r.hgetall(f"task:{task_id}")
    if not data:
        return jsonify({"error": "not found"}), 404
    return jsonify(data)

5. Хранение результата и TTL

Результат храним в Redis (или другом быстром хранилище) с TTL (Time-To-Live) — обычно 24 часа. По истечении TTL задача и результат удаляются.

  • Почему 24 часа: клиент скорее всего заберёт результат в течение нескольких минут после завершения, но запас нужен на случай задержек.
  • Альтернатива: хранить в реляционной БД с флагом expired и чистить кроном.

6. Обработка ошибок и устаревших задач

  • Retry-логика в воркере: при временных ошибках LLM (rate limit, network) повторять с задержкой.
  • Dead Letter Queue (DLQ): если задача провалилась после N попыток, отправляем в специальную очередь для ручного анализа.
  • Таймаут задачи: если воркер завис, мониторинг (например, Celery TaskRevokedError) должен отменить задачу и установить статус failed.
  • Очистка старых задач: Redis TTL автоматически удалит; для БД — cron-скрипт раз в час.

7. Масштабирование

  • Горизонтальное масштабирование воркеров: добавляем больше инстансов, очередь распределяет задачи (особенно хорошо с Kafka / партициями).
  • Пул LLM-ключей: если используем API с rate limit, пул ключей (key rotation) позволит утилизировать лимит.
  • Приоритизация задач: в очереди можно задать priority (если Celery — использует разные очереди или таски с приоритетом).

8. Безопасность и идемпотентность

  • Идемпотентность: клиент может повторно отправить один и тот же запрос (например, из‑за сетевой ошибки). Решение: клиент передаёт idempotency key, сервер проверяет, выполнялась ли задача с таким ключом, и возвращает существующий task_id.
  • Аутентификация: все эндпоинты защищены (API key, OAuth), воркеры не должны быть доступны извне.
  • Webhook: проверять подпись (HMAC) на стороне клиента, чтобы удостовериться, что callback пришёл от сервера.

9. Мониторинг

  • Метрики:
    • Задержка обработки (p95, p99)
    • Количество задач в очереди (backlog)
    • Количество успехов / ошибок в единицу времени
    • Время LLM-вызова (latency LLM)
  • Трейсинг (OpenTelemetry): прослеживаем путь от создания задачи до завершения.
  • Логирование: логи воркеров с correlation_id = task_id.

Пет-проект для закрепления

Задача: Реализовать минимальный асинхронный LLM сервис, который принимает текстовый запрос, отправляет его в очередь, обрабатывает через OpenAI (тестовый ключ) и возвращает результат через polling.

Инструменты: Python (Flask/FastAPI), Celery, Redis, OpenAI (или любой бесплатный LLM endpoint).

Шаги:

  1. Установите celery[redis], flask, openai, redis-py.
  2. Напишите Celery-задачу generate_answer, которая принимает prompt и ключ авторизации, делает вызов LLM и возвращает текст.
  3. Создайте Flask-эндпоинты:
    • POST /tasks — создаёт задачу, сохраняет task_id и статус pending в Redis, вызывает generate_answer.delay, возвращает 202.
    • GET /tasks/<task_id> — возвращает статус и, если completed, результат.
  4. Запустите Redis, Celery worker (celery -A tasks worker --loglevel=info), Flask-приложение.
  5. Напишите клиентский скрипт, который отправляет запрос и ждёт ответа с exponential backoff (макс. 3 попытки).
  6. Добавьте TTL 1 час на результат в Redis.

Ожидаемый результат: клиент успешно получает сгенерированный текст через 10–30 секунд без блокировки основного потока.


Связь с другими вопросами

ВопросТема
415Как проектировать агентный RAG с длительными шагами
418Обработка ошибок и fallback стратегии
422Мониторинг и логирование в Agentic RAG
205Очереди сообщений в high-load ML системах
108Организация batch-инференса LLM через очереди
350Шаблон Saga и распределённые транзакции в AI

Навигация