End-to-end Pipeline with KFServing

Tutorial for End-to-end Pipeline with KFServing

NOTE: All tutorials are available in Jupyter Notebook format. To download the tutorials run curl -L https://downloads.mesosphere.io/kudo-kubeflow/d2iq-tutorials-1.0.1-0.3.1.tar.gz | tar xz from a Jupyter Notebook Terminal running in your KUDO Kubeflow installation.

NOTE: Please note that these notebook tutorials have been built for and tested on D2iQ's KUDO for Kubeflow. Without the requisite Kubernetes operators and custom Docker images, these notebook will likely not work.

Kubeflow Pipelines: from Training to Serving


With Kubeflow Pipelines you can build entire workflows that automate the steps involved in going from training a machine learning model to actually serving an optimized version of it. These steps can be triggered automatically by a CI/CD workflow or on demand from a command line or notebook.

Kubeflow Pipelines (kfp) comes with a user interface for managing and tracking experiments, jobs, and runs. A pipeline is a description of a machine learning workflow, replete with all inputs and outputs. In Kubeflow Pipelines, an experiment is a workspace where you can experiment with different configuration of your pipelines. Experiments are a way to organize runs of jobs into logical groups. A run is simply a single execution (instance) of a pipeline. Kubeflow Pipelines also supports recurring runs, which is a repeatable run of a pipeline. Based on a so-called run trigger an instance of a pipeline with its run configuration is periodically started. As of now, run triggers are time-based (i.e. not event-based).

In the UI, there is a pictorial representation of the runtime execution of a pipeline. This graph consists of one or more steps (i.e. nodes). Each step The directed edges (arrows) show the parent/child relationship: A → B means that B depends on A; B cannot start until A has successfully completed.

A component performs a single step in the pipeline (e.g. data ingestion, data preprocessing, data transformation, model training, hyperparameter tuning). It is analogous to a function: it has a name, (metadata) parameters and return values (interface), and a body (implementation). It must therefore be self-contained. Each component must be packaged as a Docker image. Please note that components are independently executed: they do not share the same process and cannot share in-memory data.

What You’ll Learn

This notebook trains a simple (MNIST) model in TensorFlow and serves it with KFServing, which is a serverless inference server. What this means is that you do not have to worry about which machines it runs on, networking, autoscaling, health checks, and what have you. Instead, you can focus on what matters to you: the model and a REST API you can call for predictions. If you are familiar with Kubernetes, you can even do out-of-the-box canary deployments, in which a percentage of traffic is directed to the ‘canary (in the coal mine)’ with the latest model to ensure it functions properly before completely rolling out any (potentially problematic) updates.

If you prefer to use a more sophisticated model or a TensorFlow-based one, you can check out the relevant notebooks: MNIST with TensorFlow or MNIST with PyTorch.

KFServing reads the model file from MinIO, an open-source S3-compliant object storage tool, which is already included with your Kubeflow installation.

What You’ll Need

This notebook.


Let’s make sure Kubeflow Pipelines is available:

! pip show kfp
Name: kfp
Version: 0.5.0
Summary: KubeFlow Pipelines SDK
Home-page: UNKNOWN
Author: google
Author-email: None
License: UNKNOWN
Location: /opt/conda/lib/python3.7/site-packages
Requires: tabulate, google-auth, Deprecated, kfp-server-api, google-cloud-storage, click, requests-toolbelt, argo-models, strip-hints, jsonschema, PyYAML, kubernetes, cloudpickle

How to Configure Credentials

In order for KFServing to access MinIO, the credentials must be added to the default service account.

KFServing is imported as a pipeline component (ContainerOp) in this notebook. Consequently, it does not allow configuration of custom service accounts.
%%writefile minio_secret.yaml
apiVersion: v1
kind: Secret
  name: minio-s3-secret
     serving.kubeflow.org/s3-endpoint: minio-service.kubeflow:9000
     serving.kubeflow.org/s3-usehttps: "0" # Default: 1. Must be 0 when testing with MinIO!
type: Opaque
  awsAccessKeyID: bWluaW8=
  awsSecretAccessKey: bWluaW8xMjM=
apiVersion: v1
kind: ServiceAccount
  name: default
  - name: minio-s3-secret
Writing minio_secret.yaml
! kubectl apply -f minio_secret.yaml
secret/minio-s3-secret created

How to Implement Kubeflow Pipelines Components

As we said before, components are self-contained pieces of code: Python functions.

The function must be completely self-contained. No code (incl. imports) can be defined outside of the body itself. All imports must be included in the function body itself! Imported packages must be available in the base image.

Why? Because each component will be packaged as a Docker image. The base image must therefore contain all dependencies. Any dependencies you install manually in the notebook are invisible to the Python function once it's inside the image. The function itself becomes the entrypoint of the image, which is why all auxiliary functions must be defined inside the function. That does cause some unfortunate duplication, but it also means you do not have to worry about the mechanism of packaging, as we shall see below.

For our pipeline, we shall define four components:

  • Download the MNIST data set
  • Train the TensorFlow model
  • Evaluate the trained model
  • Export the trained model

We also need the current Kubernetes namespace, which we can dynamically grab using Kubeflow Fairing.

from typing import NamedTuple

import kfp
import kfp.components as components
import kfp.dsl as dsl
import kubeflow.fairing.utils

from kfp.components import InputPath, OutputPath

NAMESPACE = kubeflow.fairing.utils.get_current_k8s_namespace()

Function arguments specified with InputPath and OutputPath are the key to defining dependencies. For now, it suffices to think of them as the input and output of each step. How we can define dependencies is explained in the next section.

Component 1: Download the MNIST Data Set

def download_dataset(data_dir: OutputPath(str)):
    """Download the MNIST data set to the KFP volume to share it among all steps"""

    import tensorflow_datasets as tfds

    tfds.load(name="mnist", data_dir=data_dir)

Component 2: Train the Model

For both the training and evaluation we must divide the integer-valued pixel values by 255 to scale all values into the [0, 1] (floating-point) range. This function must be copied into both component functions (cf. normalize_image).

If you wish to learn more about the model code, please have a look at the MNIST with TensorFlow notebook.

def train_model(data_dir: InputPath(str), model_dir: OutputPath(str)):
    """Trains a single-layer CNN for 5 epochs using a pre-downloaded dataset.
    Once trained, the model is persisted to `model_dir`."""

    import os
    import tensorflow as tf
    import tensorflow_datasets as tfds

    def normalize_image(image, label):
        """Normalizes images: `uint8` -> `float32`"""
        return tf.cast(image, tf.float32) / 255.0, label

    model = tf.keras.models.Sequential(
            tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10, activation="softmax"),

    ds_train, ds_info = tfds.load(

    # See: https://www.tensorflow.org/datasets/keras_example#build_training_pipeline
    ds_train = ds_train.map(
        normalize_image, num_parallel_calls=tf.data.experimental.AUTOTUNE
    ds_train = ds_train.cache()
    ds_train = ds_train.shuffle(ds_info.splits["train"].num_examples)
    ds_train = ds_train.batch(128)
    ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

        ds_train, epochs=5,

    print(f"Model saved {model_dir}")

Component 3: Evaluate the Model

With the following Python function the model is evaluated. The metrics metadata (loss and accuracy) is available to the Kubeflow Pipelines UI. Metadata can automatically be visualized with output viewer(s). Please go here to see how to do that.

def evaluate_model(
    data_dir: InputPath(str), model_dir: InputPath(str), metrics_path: OutputPath(str)
) -> NamedTuple("EvaluationOutput", [("mlpipeline_metrics", "Metrics")]):
    """Loads a saved model from file and uses a pre-downloaded dataset for evaluation.
    Model metrics are persisted to `/mlpipeline-metrics.json` for Kubeflow Pipelines

    import json
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from collections import namedtuple

    def normalize_image(image, label):
        return tf.cast(image, tf.float32) / 255.0, label

    ds_test, ds_info = tfds.load(

    # See: https://www.tensorflow.org/datasets/keras_example#build_training_pipeline
    ds_test = ds_test.map(
        normalize_image, num_parallel_calls=tf.data.experimental.AUTOTUNE
    ds_test = ds_test.batch(128)
    ds_test = ds_test.cache()
    ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

    model = tf.keras.models.load_model(model_dir)
    (loss, accuracy) = model.evaluate(ds_test)

    metrics = {
        "metrics": [
            {"name": "loss", "numberValue": str(loss), "format": "PERCENTAGE"},
            {"name": "accuracy", "numberValue": str(accuracy), "format": "PERCENTAGE"},

    with open(metrics_path, "w") as f:
        json.dump(metrics, f)

    out_tuple = namedtuple("EvaluationOutput", ["mlpipeline_metrics"])

    return out_tuple(json.dumps(metrics))

Component 4: Export the Model

def export_model(
    model_dir: InputPath(str),
    metrics: InputPath(str),
    export_bucket: str,
    model_name: str,
    model_version: int,
    import os
    import boto3
    from botocore.client import Config

    s3 = boto3.client(

    # Create export bucket if it does not yet exist
    response = s3.list_buckets()
    export_bucket_exists = False

    for bucket in response["Buckets"]:
        if bucket["Name"] == export_bucket:
            export_bucket_exists = True

    if not export_bucket_exists:
        s3.create_bucket(ACL="public-read-write", Bucket=export_bucket)

    # Save model files to S3
    for root, dirs, files in os.walk(model_dir):
        for filename in files:
            local_path = os.path.join(root, filename)
            s3_path = os.path.relpath(local_path, model_dir)

                ExtraArgs={"ACL": "public-read"},

    response = s3.list_objects(Bucket=export_bucket)
    print(f"All objects in {export_bucket}:")
    for file in response["Contents"]:
        print("{}/{}".format(export_bucket, file["Key"]))

How to Combine the Components into a Pipeline

Note that up to this point we have not yet used the Kubeflow Pipelines SDK!

With our four components (i.e. self-contained funtions) defined, we can wire up the dependencies with Kubeflow Pipelines.

The call components.func_to_container_op(f, base_image=img)(*args) has the following ingredients:

  • f is the Python function that defines a component
  • img is the base (Docker) image used to package the function
  • *args lists the arguments to f

What the *args mean is best explained by going forward through the graph:

  • downloadOp is the very first step and has no dependencies; it therefore has no InputPath. Its output (i.e. OutputPath) is stored in data_dir.
  • trainOp needs the data downloaded from downloadOp and its signature lists data_dir (input) and model_dir (output). So, it depends on downloadOp.output (i.e. the previous step’s output) and stores its own outputs in model_dir, which can be used by another step. downloadOp is the parent of trainOp, as required.
  • evaluateOp's function takes three arguments: data_dir (i.e. downloadOp.output), model_dir (i.e. trainOp.output), and metrics_path, which is where the function stores its evaluation metrics. That way, evaluateOp can only run after the successful completion of both downloadOp and trainOp.
  • exportOp runs the function export_model, which accepts five parameters: model_dir, metrics, export_bucket, model_name, and model_version. From where do we get the model_dir? It is nothing but trainOp.output. Similarly, metrics is evaluateOp.output. The remaining three arguments are regular Python arguments that are static for the pipeline: they do not depend on any step’s output being available. Hence, they are defined without using InputPath. Since it is the last step of the pipeline, we also do not list any OutputPath for use in another step.
def train_and_serve(
    data_dir: str,
    model_dir: str,
    export_bucket: str,
    model_name: str,
    model_version: int,
    # For GPU support, please add the "-gpu" suffix to the base image
    BASE_IMAGE = "mesosphere/kubeflow:1.0.1-0.3.1-tensorflow-2.2.0"

    downloadOp = components.func_to_container_op(
        download_dataset, base_image=BASE_IMAGE

    trainOp = components.func_to_container_op(train_model, base_image=BASE_IMAGE)(

    evaluateOp = components.func_to_container_op(evaluate_model, base_image=BASE_IMAGE)(
        downloadOp.output, trainOp.output

    exportOp = components.func_to_container_op(export_model, base_image=BASE_IMAGE)(
        trainOp.output, evaluateOp.output, export_bucket, model_name, model_version

    # Create an inference server from an external component
    kfserving_op = components.load_component_from_url(
    kfserving = kfserving_op(


Just in case it isn’t obvious: this will build the Docker images for you. Each image is based on BASE_IMAGE and includes the Python functions as executable files. Each component can use a different base image though. This may come in handy if you want to have reusable components for automatic data and/or model analysis (e.g. to investigate bias).

Note that you did not have to use Kubeflow Fairing or docker build locally at all!

Remember when we said all dependencies have to be included in the base image? Well, that was not quite accurate. It's a good idea to have everything included and tested before you define and use your pipeline components to make sure that there are not dependency conflicts. There is, however, a way to add packages (packages_to_install) and additional code to execute before the function code (extra_code).

Is that it? Not quite!

We still have to define the pipeline itself. Our train_and_serve function defines dependencies but we must use the KFP domain-specific language (DSL) to register the pipeline with its four components:

# See: https://github.com/kubeflow/kfserving/blob/master/docs/DEVELOPER_GUIDE.md#troubleshooting
def op_transformer(op):
    op.add_pod_annotation(name="sidecar.istio.io/inject", value="false")
    return op

    name="End-to-End MNIST Pipeline",
    description="A sample pipeline to demonstrate multi-step model training, evaluation, export, and serving",
def mnist_pipeline(
    model_dir: str = "/train/model",
    data_dir: str = "/train/data",
    export_bucket: str = "mnist",
    model_name: str = "mnist",
    model_version: int = 1,

With that in place, let’s submit the pipeline directly from our notebook:

pipeline_func = mnist_pipeline
run_name = pipeline_func.__name__ + " run"
experiment_name = "End-to-End MNIST Pipeline"

arguments = {
    "model_dir": "/train/model",
    "data_dir": "/train/data",
    "export_bucket": "mnist",
    "model_name": "mnist",
    "model_version": "1",

client = kfp.Client()
run_result = client.create_run_from_pipeline_func(
[I 200512 09:16:17 _client:267] Creating experiment End-to-End MNIST Pipeline.

Experiment link here

Run link here

The graph will look like this:


If there are any issues with our pipeline definition, this is where they would flare up. So, until you submit it, you won’t know if your pipeline definition is correct.

We have so far claimed that Kubeflow Pipelines is for automation of multi-step (ad hoc) workflows and usage in CI/CD. You may have wondered why that is. After all, it is possible to set up recurring runs of pipelines. The reason is that these pipeline steps are one-offs. Even though you can parameterize each step, including the ones that kick off an entire pipeline, there is no orchestration of workflows. Stated differently, if a step fails, there is no mechanism for automatic retries. Nor is there any support for marking success: if the step is scheduled to be run again, it will be run again, whether or not the previous execution was successful, obviating any subsequent runs (except in cases where it may be warranted). Kubeflow Pipelines allows retries but it is not configurable out of the box. If you want Airflow- or Luigi-like behaviour for dependency management of workflows, Kubeflow Pipelines is not the tool.

How to Predict with the Inference Server

The simplest way to check that our inference server is up and running is to check it with curl ( pre-installed on the cluster).

To do so, let’s define a few helper functions for plotting and displaying images:

import matplotlib.pyplot as plt

def display_image(x_test, image_index):
    plt.imshow(x_test[image_index].reshape(28, 28), cmap="binary")

def predict_number(model, x_test, image_index):
    pred = model.predict(x_test[image_index : image_index + 1])
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_test = x_test / 255.0  # We must transform the data in the same way as before!

image_index = 1005

display_image(x_test, image_index)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step


The inference server expects a JSON payload:

import codecs, json

tf_serving_req = {"instances": x_test[image_index : image_index + 1].tolist()}

with open("input.json", "w") as json_file:
    json.dump(tf_serving_req, json_file)
model = "mnist"
url = f"http://{model}-predictor-default.{NAMESPACE}.svc.cluster.local/v1/models/{model}:predict"

! curl -L $url -d@input.json
    "predictions": [[8.5606473e-07, 3.70297289e-06, 6.13634074e-06, 0.00199914398, 0.000423449732, 1.81742507e-05, 2.92009634e-08, 0.000312483666, 0.000718666473, 0.996517301]

The probabilities for each class (0-9) are shown in the predictions response. The model believes the image shows a “9”, which indeed it does!

For more details on the URL, please check out this example.