Как проектировать Airflow DAG для RAG ingestion?
Краткий тезис
Проектирование Airflow DAG для пайплайна RAG ingestion — это построение надёжного, масштабируемого и воспроизводимого графа задач, который превращает сырые документы в векторные представления, готовые к retrieval по запросам пользователей. Ключевые этапы: извлечение (extract), парсинг (parse), чанкинг (chunk), генерация эмбеддингов (embed) и загрузка в векторную базу данных (insert). Грамотный DAG должен поддерживать инкрементальные обновления, обработку ошибок, мониторинг SLA и параллельное выполнение тяжёлых задач (например, на Spark).
1. Общая архитектура RAG ingestion pipeline
RAG ingestion — это ETL-процесс (extract, transform, load), который подготавливает корпус документов для поиска. Основные этапы:
- Extract — загрузка сырых файлов из источников: S3, API, реляционной базы данных, Google Drive и т.д.
- Parse — преобразование форматов (PDF, HTML, Markdown, DOCX) в чистый текст.
- Chunk — разбиение длинного текста на небольшие фрагменты (чанки) заданной стратегии (фиксированный размер, chunking|семантическое разбиение).
- Embed — преобразование чанков в векторные эмбеддинги с помощью нейросетевой модели (например, text-embedding-ada-002, sentence-transformers).
- Insert — сохранение пар (текст чанка + эмбеддинг) в векторную базу данных (Qdrant, Pinecone, Weaviate, Milvus).
Роль Airflow — оркестровать эти шаги: запускать их в нужном порядке, повторять при ошибках, следить за сроками и предоставлять историю выполнения.
Термины
- DAG (Directed Acyclic Graph) — направленный ациклический граф задач (tasks), где ребра указывают порядок выполнения.
- Operator — шаблон задачи (например,
PythonOperatorдля вызова Python-функции, SparkSubmitOperator для отправки Spark-приложения). - Task — конкретный экземпляр оператора внутри DAG.
- Pool — ограничение параллелизма для группы задач (например, не более 2 задач с GPU одновременно).
- SLA (Service Level Agreement) — ожидаемое время выполнения задачи; если превышено, Airflow отправляет оповещение.
2. Проектирование DAG: частота, зависимости и параллелизм
При проектировании DAG для RAG ingestion нужно ответить на вопросы:
-
Как часто обновлять индекс
- schedule_interval='@hourly' для часто меняющихся данных.
- schedule_interval='@daily' для статичного корпуса.
- schedule_interval=None для ручного запуска по триггеру (например, webhook).
-
Какие задачи можно выполнять параллельно
-
Какой порядок обязателен
- Нельзя начать парсить, пока не извлечены файлы.
- Нельзя эмбеддить, пока нет текстовых чанков.
- Вставка выполняется после получения эмбеддингов.
Пример графа зависимостей:
extract >> parse >> chunk >> embed >> insert
Однако часто между парсингом и чанкингом может быть промежуточная запись в объектное хранилище (S3) для отказоустойчивости. Также можно разбить embed на несколько параллельных задач по батчам чанков.
3. Компоненты DAG: подробный разбор
3.1 Extract (извлечение)
Operator: PythonOperator (или S3KeySensor для ожидания новых файлов).
Задача: скачать файлы из источника, сохранить в локальную временную папку или в S3. Важно:
- Фильтровать только новые/изменённые файлы (по timestamp, хешу).
- Для большого количества файлов использовать потоковую загрузку или Spark.
Термин Sensor — оператор, который ожидает наступления условия (например, появления файла в S3). Используется для триггера инкрементальной загрузки.
3.2 Parse (парсинг)
Operator: SparkSubmitOperator (или PythonOperator с библиотеками pdfplumber, beautifulsoup4, markdownify).
Термин SparkSubmitOperator — отправляет Spark-приложение на кластер. Spark позволяет параллельно обрабатывать много файлов, что критично для корпусов >10k документов.
Результат парсинга — чистый текст, который сохраняется в виде .txt файлов или в колонке DataFrame.
3.3 Chunk (чанкинг)
Operator: SparkSubmitOperator или PythonOperator.
Стратегии чанкинга (выбор влияет на качество retrieval):
| Стратегия | Описание | Пример |
|---|---|---|
| Fixed-size | Фиксированное число токенов (256, 512) с перекрытием (overlap) | LangChain RecursiveCharacterTextSplitter |
| Semantic | Разделение по границам предложений/абзацев с использованием эмбеддингов | SemanticChunker из LangChain |
| Agentic | LLM-агент решает, где разбить, основываясь на содержании | Экспериментально |
Термин Overlap — перекрытие между соседними чанками (например, 15% размера чанка) для сохранения контекста на границах.
3.4 Embed (генерация эмбеддингов)
Operator: PythonOperator с загрузкой модели на GPU, или SparkSubmitOperator с конфигурацией для GPU.
Проблема: генерация эмбеддингов — самая дорогая стадия. Решения:
- Использовать батчирование чанков.
- Выделять отдельный Pool
gpu_poolс максимальным числом параллельных задач (например, 4). - Применять кеширование эмбеддингов для уже обработанных чанков (по хешу текста).
Код для PythonOperator (упрощённо):
def generate_embeddings(chunks: List[str]) -> List[float]:
model = SentenceTransformer('intfloat/multilingual-e5-large', device='cuda')
embeddings = model.encode(chunks, batch_size=32, show_progress_bar=True)
return embeddings.tolist()
3.5 Insert (загрузка в векторную БД)
Operator: PythonOperator с клиентом Qdrant / Pinecone.
Особенности:
- Использовать batch insert (например, 100 векторов за раз).
- Убедиться, что индекс существует (upsert с автоматическим созданием коллекции).
- Настроить idempotent поведение: при повторном запуске не дублировать данные (использовать уникальный ID чанка).
Пример:
def insert_vectors_to_qdrant(chunk_ids, vectors, payloads):
client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
client.upsert(
collection_name="my_collection",
points=[
models.PointStruct(id=chunk_id, vector=vector, payload=payload)
for chunk_id, vector, payload in zip(chunk_ids, vectors, payloads)
]
)
4. Обработка ошибок и повторные попытки
Retries: для задач, подверженных временным ошибкам (S3 timeout, GPU OOM), настраиваем retries и retry_delay.
default_args = {
'retries': 2,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(hours=1),
}
Alerting: при падении DAG отправлять уведомление в Slack/Telegram через on_failure_callback.
Dead-letter: чанки, которые не удалось обработать (ошибка парсинга, невалидный текст), следует сохранять в отдельную таблицу для ручной инспекции.
5. Инкрементальная загрузка (incremental ingestion)
Чтобы не пересчитывать весь корпус при каждом запуске, используем checkpointing.
Варианты:
- Timestamp-based: после успешного DAG сохраняем
last_processed_timestampв переменную Airflow (Variable.set). На следующем запуске вычитываем только файлы, изменённые после этой даты. - Hash-based: вычисляем хеш содержимого файла и сравниваем с ранее сохранённым.
- Database trigger: если документы хранятся в SQL, используем поле
updated_atи инкрементальный SQL-запрос.
Пример с Variable:
from airflow.models import Variable
def extract_incremental():
last_run = Variable.get("last_ingestion_timestamp", default_var="1970-01-01")
# Получаем список файлов новее last_run
new_files = list_new_files(since=last_run)
return new_files
После успешной загрузки обновляем last_ingestion_timestamp на текущее время.
6. Масштабирование
Для больших корпусов (миллионы документов) Airflow может стать узким местом. Рекомендации:
- Использовать Spark для парсинга и чанкинга — распределённая обработка на N ядрах.
- Разделить embed на параллельные задачи по батчам (Airflow TaskGroup или Dynamic Task Mapping).
- Ограничить параллелизм на GPU с помощью Pool:
gpu_poolс slot_count=number_of_gpus. - Отключить отладочные логи для задач, где они не нужны, чтобы не перегружать БД Airflow.
7. Мониторинг и логирование
- XCom — передача небольших данных между задачами (например, список идентификаторов чанков).
- SLAs — если DAG не укладывается в отведённое время (например, ingestion не должен длиться дольше часа), Airflow отправляет предупреждение.
- Метрики: экспортируйте в Prometheus/Grafana количество обработанных документов, время выполнения каждой стадии, количество ошибок.
- Даг-ран — каждый запуск DAG имеет уникальный
run_id, который можно записать в метаданные чанка для отслеживания.
8. Пример полного DAG (расширенный)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.models import Variable
default_args = {
'owner': 'ml-team',
'retries': 2,
'retry_delay': timedelta(minutes=2),
'sla': timedelta(hours=2),
}
with DAG(
'rag_ingestion_v2',
default_args=default_args,
schedule_interval='@hourly',
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['rag', 'ingestion'],
) as dag:
# Sensor: ждём появления новых файлов в S3 (опционально)
wait_for_files = S3KeySensor(
task_id='wait_for_new_files',
bucket_name='my-docs-bucket',
bucket_key='incoming/',
wildcard_match=True,
poke_interval=30,
timeout=60 * 10,
)
# Извлечение списка новых файлов на основе checkpoint
def extract_new_files(**context):
last_ts = Variable.get('ingestion_last_ts', default_var='1970-01-01T00:00:00')
# Логика получения файлов из S3 новее last_ts
new_files = s3_list_new_files(bucket='my-docs-bucket', prefix='incoming/', since=last_ts)
# Пушим через XCom
context['ti'].xcom_push(key='new_files', value=new_files)
extract = PythonOperator(task_id='extract_new_files', python_callable=extract_new_files)
# Парсинг PDF через Spark
parse = SparkSubmitOperator(
task_id='parse_pdfs',
application='/opt/spark_apps/parse_pdfs.py',
name='ParsePDFs',
conn_id='spark_default',
application_args=['{{ ti.xcom_pull(task_ids="extract_new_files", key="new_files") }}'],
)
# Чанкинг через Spark
chunk = SparkSubmitOperator(
task_id='chunk_documents',
application='/opt/spark_apps/chunk.py',
name='ChunkDocs',
conn_id='spark_default',
)
# Генерация эмбеддингов на GPU (pool ограничивает параллелизм)
def embed_batch(**context):
chunks = context['ti'].xcom_pull(task_ids='chunk_documents', key='chunks')
# Загружаем модель один раз при старте задачи
import sentence_transformers
model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2', device='cuda')
embeddings = model.encode(chunks, batch_size=64)
context['ti'].xcom_push(key='embeddings', value=embeddings.tolist())
context['ti'].xcom_push(key='chunk_ids', value=[c.id for c in chunks])
embed = PythonOperator(
task_id='generate_embeddings',
python_callable=embed_batch,
pool='gpu_pool',
)
# Вставка в Qdrant
def insert_vectors(**context):
chunk_ids = context['ti'].xcom_pull(task_ids='generate_embeddings', key='chunk_ids')
embeddings = context['ti'].xcom_pull(task_ids='generate_embeddings', key='embeddings')
# логика вставки
client = QdrantClient(url=Variable.get('qdrant_url'))
client.upsert(collection_name='docs', points=[
models.PointStruct(id=cid, vector=emb, payload={'text': '...'})
for cid, emb in zip(chunk_ids, embeddings)
])
# Обновляем checkpoint
now = datetime.utcnow().isoformat()
Variable.set('ingestion_last_ts', now)
insert = PythonOperator(task_id='insert_to_qdrant', python_callable=insert_vectors)
# Порядок
wait_for_files >> extract >> parse >> chunk >> embed >> insert
9. Альтернативы Airflow и их сравнение
| Инструмент | Плюсы для RAG ingestion | Минусы |
|---|---|---|
| Airflow | Зрелый, широко распространён, много интеграций (Spark, S3, Qdrant), гибкий DAG | Сложность настройки, не «dataflow» (нет data lineage из коробки) |
| Prefect | Более современный API, автоматическая обработка ошибок, простота деплоя | Меньше сообщество, меньше готовых операторов |
| Dagster | Сильный упор на data assets, автоматическая материализация, хорошая observability | Крутая кривая обучения, меньше операторов для Spark |
| Kubeflow Pipelines | Нативный для Kubernetes, хорош для ML-пайплайнов (включая обучение моделей) | Требует K8s, избыточен для простого ingestion |
Выбор зависит от контекста: если в компании уже есть Airflow — используйте его; если строите новую платформу с упором на ML — рассмотрите Dagster или Prefect.
10. Best practices для RAG ingestion
- Версионируйте эмбеддинги и индекс — при смене модели эмбеддингов создавайте новую коллекцию (например,
docs_v1,docs_v2). Это позволяет A/B тестировать качество retrieval и откатываться. - Используйте уникальные ID для чанков — комбинация
document_id + chunk_index(или UUID). Это обеспечивает идемпотентность upsert. - Чанкинг не должен разрывать предложения — используйте стратегии с учётом языка (для русского, например, разделение по знакам препинания).
- Храните метаданные чанка — источник, дата, название документа, URL. Это пригодится для фильтрации и ответов LLM.
- Мониторьте latency эмбеддингов — если задача embed занимает >1 часа, дробите её на подзадачи.
- Добавьте задачу очистки — например,
purge_old_indexesраз в неделю.
Пет-проект для закрепления
Задача: Реализовать Airflow DAG для инкрементальной индексации набора PDF-документов (≈1000 шт.) с использованием Spark для чанкинга и GPU-стека для эмбеддингов (или CPU для малого количества). Векторная БД — Qdrant (локально в Docker).
Инструменты:
- Airflow (LocalExecutor)
- Apache Spark (PySpark) — для парсинга и чанкинга
- Sentence-Transformers (
all-MiniLM-L6-v2) - Qdrant (Docker)
- S3/MinIO (для хранения сырых PDF)
Шаги:
- Развернуть Airflow и Qdrant в Docker Compose.
- Написать Spark-приложения:
parse_pdfs.py(извлечение текста с PyMuPDF),chunk.py(разбиение на чанки по 512 токенов с overlap 128). - Написать PythonOperator для загрузки эмбеддингов (если нет GPU, используйте CPU-модель, установите pool для ограничения).
- Написать оператор для upsert в Qdrant с использованием
qdrant-client. - Реализовать инкрементальную загрузку: сохранять в Variable последний timestamp успешного запуска.
- Добавить даг с
schedule_interval='@daily'и протестировать на 50 PDF.
Ожидаемый результат: Работающий DAG, который при каждом запуске подхватывает только новые PDF, обрабатывает их и добавляет векторы в Qdrant. В Airflow UI можно увидеть историю успешных/упавших задач, время выполнения и SLA.
Связь с другими вопросами
| Вопрос | Тема |
|---|---|
| 1 | Как бы вы спроектировали RAG-систему для 10 000 документов с разной структурой? |
| 3 | Какие стратегии chunking'а вы знаете и когда какую применяете? |
| 5 | Как вы оцениваете качество retrieval'а в RAG-системе? |
| 7 | Как вы уменьшаете latency RAG-системы (время ответа)? |
| 9 | Как вы обновляете документы в существующей RAG-системе? |
Навигация
- Предыдущий: 862
- Следующий: 864
- Индекс: 00. Индекс разборов