Как проектировать Airflow DAG для RAG ingestion?

Краткий тезис

Проектирование Airflow DAG для пайплайна RAG ingestion — это построение надёжного, масштабируемого и воспроизводимого графа задач, который превращает сырые документы в векторные представления, готовые к retrieval по запросам пользователей. Ключевые этапы: извлечение (extract), парсинг (parse), чанкинг (chunk), генерация эмбеддингов (embed) и загрузка в векторную базу данных (insert). Грамотный DAG должен поддерживать инкрементальные обновления, обработку ошибок, мониторинг SLA и параллельное выполнение тяжёлых задач (например, на Spark).


1. Общая архитектура RAG ingestion pipeline

RAG ingestion — это ETL-процесс (extract, transform, load), который подготавливает корпус документов для поиска. Основные этапы:

  1. Extract — загрузка сырых файлов из источников: S3, API, реляционной базы данных, Google Drive и т.д.
  2. Parse — преобразование форматов (PDF, HTML, Markdown, DOCX) в чистый текст.
  3. Chunk — разбиение длинного текста на небольшие фрагменты (чанки) заданной стратегии (фиксированный размер, chunking|семантическое разбиение).
  4. Embed — преобразование чанков в векторные эмбеддинги с помощью нейросетевой модели (например, text-embedding-ada-002, sentence-transformers).
  5. Insert — сохранение пар (текст чанка + эмбеддинг) в векторную базу данных (Qdrant, Pinecone, Weaviate, Milvus).

Роль Airflow — оркестровать эти шаги: запускать их в нужном порядке, повторять при ошибках, следить за сроками и предоставлять историю выполнения.

Термины

  • DAG (Directed Acyclic Graph) — направленный ациклический граф задач (tasks), где ребра указывают порядок выполнения.
  • Operator — шаблон задачи (например, PythonOperator для вызова Python-функции, SparkSubmitOperator для отправки Spark-приложения).
  • Task — конкретный экземпляр оператора внутри DAG.
  • Pool — ограничение параллелизма для группы задач (например, не более 2 задач с GPU одновременно).
  • SLA (Service Level Agreement) — ожидаемое время выполнения задачи; если превышено, Airflow отправляет оповещение.

2. Проектирование DAG: частота, зависимости и параллелизм

При проектировании DAG для RAG ingestion нужно ответить на вопросы:

  • Как часто обновлять индекс

  • Какие задачи можно выполнять параллельно

    • Парсинг и чанкинг — CPU-bound, можно параллелить по файлам с помощью Spark или Airflow TaskGroups.
    • Генерация эмбеддингов — GPU-bound, требует контроля параллелизма (Pool).
    • Вставка в векторную БД — I/O-bound, можно батчами.
  • Какой порядок обязателен

    • Нельзя начать парсить, пока не извлечены файлы.
    • Нельзя эмбеддить, пока нет текстовых чанков.
    • Вставка выполняется после получения эмбеддингов.

Пример графа зависимостей:

extract >> parse >> chunk >> embed >> insert

Однако часто между парсингом и чанкингом может быть промежуточная запись в объектное хранилище (S3) для отказоустойчивости. Также можно разбить embed на несколько параллельных задач по батчам чанков.


3. Компоненты DAG: подробный разбор

3.1 Extract (извлечение)

Operator: PythonOperator (или S3KeySensor для ожидания новых файлов).

Задача: скачать файлы из источника, сохранить в локальную временную папку или в S3. Важно:

  • Фильтровать только новые/изменённые файлы (по timestamp, хешу).
  • Для большого количества файлов использовать потоковую загрузку или Spark.

Термин Sensor — оператор, который ожидает наступления условия (например, появления файла в S3). Используется для триггера инкрементальной загрузки.

3.2 Parse (парсинг)

Operator: SparkSubmitOperator (или PythonOperator с библиотеками pdfplumber, beautifulsoup4, markdownify).

Термин SparkSubmitOperator — отправляет Spark-приложение на кластер. Spark позволяет параллельно обрабатывать много файлов, что критично для корпусов >10k документов.

Результат парсинга — чистый текст, который сохраняется в виде .txt файлов или в колонке DataFrame.

3.3 Chunk (чанкинг)

Operator: SparkSubmitOperator или PythonOperator.

Стратегии чанкинга (выбор влияет на качество retrieval):

СтратегияОписаниеПример
Fixed-sizeФиксированное число токенов (256, 512) с перекрытием (overlap)LangChain RecursiveCharacterTextSplitter
SemanticРазделение по границам предложений/абзацев с использованием эмбеддинговSemanticChunker из LangChain
AgenticLLM-агент решает, где разбить, основываясь на содержанииЭкспериментально

Термин Overlap — перекрытие между соседними чанками (например, 15% размера чанка) для сохранения контекста на границах.

3.4 Embed (генерация эмбеддингов)

Operator: PythonOperator с загрузкой модели на GPU, или SparkSubmitOperator с конфигурацией для GPU.

Проблема: генерация эмбеддингов — самая дорогая стадия. Решения:

  • Использовать батчирование чанков.
  • Выделять отдельный Pool gpu_pool с максимальным числом параллельных задач (например, 4).
  • Применять кеширование эмбеддингов для уже обработанных чанков (по хешу текста).

Код для PythonOperator (упрощённо):

def generate_embeddings(chunks: List[str]) -> List[float]:
    model = SentenceTransformer('intfloat/multilingual-e5-large', device='cuda')
    embeddings = model.encode(chunks, batch_size=32, show_progress_bar=True)
    return embeddings.tolist()

3.5 Insert (загрузка в векторную БД)

Operator: PythonOperator с клиентом Qdrant / Pinecone.

Особенности:

  • Использовать batch insert (например, 100 векторов за раз).
  • Убедиться, что индекс существует (upsert с автоматическим созданием коллекции).
  • Настроить idempotent поведение: при повторном запуске не дублировать данные (использовать уникальный ID чанка).

Пример:

def insert_vectors_to_qdrant(chunk_ids, vectors, payloads):
    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    client.upsert(
        collection_name="my_collection",
        points=[
            models.PointStruct(id=chunk_id, vector=vector, payload=payload)
            for chunk_id, vector, payload in zip(chunk_ids, vectors, payloads)
        ]
    )

4. Обработка ошибок и повторные попытки

Retries: для задач, подверженных временным ошибкам (S3 timeout, GPU OOM), настраиваем retries и retry_delay.

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=1),
}

Alerting: при падении DAG отправлять уведомление в Slack/Telegram через on_failure_callback.

Dead-letter: чанки, которые не удалось обработать (ошибка парсинга, невалидный текст), следует сохранять в отдельную таблицу для ручной инспекции.


5. Инкрементальная загрузка (incremental ingestion)

Чтобы не пересчитывать весь корпус при каждом запуске, используем checkpointing.

Варианты:

  1. Timestamp-based: после успешного DAG сохраняем last_processed_timestamp в переменную Airflow (Variable.set). На следующем запуске вычитываем только файлы, изменённые после этой даты.
  2. Hash-based: вычисляем хеш содержимого файла и сравниваем с ранее сохранённым.
  3. Database trigger: если документы хранятся в SQL, используем поле updated_at и инкрементальный SQL-запрос.

Пример с Variable:

from airflow.models import Variable

def extract_incremental():
    last_run = Variable.get("last_ingestion_timestamp", default_var="1970-01-01")
    # Получаем список файлов новее last_run
    new_files = list_new_files(since=last_run)
    return new_files

После успешной загрузки обновляем last_ingestion_timestamp на текущее время.


6. Масштабирование

Для больших корпусов (миллионы документов) Airflow может стать узким местом. Рекомендации:

  • Использовать Spark для парсинга и чанкинга — распределённая обработка на N ядрах.
  • Разделить embed на параллельные задачи по батчам (Airflow TaskGroup или Dynamic Task Mapping).
  • Ограничить параллелизм на GPU с помощью Pool: gpu_pool с slot_count=number_of_gpus.
  • Отключить отладочные логи для задач, где они не нужны, чтобы не перегружать БД Airflow.

7. Мониторинг и логирование

  • XCom — передача небольших данных между задачами (например, список идентификаторов чанков).
  • SLAs — если DAG не укладывается в отведённое время (например, ingestion не должен длиться дольше часа), Airflow отправляет предупреждение.
  • Метрики: экспортируйте в Prometheus/Grafana количество обработанных документов, время выполнения каждой стадии, количество ошибок.
  • Даг-ран — каждый запуск DAG имеет уникальный run_id, который можно записать в метаданные чанка для отслеживания.

8. Пример полного DAG (расширенный)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.models import Variable

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
    'sla': timedelta(hours=2),
}

with DAG(
    'rag_ingestion_v2',
    default_args=default_args,
    schedule_interval='@hourly',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['rag', 'ingestion'],
) as dag:

    # Sensor: ждём появления новых файлов в S3 (опционально)
    wait_for_files = S3KeySensor(
        task_id='wait_for_new_files',
        bucket_name='my-docs-bucket',
        bucket_key='incoming/',
        wildcard_match=True,
        poke_interval=30,
        timeout=60 * 10,
    )

    # Извлечение списка новых файлов на основе checkpoint
    def extract_new_files(**context):
        last_ts = Variable.get('ingestion_last_ts', default_var='1970-01-01T00:00:00')
        # Логика получения файлов из S3 новее last_ts
        new_files = s3_list_new_files(bucket='my-docs-bucket', prefix='incoming/', since=last_ts)
        # Пушим через XCom
        context['ti'].xcom_push(key='new_files', value=new_files)

    extract = PythonOperator(task_id='extract_new_files', python_callable=extract_new_files)

    # Парсинг PDF через Spark
    parse = SparkSubmitOperator(
        task_id='parse_pdfs',
        application='/opt/spark_apps/parse_pdfs.py',
        name='ParsePDFs',
        conn_id='spark_default',
        application_args=['{{ ti.xcom_pull(task_ids="extract_new_files", key="new_files") }}'],
    )

    # Чанкинг через Spark
    chunk = SparkSubmitOperator(
        task_id='chunk_documents',
        application='/opt/spark_apps/chunk.py',
        name='ChunkDocs',
        conn_id='spark_default',
    )

    # Генерация эмбеддингов на GPU (pool ограничивает параллелизм)
    def embed_batch(**context):
        chunks = context['ti'].xcom_pull(task_ids='chunk_documents', key='chunks')
        # Загружаем модель один раз при старте задачи
        import sentence_transformers
        model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2', device='cuda')
        embeddings = model.encode(chunks, batch_size=64)
        context['ti'].xcom_push(key='embeddings', value=embeddings.tolist())
        context['ti'].xcom_push(key='chunk_ids', value=[c.id for c in chunks])

    embed = PythonOperator(
        task_id='generate_embeddings',
        python_callable=embed_batch,
        pool='gpu_pool',
    )

    # Вставка в Qdrant
    def insert_vectors(**context):
        chunk_ids = context['ti'].xcom_pull(task_ids='generate_embeddings', key='chunk_ids')
        embeddings = context['ti'].xcom_pull(task_ids='generate_embeddings', key='embeddings')
        # логика вставки
        client = QdrantClient(url=Variable.get('qdrant_url'))
        client.upsert(collection_name='docs', points=[
            models.PointStruct(id=cid, vector=emb, payload={'text': '...'})
            for cid, emb in zip(chunk_ids, embeddings)
        ])
        # Обновляем checkpoint
        now = datetime.utcnow().isoformat()
        Variable.set('ingestion_last_ts', now)

    insert = PythonOperator(task_id='insert_to_qdrant', python_callable=insert_vectors)

    # Порядок
    wait_for_files >> extract >> parse >> chunk >> embed >> insert

9. Альтернативы Airflow и их сравнение

ИнструментПлюсы для RAG ingestionМинусы
AirflowЗрелый, широко распространён, много интеграций (Spark, S3, Qdrant), гибкий DAGСложность настройки, не «dataflow» (нет data lineage из коробки)
PrefectБолее современный API, автоматическая обработка ошибок, простота деплояМеньше сообщество, меньше готовых операторов
DagsterСильный упор на data assets, автоматическая материализация, хорошая observabilityКрутая кривая обучения, меньше операторов для Spark
Kubeflow PipelinesНативный для Kubernetes, хорош для ML-пайплайнов (включая обучение моделей)Требует K8s, избыточен для простого ingestion

Выбор зависит от контекста: если в компании уже есть Airflow — используйте его; если строите новую платформу с упором на ML — рассмотрите Dagster или Prefect.


10. Best practices для RAG ingestion

  1. Версионируйте эмбеддинги и индекс — при смене модели эмбеддингов создавайте новую коллекцию (например, docs_v1, docs_v2). Это позволяет A/B тестировать качество retrieval и откатываться.
  2. Используйте уникальные ID для чанков — комбинация document_id + chunk_index (или UUID). Это обеспечивает идемпотентность upsert.
  3. Чанкинг не должен разрывать предложения — используйте стратегии с учётом языка (для русского, например, разделение по знакам препинания).
  4. Храните метаданные чанка — источник, дата, название документа, URL. Это пригодится для фильтрации и ответов LLM.
  5. Мониторьте latency эмбеддингов — если задача embed занимает >1 часа, дробите её на подзадачи.
  6. Добавьте задачу очистки — например, purge_old_indexes раз в неделю.

Пет-проект для закрепления

Задача: Реализовать Airflow DAG для инкрементальной индексации набора PDF-документов (≈1000 шт.) с использованием Spark для чанкинга и GPU-стека для эмбеддингов (или CPU для малого количества). Векторная БД — Qdrant (локально в Docker).

Инструменты:

Шаги:

  1. Развернуть Airflow и Qdrant в Docker Compose.
  2. Написать Spark-приложения: parse_pdfs.py (извлечение текста с PyMuPDF), chunk.py (разбиение на чанки по 512 токенов с overlap 128).
  3. Написать PythonOperator для загрузки эмбеддингов (если нет GPU, используйте CPU-модель, установите pool для ограничения).
  4. Написать оператор для upsert в Qdrant с использованием qdrant-client.
  5. Реализовать инкрементальную загрузку: сохранять в Variable последний timestamp успешного запуска.
  6. Добавить даг с schedule_interval='@daily' и протестировать на 50 PDF.

Ожидаемый результат: Работающий DAG, который при каждом запуске подхватывает только новые PDF, обрабатывает их и добавляет векторы в Qdrant. В Airflow UI можно увидеть историю успешных/упавших задач, время выполнения и SLA.


Связь с другими вопросами

ВопросТема
1Как бы вы спроектировали RAG-систему для 10 000 документов с разной структурой?
3Какие стратегии chunking'а вы знаете и когда какую применяете?
5Как вы оцениваете качество retrieval'а в RAG-системе?
7Как вы уменьшаете latency RAG-системы (время ответа)?
9Как вы обновляете документы в существующей RAG-системе?

Навигация