Run your first Kubeflow pipeline

Recently I've been learning MLOps. There's a lot to learn, as shown by this and this repository listing MLOps references and tools, respectively.

One of the most exciting tools is Kubeflow. The project is described as follows:

The Kubeflow project is dedicated to making deployments of machine learning (ML) workflows on Kubernetes simple, portable and scalable. Our goal is not to recreate other services, but to provide a straightforward way to deploy best-of-breed open-source systems for ML to diverse infrastructures. Anywhere you are running Kubernetes, you should be able to run Kubeflow.

Kubeflow has multiple components: central dashboard, Kubeflow Notebooks to manage Jupyter notebooks, Kubeflow Pipelines for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers, KF Serving for model serving (apparently superseded by KServe), Katib for hyperparameter tuning and model search, and training operators such as TFJob for training TF models on Kubernetes.

That's all great, but how do you get started? Considering the number of components, installing Kubeflow seems like a formidable task. Indeed, the official documentation doesn't even say how to install Kubeflow on a local Kubernetes cluster running on, say, Minikube. Therefore, the easiest way to try it out seems to be to use a managed service such as Google Cloud.

However, installing and trying out Kubeflow Pipelines (KFP) is a lot simpler. In this post, we'll create a local cluster with kind, install KFP as described here and run our first pipeline.

The code for this example can be found in this repository.

Setting up a local cluster

First, you need to ensure you have kubectl installed. Follow the documentation. You also need to have Docker.
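Before going further, it can help to confirm that the prerequisites are actually on your PATH. The snippet below is a minimal sketch using only the Python standard library; the helper name missing_tools is my own and not part of any Kubernetes tooling.

```python
import shutil


def missing_tools(tools):
    """Return the tools from `tools` that are not found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    # kubectl and docker are the prerequisites named above.
    missing = missing_tools(["kubectl", "docker"])
    if missing:
        print("Missing from PATH:", ", ".join(missing))
    else:
        print("kubectl and docker were found on PATH")
```

This only checks that the executables can be found, not that the Docker daemon is running.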

Next, install kind. On macOS, the executable can be installed with:

# Download
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.11.1/kind-darwin-amd64
# Add permission to execute
chmod +x ./kind
# Move to a folder in your PATH
mv ./kind ~/bin/kind

Create a Kubernetes cluster:

kind create cluster --name kind

Once the cluster has been created, set the kubectl context to point to it:

kubectl config use-context kind-kind

Ensure that kubectl is set up correctly:

$ kubectl get pods
No resources found in default namespace.

Set up Kubernetes dashboard

This step is optional but useful if you want to browse the Kubernetes resources through a UI.

Follow the instructions in the Istio documentation to set up the Kubernetes dashboard:

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.1.0/aio/deploy/recommended.yaml
$ kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default
$ token=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='default')].data.token}"|base64 --decode)
$ kubectl proxy

Log in to the dashboard with the token stored in $token (print it with echo $token before starting the proxy): http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/.

Set up Kubeflow Pipelines

Follow the instructions here for deploying Kubeflow Pipelines on a kind cluster:

export PIPELINE_VERSION=1.7.1
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

Once the resources have been created, forward a local port to the Kubeflow Pipelines dashboard:

$ kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

and navigate to http://localhost:8080.
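If the page does not load, a quick way to tell whether the port-forward is running is to try a TCP connection to the forwarded port. The following is a minimal sketch using only the standard library; the helper name is_port_open is my own, and the host and port are the ones from the port-forward command above.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    if is_port_open("localhost", 8080):
        print("Something is listening on localhost:8080")
    else:
        print("Nothing is listening on localhost:8080; is the port-forward running?")
```

A successful connection only shows that something is listening on the port. It does not prove that the KFP pods behind it are ready, which can take a few minutes after the manifests are applied.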

In the Kubernetes dashboard, you can find the Kubeflow Pipelines resources under the kubeflow namespace.

Define pipeline

See the tutorial.

Install the Kubeflow Pipelines Python SDK by defining requirements.txt:

# requirements.txt
kfp

and install:

pip install -r requirements.txt

Define pipeline.py and add the first component:

# pipeline.py
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

web_downloader_op = kfp.components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component-sdk-v2.yaml"
)

Kubeflow pipeline components are defined with YAML files. This web_downloader_op component definition can be found here. The component downloads data from the specified URL to the given location.

Next, we add a Python function-based component that handles the archive downloaded by the first component:

# pipeline.py

@component(
    packages_to_install=["pandas==1.1.4"],
    output_component_file="component.yaml"
)
def merge_csv(tar_data: Input[Artifact], output_csv: Output[Dataset]):
    import glob
    import pandas as pd
    import tarfile

    tarfile.open(name=tar_data.path, mode="r|gz").extractall("data")
    df = pd.concat(
        [pd.read_csv(csv_file, header=None) for csv_file in glob.glob("data/*.csv")]
    )
    df.to_csv(output_csv.path, index=False, header=False)

The function extracts the archive, merges all CSV files into a single pandas DataFrame, and writes the result to the output path.
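To get a feel for what the component does without a cluster, the same extract-and-merge steps can be tried locally on a tiny archive. The sketch below is a stand-in, not the component itself: it uses the standard library's csv module instead of pandas, sorts the file names for a deterministic order (the component uses glob order as-is), and the names make_sample_archive and merge_csv_locally are hypothetical helpers of my own.

```python
import csv
import glob
import os
import tarfile
import tempfile


def make_sample_archive(archive_path, files):
    """Write a .tar.gz containing one CSV file per entry in `files` (name -> rows)."""
    with tempfile.TemporaryDirectory() as src_dir:
        with tarfile.open(archive_path, mode="w:gz") as tar:
            for name, rows in files.items():
                csv_path = os.path.join(src_dir, name)
                with open(csv_path, "w", newline="") as f:
                    csv.writer(f).writerows(rows)
                tar.add(csv_path, arcname=name)


def merge_csv_locally(tar_path, output_path, work_dir):
    """Extract `tar_path` into `work_dir` and concatenate the rows of every CSV file."""
    with tarfile.open(tar_path, mode="r:gz") as tar:
        tar.extractall(work_dir)
    merged = []
    # Sorted for a deterministic order; this is a choice made for the sketch.
    for csv_file in sorted(glob.glob(os.path.join(work_dir, "*.csv"))):
        with open(csv_file, newline="") as f:
            merged.extend(csv.reader(f))
    with open(output_path, "w", newline="") as f:
        csv.writer(f).writerows(merged)
    return merged


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        archive = os.path.join(tmp, "sample.tar.gz")
        make_sample_archive(archive, {"a.csv": [["1", "2"]], "b.csv": [["3", "4"]]})
        rows = merge_csv_locally(
            archive, os.path.join(tmp, "merged.csv"), os.path.join(tmp, "data")
        )
        print(rows)
```

In the real component, tar_path and output_path correspond to tar_data.path and output_csv.path, which KFP fills in at run time.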

Note how the input tar_data has been defined as an artifact:

Artifacts represent large or complex data structures like datasets or models, and are passed into components as a reference to a file path.

If you have large amounts of string data to pass to your component, such as a JSON file, annotate that input or output as a type of Artifact, such as Dataset, to let Kubeflow Pipelines know to pass this to your component as a file.

Another option is to define component inputs as parameters:

Parameters typically represent settings that affect the behavior of your pipeline. Parameters are passed into your component by value, and can be of any of the following types: int, double, float, or str. Since parameters are passed by value, the quantity of data passed in a parameter must be appropriate to pass as a command-line argument.

Components return their outputs as files, annotated here with Output[Dataset].

Note that Python function-based components require stand-alone Python functions. Therefore, we need to import glob, pandas and tarfile inside the function. We also need to explicitly specify which packages to install.
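The stand-alone requirement is easier to see with a small experiment. The sketch below runs a function's source in an empty namespace, which loosely mimics the function being executed on its own inside a container. It is only an illustration of the isolation, not a description of how KFP packages components internally, and the area functions and run_in_isolation helper are made up for this example.

```python
# Source of a function that imports what it needs inside its own body.
SELF_CONTAINED = '''
def area(radius):
    import math
    return math.pi * radius ** 2
'''

# Source of a function that relies on an import made elsewhere in the module.
NEEDS_MODULE_IMPORT = '''
def area(radius):
    return math.pi * radius ** 2
'''


def run_in_isolation(source, func_name, *args):
    """Execute `source` in an empty namespace and call the function it defines."""
    namespace = {}
    exec(source, namespace)
    return namespace[func_name](*args)


if __name__ == "__main__":
    print(run_in_isolation(SELF_CONTAINED, "area", 1.0))
    try:
        run_in_isolation(NEEDS_MODULE_IMPORT, "area", 1.0)
    except NameError as err:
        print("Fails without the module-level import:", err)
```

The second function fails with a NameError because nothing outside its body travels with it, which is the same reason merge_csv imports glob, pandas and tarfile inside the function.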

Finally, we define our pipeline using the two components:

# pipeline.py

@dsl.pipeline(name="my-pipeline")
def my_pipeline(url: str):
    web_downloader_task = web_downloader_op(url=url)
    merge_csv_task = merge_csv(tar_data=web_downloader_task.outputs["data"])

Compile

To compile the Python function-based component into component.yaml and the pipeline into pipeline.yaml, add the following and run the script:

# pipeline.py

def compile():
    # Compile the pipeline function into the pipeline.yaml package.
    kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
        pipeline_func=my_pipeline, package_path="pipeline.yaml"
    )

def run():
    # Placeholder; implemented in the next section.
    pass

def main():
    compile()
    run()

if __name__ == "__main__":
    main()

The script outputs component.yaml and pipeline.yaml containing the component definition and the pipeline definition, respectively.

Run the pipeline

Modify the run function as follows to submit the pipeline to the local cluster:

# pipeline.py

def run():
    client = kfp.Client(host="http://127.0.0.1:8080/pipeline")

    client.create_run_from_pipeline_func(
        my_pipeline,
        mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
        arguments={
            "url": "https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz"
        },
    )

Run the script and navigate to the KFP dashboard to see your run being executed.
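To jump straight to a run instead of searching for it in the dashboard, you can build a direct link from the run's ID. The sketch below assumes the UI serves run pages under /#/runs/details/<run ID>, which is what I have seen with KFP 1.x but have not confirmed for other versions; run_details_url is a hypothetical helper, not part of the SDK.

```python
def run_details_url(ui_host, run_id):
    """Build a link to a run's page in the KFP UI (the URL layout is an assumption)."""
    return f"{ui_host.rstrip('/')}/#/runs/details/{run_id}"


if __name__ == "__main__":
    # "1234-abcd" is a made-up run ID for illustration.
    print(run_details_url("http://localhost:8080", "1234-abcd"))
```

To the best of my knowledge, the object returned by create_run_from_pipeline_func in the 1.x SDK exposes the ID as run_id, so the helper could be called with that value inside run(). Treat that attribute name as an assumption and check it against your SDK version.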
