中文翻译暂不可用,显示俄语原文。
Как вы делаете асинхронную обработку long-running (>30s) LLM задач?
Краткий тезис
Для LLM-задач, выполняющихся дольше 30 секунд (например, генерация отчёта, batch-обработка документов, сложный multi-step reasoning), синхронное ожидание ответа неприемлемо — это блокирует клиента и нарушает UX. Решение — асинхронная обработка через шаблон Async Task: сервер сразу возвращает HTTP 202 Accepted с task_id, а клиент либо опрашивает endpoint (GET /tasks/{id}) с exponential backoff, либо получает результат через webhook. В фоне работают очередь (Kafka / Celery) и воркеры, результат хранится в Redis с TTL. Такой подход позволяет масштабировать обработку, не роняя API.
1. Проблема long-running LLM задач
Long-running задача — любой вызов LLM, занимающий более ~30 секунд. Типичные сценарии:
- Генерация длинного документа (10+ страниц) с помощью RAG + summarization.
- Multi-step reasoning (ReAct, Plan-and-Execute) с несколькими итерациями инструментов.
- Batch-транскрибация аудио (Whisper + перевод).
- Обработка большого набора чанков (например, 500+ фрагментов) через LLM.
Если держать HTTP-соединение открытым, клиент:
- рискует получить тайм-аут по timeout (обычно 30–60 сек);
- тратит ресурсы на ожидание;
- не может продолжить работу (разве что при многопоточности).
Решение — асинхронный task-based паттерн: запрос = задача (task), статус и результат получаются отдельно.
2. Базовая архитектура асинхронной обработки
Client ──POST /tasks (LLM prompt)──→ API
│
├→ task_id (202 Accepted)
│
┌───── очередь (Kafka/Celery/Redis Streams) ───► Worker 1
│ │
│ ├→ LLM call (40s)
│ │
│ └→ Write result→ Redis (TTL 24h)
│
└── Изменение статуса: pending → processing → completed/failed
- API Gateway — принимает запрос, валидирует, генерирует UUID, сохраняет начальный статус в Redis, отправляет задачу в очередь, возвращает
task_id. - Очередь сообщений — гарантирует доставку, порядок (необязательно), позволяет масштабировать воркеры.
- Worker (воркер) — подписчик очереди, выполняет LLM-вызов, записывает результат в persistent store (Redis с TTL или база данных).
- Хранилище результатов — обычно Redis с TTL 24 часа (или БД для долгого хранения). Клиент забирает результат по
GET /tasks/{id}.
3. Варианты получения результата клиентом
3.1 Polling (опрос) с exponential backoff
Клиент периодически вызывает GET /tasks/{id} и проверяет статус.
- Статусы:
pending,processing,completed,failed. - Exponential backoff: начинать с 1 сек, удваивать до максимума (например, 30 сек), чтобы не перегружать сервер.
import time
import requests
def poll_task(base_url, task_id, max_attempts=20):
for i in range(max_attempts):
resp = requests.get(f"{base_url}/tasks/{task_id}")
data = resp.json()
if data["status"] == "completed":
return data["result"]
if data["status"] == "failed":
raise Exception(f"Task failed: {data['error']}")
wait = min(2 ** i, 30) # exponential backoff, cap 30s
time.sleep(wait)
raise TimeoutError("Task did not finish")
| Параметр | Рекомендация |
|---|---|
| Начальная задержка | 0.5–1 с |
| Фактор роста | 2 (экспонента) |
| Максимальная задержка | 30–60 с (зависит от SLA) |
| Количество попыток | до 50–100 (настраивается) |
Когда использовать polling: публичные API, нет возможности зарегистрировать callback, клиент — простой скрипт.
3.2 Webhook (callback)
Клиент при создании задачи передаёт URL, на который сервер отправит POST-запрос с результатом.
- Преимущества: мгновенное уведомление, не нужно опрашивать.
- Требования: клиент должен иметь публичный endpoint, принимать POST, отвечать 200 OK.
- Retry-механизм: сервер повторяет отправку 3–5 раз с exponential backoff, если клиент не ответил 200.
Пример запроса создания задачи с webhook:
POST /tasks
{
"prompt": "Напиши статью о ML...",
"webhook_url": "https://my-service.com/callback",
"max_retries": 3
}
Когда использовать webhook: сервер-серверное взаимодействие, микросервисы, где клиент может принимать входящие запросы.
4. Очередь сообщений и воркеры
4.1 Выбор очереди
| Технология | Плюсы | Минусы |
|---|---|---|
| Celery + Redis/RabbitMQ | Встроенная поддержка задач, retry, ETA, мониторинг (Flower) | Тяжеловат для простых сценариев, нужен брокер |
| Kafka | Высокая пропускная способность, долгое хранение сообщений, партиционирование | Сложнее настройка, не встроен scheduler |
| Redis Streams | Лёгкий, не требует внешнего брокера, встроен TTL | Меньше гарантий, чем Kafka |
| Amazon SQS / Google Pub/Sub | Серверный, масштабируемый, плата за использование | Vendor lock, latency |
Для LLM-задач часто используют Celery (простота) или Kafka (если много воркеров и нужна гарантия at-least-once).
4.2 Пример воркера на Celery
from celery import Celery
import openai
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_report(self, prompt: str):
try:
result = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
timeout=60
)
return result.choices[0].message.content
except Exception as e:
raise self.retry(exc=e)
4.3 Как API создаёт задачу (псевдокод)
from uuid import uuid4
from flask import Flask, request, jsonify
import redis
from celery import Celery
app = Flask(__name__)
r = redis.Redis()
celery = Celery(...)
@app.route('/tasks', methods=['POST'])
def create_task():
task_id = str(uuid4())
prompt = request.json['prompt']
webhook_url = request.json.get('webhook_url')
# сохраняем начальный статус
r.hset(f"task:{task_id}", "status", "pending")
r.hset(f"task:{task_id}", "prompt", prompt)
if webhook_url:
r.hset(f"task:{task_id}", "webhook", webhook_url)
r.expire(f"task:{task_id}", 86400) # TTL 24h
# отправляем в Celery
generate_report.delay(task_id, prompt)
return jsonify({"task_id": task_id, "status": "pending"}), 202
@app.route('/tasks/<task_id>', methods=['GET'])
def get_task(task_id):
data = r.hgetall(f"task:{task_id}")
if not data:
return jsonify({"error": "not found"}), 404
return jsonify(data)
5. Хранение результата и TTL
Результат храним в Redis (или другом быстром хранилище) с TTL (Time-To-Live) — обычно 24 часа. По истечении TTL задача и результат удаляются.
- Почему 24 часа: клиент скорее всего заберёт результат в течение нескольких минут после завершения, но запас нужен на случай задержек.
- Альтернатива: хранить в реляционной БД с флагом
expiredи чистить кроном.
6. Обработка ошибок и устаревших задач
- Retry-логика в воркере: при временных ошибках LLM (rate limit, network) повторять с задержкой.
- Dead Letter Queue (DLQ): если задача провалилась после N попыток, отправляем в специальную очередь для ручного анализа.
- Таймаут задачи: если воркер завис, мониторинг (например, Celery
TaskRevokedError) должен отменить задачу и установить статусfailed. - Очистка старых задач: Redis TTL автоматически удалит; для БД — cron-скрипт раз в час.
7. Масштабирование
- Горизонтальное масштабирование воркеров: добавляем больше инстансов, очередь распределяет задачи (особенно хорошо с Kafka / партициями).
- Пул LLM-ключей: если используем API с rate limit, пул ключей (key rotation) позволит утилизировать лимит.
- Приоритизация задач: в очереди можно задать priority (если Celery — использует разные очереди или таски с приоритетом).
8. Безопасность и идемпотентность
- Идемпотентность: клиент может повторно отправить один и тот же запрос (например, из‑за сетевой ошибки). Решение: клиент передаёт idempotency key, сервер проверяет, выполнялась ли задача с таким ключом, и возвращает существующий
task_id. - Аутентификация: все эндпоинты защищены (API key, OAuth), воркеры не должны быть доступны извне.
- Webhook: проверять подпись (HMAC) на стороне клиента, чтобы удостовериться, что callback пришёл от сервера.
9. Мониторинг
- Метрики:
- Трейсинг (OpenTelemetry): прослеживаем путь от создания задачи до завершения.
- Логирование: логи воркеров с correlation_id = task_id.
Пет-проект для закрепления
Задача: Реализовать минимальный асинхронный LLM сервис, который принимает текстовый запрос, отправляет его в очередь, обрабатывает через OpenAI (тестовый ключ) и возвращает результат через polling.
Инструменты: Python (Flask/FastAPI), Celery, Redis, OpenAI (или любой бесплатный LLM endpoint).
Шаги:
- Установите
celery[redis],flask,openai,redis-py. - Напишите Celery-задачу
generate_answer, которая принимает prompt и ключ авторизации, делает вызов LLM и возвращает текст. - Создайте Flask-эндпоинты:
POST /tasks— создаёт задачу, сохраняетtask_idи статусpendingв Redis, вызываетgenerate_answer.delay, возвращает 202.GET /tasks/<task_id>— возвращает статус и, еслиcompleted, результат.
- Запустите Redis, Celery worker (
celery -A tasks worker --loglevel=info), Flask-приложение. - Напишите клиентский скрипт, который отправляет запрос и ждёт ответа с exponential backoff (макс. 3 попытки).
- Добавьте TTL 1 час на результат в Redis.
Ожидаемый результат: клиент успешно получает сгенерированный текст через 10–30 секунд без блокировки основного потока.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 415 | Как проектировать агентный RAG с длительными шагами |
| 418 | Обработка ошибок и fallback стратегии |
| 422 | Мониторинг и логирование в Agentic RAG |
| 205 | Очереди сообщений в high-load ML системах |
| 108 | Организация batch-инференса LLM через очереди |
| 350 | Шаблон Saga и распределённые транзакции в AI |
Навигация
- Предыдущий: 419
- Следующий: 421
- Индекс: 00. Индекс разборов