Vertex AI Pipelines is a managed service for orchestrating machine learning workflows on Google Cloud.
Let’s look at a typical pipeline run. Imagine we have a pipeline.py file that defines our ML workflow:
```python
from kfp import dsl
from google.cloud import aiplatform  # used when submitting the compiled pipeline (see the end of the file)


@dsl.component(
    # gcsfs lets pandas read gs:// paths directly.
    packages_to_install=["google-cloud-aiplatform", "pandas", "gcsfs"],
    base_image="python:3.9",
)
def preprocess_data(
    input_csv_path: str,
    output_dataset_id: str,
) -> str:
    import pandas as pd
    from google.cloud import aiplatform

    aiplatform.init(project="your-gcp-project-id", location="us-central1")

    # Load data
    df = pd.read_csv(input_csv_path)

    # Simple preprocessing: fill NaNs with the column mean
    for col in df.select_dtypes(include=["float64", "int64"]).columns:
        df[col] = df[col].fillna(df[col].mean())

    # Register the result as a Vertex AI Tabular Dataset.
    # Simplified: this assumes the preprocessed DataFrame has already been written
    # to the BigQuery table below. A real pipeline would load `df` into BigQuery
    # (or stage it in GCS) before this call.
    dataset = aiplatform.TabularDataset.create(
        display_name=f"preprocessed-{output_dataset_id}",
        bq_source=f"bq://your-gcp-project-id.your_bq_dataset.{output_dataset_id}",
    )
    print(f"Preprocessed data registered as dataset: {dataset.resource_name}")
    return dataset.resource_name


@dsl.component(
    packages_to_install=["google-cloud-aiplatform", "pandas", "pandas-gbq", "scikit-learn"],
    base_image="python:3.9",
)
def train_model(
    training_dataset_id: str,
    model_output_dir: str,
) -> str:
    import os

    import joblib
    import pandas as pd
    from google.cloud import aiplatform
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    aiplatform.init(project="your-gcp-project-id", location="us-central1")

    # Look up the BigQuery table behind the dataset. The metadata keys below are
    # an assumption about the tabular dataset schema; check them against your SDK version.
    dataset = aiplatform.TabularDataset(training_dataset_id)
    bq_uri = dataset.gca_resource.metadata["inputConfig"]["bigquerySource"]["uri"]
    table_id = bq_uri[len("bq://"):]  # project.dataset.table
    df = pd.read_gbq(f"SELECT * FROM `{table_id}`", project_id="your-gcp-project-id")

    # Assume 'target' is the column to predict
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Model accuracy: {accuracy}")

    # Save the model to GCS. joblib cannot write to gs:// URIs, so write through
    # the Cloud Storage FUSE mount that Vertex AI exposes under /gcs/.
    model_path = f"{model_output_dir}/random_forest_model.joblib"
    local_path = model_path.replace("gs://", "/gcs/", 1)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    joblib.dump(model, local_path)
    print(f"Model saved to {model_path}")
    return model_path


@dsl.pipeline(
    name="mlops-vertex-pipeline-example",
    description="A simple ML pipeline for demonstration.",
    pipeline_root="gs://your-gcs-bucket/pipeline-runs",
)
def ml_pipeline(
    input_csv_path: str = "gs://cloud-samples-data/ai-platform-unified/datasets/tabular/iris.csv",
    output_dataset_id: str = "iris_preprocessed_data",
    model_output_dir: str = "gs://your-gcs-bucket/trained-models",
):
    preprocess_task = preprocess_data(
        input_csv_path=input_csv_path,
        output_dataset_id=output_dataset_id,
    )
    train_task = train_model(
        training_dataset_id=preprocess_task.output,
        model_output_dir=model_output_dir,
    )


# To compile and run (KFP SDK v2):
# 1. Compile the pipeline:
#    kfp dsl compile --py pipeline.py --output pipeline.yaml
# 2. Submit to Vertex AI Pipelines with the Vertex AI SDK:
#    aiplatform.init(project="your-gcp-project-id", location="us-central1")
#    aiplatform.PipelineJob(
#        display_name="mlops-vertex-pipeline-example",
#        template_path="pipeline.yaml",
#    ).submit()
```
This pipeline defines two components: preprocess_data and train_model. The preprocess_data component reads a CSV, fills missing values, and saves the result as a Vertex AI Tabular Dataset. The train_model component then loads this dataset, trains a scikit-learn RandomForestClassifier, and saves the model artifact to Google Cloud Storage (GCS). The @dsl.pipeline decorator defines the overall workflow, specifying a pipeline_root in GCS where pipeline artifacts will be stored.
The real magic happens when you compile and run this. The KFP compiler turns your Python code into a pipeline specification file (YAML in current KFP SDK versions) that Vertex AI Pipelines understands. You then submit that file to the Vertex AI control plane, typically by creating an aiplatform.PipelineJob with the Vertex AI SDK for Python. Vertex AI provisions managed compute for each component, runs your containerized code, and handles the dependencies and artifact passing between steps. You can monitor the entire execution, including logs and outputs, directly in the Google Cloud Console.
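The compile-and-submit flow can be sketched in Python as follows. This is a minimal sketch, not the only way to do it: the helper names (compile_pipeline, build_job_kwargs, submit_pipeline) are hypothetical, and the code assumes the KFP v2 SDK and the google-cloud-aiplatform package are installed wherever the compile and submit helpers are called. Both libraries are imported lazily so the module itself loads without them.

```python
from typing import Any, Dict, Optional


def compile_pipeline(pipeline_func, package_path: str = "pipeline.yaml") -> str:
    """Compile a @dsl.pipeline function into a pipeline spec file."""
    from kfp import compiler  # lazy import: only needed when compiling

    compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=package_path)
    return package_path


def build_job_kwargs(
    package_path: str,
    display_name: str,
    parameter_values: Optional[Dict[str, Any]] = None,
    enable_caching: bool = True,
) -> Dict[str, Any]:
    """Collect the arguments for aiplatform.PipelineJob in one place."""
    return {
        "display_name": display_name,
        "template_path": package_path,
        "parameter_values": dict(parameter_values or {}),
        "enable_caching": enable_caching,
    }


def submit_pipeline(
    job_kwargs: Dict[str, Any],
    project: str,
    location: str,
    service_account: Optional[str] = None,
):
    """Submit the compiled pipeline to Vertex AI Pipelines without blocking."""
    from google.cloud import aiplatform  # lazy import: only needed when submitting

    aiplatform.init(project=project, location=location)
    job = aiplatform.PipelineJob(**job_kwargs)
    job.submit(service_account=service_account)
    return job
```

Assuming the pipeline file above is importable as pipeline, a run might look like submit_pipeline(build_job_kwargs(compile_pipeline(pipeline.ml_pipeline), "mlops-vertex-pipeline-example"), project="your-gcp-project-id", location="us-central1"). Keeping the argument-building step separate from submission makes it easy to inspect or log what will be sent before any cloud call happens.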
The fundamental problem Vertex AI Pipelines solves is the reproducibility and scalability of ML experiments. Instead of manually stitching together scripts, managing environments, and tracking versions, you define your workflow declaratively. This makes it easy to:
- Version Control: Treat your ML pipelines like code.
- Automate: Trigger pipelines on schedules or events.
- Scale: Leverage Google Cloud’s infrastructure for compute.
- Monitor: Track runs, debug failures, and audit results.
The key levers you control are:
- Component Definition: How you write your individual ML tasks (data loading, preprocessing, training, evaluation) as reusable, containerized functions. This uses the Kubeflow Pipelines SDK (kfp).
- Pipeline Orchestration: How you connect these components into a directed acyclic graph (DAG) using the @dsl.pipeline decorator.
- Resource Configuration: Specifying machine types, GPUs, and other compute resources for each component.
- Artifact Management: Defining where inputs (like datasets) and outputs (like trained models) are stored, typically in GCS or BigQuery, and how they are passed between components. The pipeline_root is crucial here.
- Execution Environment: Packaging your component code into Docker containers, often using pre-built base images or specifying custom ones.
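To make the resource configuration lever concrete, here is a small sketch of a helper that applies CPU, memory, and accelerator settings to a pipeline task. ResourceSpec and apply_resources are hypothetical names introduced for illustration; the set_cpu_limit, set_memory_limit, set_accelerator_type, and set_accelerator_limit calls are the task-level methods from the KFP v2 SDK, and the default values are arbitrary.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResourceSpec:
    """Hypothetical container for per-component resource settings."""

    cpu: str = "4"
    memory: str = "16G"
    accelerator_type: Optional[str] = None  # e.g. "NVIDIA_TESLA_T4"
    accelerator_count: int = 0


def apply_resources(task, spec: ResourceSpec):
    """Apply CPU, memory, and (optionally) accelerator limits to a pipeline task."""
    task.set_cpu_limit(spec.cpu)
    task.set_memory_limit(spec.memory)
    # Only request an accelerator when both a type and a positive count are given.
    if spec.accelerator_type and spec.accelerator_count > 0:
        task.set_accelerator_type(spec.accelerator_type)
        task.set_accelerator_limit(spec.accelerator_count)
    return task
```

Inside the pipeline function this would be called on a task object, for example apply_resources(train_task, ResourceSpec(cpu="8", memory="32G")). Which machine and accelerator types are actually available depends on the region and your quota.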
The most surprising thing about Vertex AI Pipelines is how seamlessly it integrates with other Google Cloud services. It’s not just a standalone orchestration tool; it’s deeply embedded. For example, when a component needs to access data, it uses the service account associated with the pipeline run to authenticate to BigQuery or GCS. Similarly, when you train a model, you can use Vertex AI’s managed training services. The aiplatform.init() call in the component code is a hint at this integration, allowing the SDK to know which project and location to operate within. This means your pipeline steps can directly interact with managed datasets, models, and endpoints without complex manual authentication setup for each service.
When you’re defining your pipeline, the pipeline_root parameter is critical. It’s not just a place to dump logs; it’s the GCS path under which the intermediate and final artifacts produced by your pipeline are stored by default. If a component produces an output artifact and you don’t explicitly tell it where to save it, Vertex AI Pipelines writes it to a sub-directory of this pipeline_root that is specific to the run and the component. Because each step’s outputs are written as that step finishes, the artifacts from steps that completed before a failure remain available for debugging or manual recovery.
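Inside a running component, GCS locations such as those under pipeline_root are usually reached through the Cloud Storage FUSE mount at /gcs/ rather than through gs:// URIs, which is what lets ordinary file APIs write artifacts. The sketch below shows that mapping in simplified form. Both helper names are hypothetical, and the run/component directory layout in the usage example is an illustrative assumption, not the exact naming scheme Vertex AI uses.

```python
import posixpath


def gcs_uri_to_local_path(uri: str) -> str:
    """Map a gs:// URI to its path under the /gcs/ FUSE mount."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Expected a gs:// URI, got: {uri!r}")
    return "/gcs/" + uri[len(prefix):]


def artifact_uri(pipeline_root: str, *parts: str) -> str:
    """Join path segments under pipeline_root (the layout is up to the caller)."""
    return posixpath.join(pipeline_root.rstrip("/"), *parts)


if __name__ == "__main__":
    # Hypothetical layout: <pipeline_root>/<run id>/<component name>/<file>
    uri = artifact_uri(
        "gs://your-gcs-bucket/pipeline-runs", "run-123", "train-model", "model.joblib"
    )
    print(uri)
    print(gcs_uri_to_local_path(uri))
```

A component could pass the result of gcs_uri_to_local_path to a call like joblib.dump, which only understands local file paths.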
The next concept you’ll likely dive into is conditional execution and loops within pipelines.