Airflow schedules ML workflows by treating them as directed acyclic graphs (DAGs) of tasks, allowing for complex dependencies and retries.
Imagine you’ve got a machine learning model that needs to be retrained regularly, say, every night. This retraining involves several steps: fetching new data, preprocessing it, training the model, evaluating its performance, and if it’s good enough, deploying it. Airflow is designed to manage exactly these kinds of multi-step processes.
Here’s a look at a simple Airflow DAG for this ML retraining scenario. This isn’t just conceptual; it’s how you’d actually write it in Python:
from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def fetch_data_func():
    print("Fetching latest data from S3...")
    # Replace with actual data fetching logic
    return "data_v1.csv"


def preprocess_data_func(**kwargs):
    data_file = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    print(f"Preprocessing data from {data_file}...")
    # Replace with actual preprocessing logic
    return "processed_data_v1.parquet"


def train_model_func(**kwargs):
    processed_data = kwargs['ti'].xcom_pull(task_ids='preprocess_data')
    print(f"Training model on {processed_data}...")
    # Replace with actual model training logic
    return "model_v1.pkl"


def evaluate_model_func(**kwargs):
    model_file = kwargs['ti'].xcom_pull(task_ids='train_model')
    print(f"Evaluating model {model_file}...")
    # Replace with actual model evaluation logic
    performance = {"accuracy": 0.95}
    return performance


def deploy_model_func(**kwargs):
    performance = kwargs['ti'].xcom_pull(task_ids='evaluate_model')
    if performance["accuracy"] > 0.90:
        print(f"Deploying model with accuracy {performance['accuracy']}...")
        # Replace with actual deployment logic
    else:
        print(f"Model performance ({performance['accuracy']}) below threshold. Not deploying.")


with DAG(
    dag_id="ml_retraining_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["ml", "retraining"],
) as dag:
    fetch_data = PythonOperator(
        task_id="fetch_data",
        python_callable=fetch_data_func,
    )
    preprocess_data = PythonOperator(
        task_id="preprocess_data",
        python_callable=preprocess_data_func,
    )
    train_model = PythonOperator(
        task_id="train_model",
        python_callable=train_model_func,
    )
    evaluate_model = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_model_func,
    )
    deploy_model = PythonOperator(
        task_id="deploy_model",
        python_callable=deploy_model_func,
    )

    fetch_data >> preprocess_data >> train_model >> evaluate_model >> deploy_model
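This DAG defines a linear sequence of tasks: fetch_data runs first, then preprocess_data, and so on. The >> operator declares each dependency, so by default the task on the right starts only after the task on the left has succeeded. Each PythonOperator executes a Python function; BashOperator, imported above but not used in this example, runs shell commands instead. Airflow manages the execution order, retries failed tasks when retries are configured, and can distribute tasks across multiple worker machines.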
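To make the ordering concrete, here is a minimal sketch that models the same chain with Python's standard-library graphlib module (Python 3.9+). It only illustrates how upstream dependencies determine a valid run order and why the graph must be acyclic; it is not how Airflow's scheduler is implemented. The upstream and cyclic mappings are assumptions that mirror the task IDs in the DAG.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks that must finish before it can start.
# Mirrors: fetch_data >> preprocess_data >> train_model >> evaluate_model >> deploy_model
upstream = {
    "fetch_data": set(),
    "preprocess_data": {"fetch_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"train_model"},
    "deploy_model": {"evaluate_model"},
}

# A linear chain has exactly one valid order.
run_order = list(TopologicalSorter(upstream).static_order())
print(run_order)

# Making fetch_data wait on deploy_model closes a loop, so no valid order exists.
cyclic = dict(upstream, fetch_data={"deploy_model"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

The second half shows the "acyclic" part of the name: once a dependency loops back on itself, there is no task that can legally run first.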
The core problem Airflow solves in MLOps is orchestration: coordinating complex, multi-stage ML processes that are often brittle and difficult to manage manually. It provides a centralized view and control plane for these workflows, turning a series of scripts into a robust, observable, and repeatable system. You define your ML workflow as code (a DAG), and Airflow handles the scheduling, execution, monitoring, and alerting.
Internally, Airflow consists of a Scheduler, a Webserver, a Metadata Database, and Workers. The Scheduler parses DAGs, determines which tasks are ready to run, and passes them to the Executor. Depending on which Executor is configured, the task either runs locally on the scheduler's machine (LocalExecutor) or is handed to a separate Worker (CeleryExecutor or KubernetesExecutor). The Metadata Database stores the state of every DAG run and task. The Webserver provides the UI for monitoring and interacting with your pipelines.
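The division of labor between these components can be sketched as a toy loop in plain Python. This is a deliberately simplified model, not Airflow's code: metadata_db, ready_tasks, and execute are hypothetical names, and the state strings are a reduced subset chosen for illustration.

```python
# Toy model of the scheduler / executor / metadata database interplay.
upstream = {
    "fetch_data": [],
    "preprocess_data": ["fetch_data"],
    "train_model": ["preprocess_data"],
}

# Stand-in for the metadata database: one state per task.
metadata_db = {task_id: "none" for task_id in upstream}
history = []  # order in which the executor ran tasks


def ready_tasks():
    """Scheduler step: tasks not yet run whose upstream tasks all succeeded."""
    return [
        task_id
        for task_id, deps in upstream.items()
        if metadata_db[task_id] == "none"
        and all(metadata_db[dep] == "success" for dep in deps)
    ]


def execute(task_id):
    """Executor/worker step: run the task and record the outcome."""
    metadata_db[task_id] = "running"
    history.append(task_id)  # real work would happen here
    metadata_db[task_id] = "success"


# Scheduler loop: keep handing ready tasks to the executor until none remain.
while True:
    batch = ready_tasks()
    if not batch:
        break
    for task_id in batch:
        metadata_db[task_id] = "queued"
        execute(task_id)

print(history)
print(metadata_db)
```

The point of the sketch is that the scheduler never runs work itself: it reads state, decides what is ready, and delegates, while every state change is recorded where the UI can read it.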
You control a DAG's behavior through its parameters. schedule="@daily" runs it once a day. start_date is the date from which the schedule becomes active; the first run is triggered once the first full interval after that date has elapsed. catchup=False tells the scheduler not to backfill the intervals between start_date and now, so only the most recent one is run. tags organize DAGs in the UI. On individual tasks, you can set retries (retries=3), retry delays (retry_delay=timedelta(minutes=5)), and timeouts (execution_timeout=timedelta(hours=1)), where timedelta comes from Python's datetime module. Tasks exchange small amounts of data through XComs: a PythonOperator pushes its callable's return value automatically, and a downstream task reads it explicitly with xcom_pull, as the example does with filenames and evaluation metrics.
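One common way to apply the retry and timeout settings to every task is a default_args dictionary passed to the DAG as DAG(..., default_args=default_args). The sketch below builds that dictionary with the values quoted above and works out what they imply in the worst case. The worst_case_duration helper is hypothetical, and it assumes a fixed delay between attempts and ignores time spent waiting in a queue.

```python
from datetime import timedelta

# Task-level settings, using the values quoted in the text.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=1),
}


def worst_case_duration(args):
    """Upper bound for one task if every attempt runs until it times out."""
    attempts = args["retries"] + 1  # the first try plus each retry
    running = attempts * args["execution_timeout"]
    waiting = args["retries"] * args["retry_delay"]
    return running + waiting


print(worst_case_duration(default_args))  # 4:15:00
```

With these numbers a single stuck task can hold up the pipeline for over four hours before it is finally marked as failed, which is worth keeping in mind when a nightly retraining job has a deadline.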
A key aspect of Airflow's power is its extensibility. Provider packages already ship operators and hooks for many services, including cloud storage (Google Cloud Storage, AWS S3) and ML platforms such as Databricks and SageMaker, and you can write custom operators for anything they do not cover. This lets you build pipelines tailored to your MLOps requirements.
Most people focus on the DAG structure and scheduling, but much of the real power for MLOps comes from the XCom (cross-communication) system. Beyond simple strings and numbers, an XCom can carry any value the configured XCom backend can serialize; by default that means JSON-serializable values such as dicts and lists. XComs are stored in the metadata database by default, so large artifacts like dataframes or model files are better written to external storage and passed by reference (a path or URI). Downstream tasks can use these values for branching, conditional execution, and dynamic task generation, which is how a pipeline makes decisions based on the output of earlier steps.
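Two habits help when using XComs this way, sketched below in plain Python. The first is checking that a value can be JSON-serialized before returning it from a task, assuming the default configuration where XCom values are stored as JSON. The second is keeping the branch decision in a small function that returns the ID of the task to run next, which is the contract Airflow's BranchPythonOperator expects from its callable. The names is_xcom_safe and choose_next_task and the task ID skip_deployment are hypothetical.

```python
import json


def is_xcom_safe(value):
    """Return True if the value can be serialized as JSON."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


def choose_next_task(metrics, threshold=0.90):
    """Return the task_id to follow, in the style of a branch callable."""
    if metrics["accuracy"] > threshold:
        return "deploy_model"
    return "skip_deployment"  # hypothetical task_id, not in the DAG above


# Metrics dicts and storage paths are fine to pass between tasks.
print(is_xcom_safe({"accuracy": 0.95}))
print(is_xcom_safe("processed_data_v1.parquet"))

# Arbitrary Python objects are not; pass a reference to them instead.
print(is_xcom_safe(object()))

print(choose_next_task({"accuracy": 0.95}))
print(choose_next_task({"accuracy": 0.85}))
```

Keeping the decision in its own function also makes it testable without a running scheduler, since it depends only on the metrics dictionary.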
The next step after mastering DAG scheduling is understanding how to manage and monitor these pipelines at scale using Airflow’s various executors and integrations.