Deep Dive into ML Models in Production Using TensorFlow Extended (TFX) and Kubeflow

29th August, 2023

If I had a dollar for every machine learning model wasting away in Jupyter Notebooks across the industry, I'd be a millionaire. – Me

It is one thing to build models, and quite another to put them in production. Effectively putting models in production is one of the hardest parts of machine learning.

The emphasis is on effectively: while there are lots of ways to put models in production, few tools can effectively deploy, monitor, track, and automate the process.

In this tutorial, I'm going to introduce you to TensorFlow Extended, popularly known as TFX. You're going to take an example machine learning project and put it in production using TFX, Google AI Platform Pipelines, and Kubeflow.

Don't worry if you're learning about these tools for the first time. I'll do my best to explain them properly while also implementing them practically.

To complete this tutorial or follow along, you'll need a GCP account. If you don't have one, head over here and register, and you'll get a $300 signup credit to experiment with. In addition, you'll need:

  • Basic understanding of machine learning
  • Basic understanding of TensorFlow
  • A little familiarity with cloud platforms
  • Obviously, you need Python as well

Introduction to TFX and Kubeflow

TFX is a production-scale machine learning platform based on TensorFlow. It is built and actively maintained by Google, and it's used internally at Google.

tensorflow extended
Source: TensorFlow

TFX provides a bunch of frameworks, libraries, and components for defining, launching, and monitoring machine learning models in production. 

The components available in TFX let you build efficient ML pipelines designed to scale from the start. These components cover:

  • modeling,
  • training,
  • serving (inference),
  • and managing deployments to different targets such as web, mobile, or IoT devices.

In the diagram below, you can see the relationship between the available TFX libraries and the pipeline components you can build with them.

TFX libraries components
Source: TensorFlow

You'll notice that the TFX libraries and components cover a typical end-to-end machine learning pipeline, which starts with data ingestion and ends with model serving.

The TFX libraries for the different tasks shown above are available in Python and can be installed separately, but it's advisable to just install TFX, which comes with all of them.

As you proceed through this tutorial, you'll use different TFX components, and I'll explain what each one does before showing you how to use it.

TFX under the hood and orchestrators

By default, TFX creates a directed acyclic graph (DAG) of your ML pipeline. Under the hood, it uses Apache Beam to manage and implement pipelines, and these can be executed on distributed processing back-ends like Apache Spark, Google Cloud Dataflow, and Apache Flink.

It's important to mention that Beam comes with a direct runner, so it can also be used in scenarios like testing or small deployments.

While running TFX on Apache Beam works, it is difficult to configure, monitor, and maintain the defined pipelines and workflows this way. This gave rise to tools we call orchestrators.

Orchestrators like Kubeflow or Apache Airflow make it easy to configure, operate, monitor, and maintain ML pipelines. They mostly come with GUIs that are easy to understand.

Below is a sample GUI of Airflow showing defined tasks:

And here's one for Kubeflow:

kubeflow
Source: Kubeflow

In this tutorial, you'll be using Kubeflow as the orchestrator for your ML pipelines, so let's briefly talk about Kubeflow.

Kubeflow logo
Source: Kubeflow.org

Kubeflow is an open-source, Kubernetes-native platform specifically designed for developing, orchestrating, deploying, and running scalable ML workloads.

It helps you manage end-to-end ML pipeline orchestration, run workflows in numerous environments (on-premises or in the cloud), and visualize those workflows.

Now that we have the basic introduction out of the way, let's get into the practical aspect of this tutorial. In the next section, you'll set up your project on GCP.

Setting up a new Google Cloud project

If you have created a Google Cloud account, then you already have your free GCP credits. Next, you'll create a new project for this tutorial.

Note:

If you have exhausted your free credit, there will be charges to your GCP account, so remember to clean up the project at the end of this tutorial.

To set up a new project, follow the steps below:

  • Give your project a name. I'll call mine tfx-project.
tfx new project
  • Click Create to create the new project.
  • Once created, click on the project drop-down again, and select the new project you just created. 

When done right, you should see your project dashboard.

Configuring AI Platform Pipeline and setting up Kubeflow

Now that you have created your project, you're going to set up Kubeflow. Follow the steps below to achieve this:

  • Click on the hamburger menu to reveal available services on GCP, and scroll to where you have AI Platform and select Pipelines.
AI platform
  • On the Pipelines page, click the NEW INSTANCE button. This opens the Kubeflow Pipelines page, where you can configure your Kubeflow engine.
  • You can select a Zone for your cluster, or leave this as default.
  • Make sure to check the allow access box, as this is required for your cluster to have access to other cloud APIs.
allow access box
  • Click the Create cluster button and wait for a couple of minutes until the cluster has been created.
  • You can select a namespace, or leave it as default. Also, if you are going to use managed storage for your artifacts, you can add your storage details; otherwise, leave it blank.
  • Finally, click Deploy, and wait for the pipeline to be deployed. 
deploy pipeline

Set up Cloud AI Platform Notebook

Next, you're going to set up a Notebook in the AI Platform. This lets us perform experiments in the familiar Jupyter Notebook environment. To instantiate this new notebook, follow the steps below:

  • Select the AI Platform option from the GCP hamburger menu, and click the Notebooks option.
  • Next, select Enable Notebooks API, then click NEW INSTANCE to create a new Notebook instance on the AI Platform.
  • Depending on what you want to do and your budget, you can choose a custom startup library to add to your Notebook instance. For this article, I'll use the TensorFlow 2.1 version with no GPU installed.
GPU installed
  • In the pop-up menu, click Customize at the bottom. This opens a configuration page, and if you scroll down, youā€™ll see an option to reduce the number of CPUs and RAM of your compute instance. You are going to use something smaller in order to reduce cost. 
reduce RAM

You can skip the step above if you have enough budget and need something very fast or big. 

  • Finally, scroll to the end and click on Create to create the Notebook.

Experimenting with Notebook in the Cloud

Now that you have set up your notebook, you're going to open it. Follow the steps below to achieve this:

  • In the AI Platform dashboard, click on Pipelines again. This time you'll see the Kubeflow pipeline you just created, and next to it there's an OPEN PIPELINES DASHBOARD link. Click on it.
open pipelines dashboard

This opens your pipeline in Kubeflow, and from here you can perform numerous functions particular to Kubeflow. 

  • Next, click on Open TF 2.1 Notebook. This opens a page where you can select a Notebook instance. Click on the Notebook instance you created earlier, as shown below.
AI platform notebook
  • Finally, click Create to start the notebook instance. This opens a familiar Jupyter notebook where you are going to do experimentation. 
create pipeline

In the opened Jupyter Notebook, you're provided with a template file in the imported folder that helps you perform some important configuration and setup, as well as a template for working with TFX components.

We'll leverage a lot of this configuration later, but for now let's start building our project.

In the notebook, create a new folder for your project (advert-pred). In that folder, create a new Jupyter notebook (advert-experimentation) and open it. 

advert experiment

In this Jupyter notebook, you're going to walk through each TFX component individually and interactively. This will help you understand what each component is doing, and then at the end, you'll turn your experimentation into a full pipeline and deploy it.

You can get the complete notebook here and the complete project code here.

Setup

In the first cell of your notebook, you'll install TFX, the Kubeflow Pipelines SDK (kfp), and a tool called Skaffold:

# Install tfx and kfp Python packages.
import sys
!{sys.executable} -m pip install --user --upgrade -q tfx==0.22.0
!{sys.executable} -m pip install --user --upgrade -q kfp==1.0.0
# Download skaffold and set it executable.
!curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold && mv skaffold /home/jupyter/.local/bin/

Skaffold is a command line tool that facilitates continuous development for Kubernetes applications. It helps you manage workflows for building, pushing, and deploying applications. You'll get to understand the use of Skaffold later.

After running the first cell, you'll get some warnings; ignore them for now. One of them informs you that your installations are not in your env PATH. You'll fix this in the next cell by adding them to PATH.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

Next, you set some important environment variables which will be used later by Kubeflow for orchestrating the pipeline. Copy the code below to a new cell:

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)

The first variable is your GCP project ID. This can be accessed from your environment since you're on the AI Platform. Next is your Kubeflow Pipeline cluster endpoint.

The endpoint URL is used to access the KFP cluster when deploying your pipeline. To get your KFP endpoint, copy the URL from the Kubeflow "Getting Started" page.

getting started page

Assign the copied URL to the variable ENDPOINT:

ENDPOINT='https://2adfdb83b477n893-dot-us-central2.pipelines.googleusercontent.com'

When copying the ENDPOINT URL, make sure to remove the trailing path and stop at the end of .com.

Next, you'll create a Docker image name, which Skaffold will use when bundling your pipeline.

CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/advert-pred-pipeline'

Lastly, you'll set your base path and change your current working directory to your project folder.

# Set the base path and change the working directory to the project folder.
import os

BASE_PATH = str(os.path.join(os.getcwd(), 'advert-pred'))
%cd {BASE_PATH}

And you're done with the setup.

Next, you'll start your interactive exploration of each TFX component.

Running TFX components interactively

TFX comes with a built-in orchestrator that allows you to run each component interactively in a Jupyter Notebook.

This lets you easily explore each component and visualize its output. At the end of your exploration, you can export your code as a pipeline.

Now, let's see this in action. In a new code cell, import the following packages:

import os
import pprint
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

Among the packages imported, you'll notice different modules from the tfx.components package. These components will be used sequentially to define the pipeline, each at a different stage of the ML process.

The last line of code (%load_ext) loads a tfx notebook extension which can be used to mark code cells that should be skipped when automatically generating pipelines from the notebook.

Yes! You can automatically export your notebook as a pipeline to run on Apache Beam or Airflow.

Upload your data

In your advert-pred directory, create a new folder called data. You'll upload your data here for use in the experimentation phase.

Before you proceed, download the advertisement data from here.

uploading data

To upload data to your notebook instance, open the data folder you created, click on the upload icon and select the data file you just downloaded. 

After uploading, let's take a peek at the top rows.

data_root = 'data'
data_filepath = os.path.join(data_root, "advertising.csv")
!head {data_filepath}

//output

TFX output 1
Sample data output

The dataset is from this challenge on Kaggle. The task is to predict whether a customer will click an ad or not, so it is a binary classification task.

Note:

This is a tutorial on building ML pipelines, so I'll not be covering extensive feature engineering or analysis.

Let's get started with the TFX components.

Before you proceed, you'll first create something called an InteractiveContext. An InteractiveContext allows you to run TFX components interactively in a notebook so that you can visualize their outputs. This is in contrast to a production environment, where an orchestrator runs your components.

In a new cell, create and run the InteractiveContext as shown below:

context = InteractiveContext()

ExampleGen

The first component you'll use is ExampleGen. ExampleGen usually sits at the beginning of your ML pipeline because it is used to ingest data, split it into train and evaluation sets, convert the data to the efficient tf.Example format, and copy the data to a managed directory for easy access by other components down the pipeline.

In the code cell below, you'll pass the data source to ExampleGen's input parameter and run it using the context:

example_gen = CsvExampleGen(input=external_input(data_root))
context.run(example_gen)
example gen

Above, you can view an interactive widget of ExampleGen's outputs. These outputs are called artifacts, and ExampleGen generally produces two: training examples and evaluation examples.

By default, ExampleGen splits the data into 2/3 training and uses 1/3 for the evaluation set. 
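
If you need a different ratio, you can pass an output configuration to ExampleGen. Below is a minimal sketch, assuming the example_gen_pb2 proto API shipped with this TFX version; the hash_buckets values set the relative split sizes (here, roughly 80/20):

from tfx.proto import example_gen_pb2

# Hypothetical 80/20 split instead of the default 2:1.
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
    ]))
example_gen = CsvExampleGen(input=external_input(data_root), output_config=output_config)
context.run(example_gen)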

You can also view the artifact and the URI where it is stored:

artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)

//output

TFX output 2

Now that ExampleGen has finished ingesting the data, the next step is data analysis.

StatisticsGen

The StatisticsGen component is used to compute statistics over your dataset. These statistics provide a quick overview of your data, with details like the shape, features present, and also value distribution. 

To compute statistics over the data, you pass the output of ExampleGen in as input.

statistics_gen = StatisticsGen(
   examples=example_gen.outputs['examples'])
context.run(statistics_gen)
execution result

These statistics can be visualized using the context's show method, as shown below:

context.show(statistics_gen.outputs['statistics'])

Below you see the generated statistics for numerical features:

numerical features statistics
Statistics for numerical features

And if you scroll down in the statistics widget, you'll find the description of the categorical features as well.

categorical variables
Statistics for categorical features

Using StatisticsGen, you can quickly get an overview of your data: you can check for missing values, the presence of zeros, and the value distributions.

These statistics will be used by downstream components like SchemaGen and ExampleValidator for detecting anomalies, skew and drift when new data is ingested in production. 
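
If you prefer working with the statistics outside the notebook widget, you can also load the artifact directly with TensorFlow Data Validation (TFDV). A minimal sketch; the train/stats_tfrecord layout inside the artifact URI is an assumption about how StatisticsGen stores its output:

import os
import tensorflow_data_validation as tfdv

# Load the statistics artifact written by StatisticsGen and visualize it with TFDV.
stats_uri = statistics_gen.outputs['statistics'].get()[0].uri
train_stats = tfdv.load_statistics(os.path.join(stats_uri, 'train', 'stats_tfrecord'))
tfdv.visualize_statistics(train_stats)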

SchemaGen

The SchemaGen component generates a schema for your data from the statistics. A schema is simply a definition of your data: it describes the expected types, properties, and bounds of the data features.

In the code cell below, you will pass the StatisticsGen output to the SchemaGen input and then visualize the output as well. 

schema_gen = SchemaGen(
   statistics=statistics_gen.outputs['statistics'],
   infer_feature_shape=False)
context.run(schema_gen)
context.show(schema_gen.outputs['schema'])
schema gen

Each feature in the dataset is represented, as well as the expected Type, Presence, Valency and Domain.
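
If you want to inspect or hand-tune the schema before relying on it in production, you can load it with TFDV as well. A minimal sketch; the schema.pbtxt file name inside the artifact directory is an assumption:

import os
import tensorflow_data_validation as tfdv

# Load the schema artifact written by SchemaGen.
schema_uri = schema_gen.outputs['schema'].get()[0].uri
schema = tfdv.load_schema_text(os.path.join(schema_uri, 'schema.pbtxt'))

# Inspect a single feature, e.g. the Age column.
print(tfdv.get_feature(schema, 'Age'))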

ExampleValidator

The next component in the ML pipeline is the ExampleValidator. This component validates your data and detects anomalies based on the defined schema. 

It can be used to validate any new data that comes into your pipeline in production, and it's useful for detecting drift, changes, and skew in new data before it's fed into your model.

In the code cell below, we pass in the StatisticsGen and SchemaGen outputs to the ExampleValidator:

example_validator = ExampleValidator(
   statistics=statistics_gen.outputs['statistics'],
   schema=schema_gen.outputs['schema'])
context.run(example_validator)
context.show(example_validator.outputs['anomalies'])
example validator

Our data currently contains no anomalies, so we can proceed to the next component.

Transform 

The next component down the pipeline is the Transform component. This component performs feature engineering on the training and serving data. 

To transform the ingested data, you need to pass in the examples from ExampleGen, the schema from SchemaGen, and a Python module containing your transformation code.

In your project directory (advert-pred), create a new folder called model. In this folder, you'll define your transformation code as well as your model code.

In the model folder, create three scripts: constants.py, advert-transform.py, and __init__.py.

To create a Python script in Jupyter Lab, open the corresponding folder, click on the + icon, create a text file and then change the extension to .py.

In constants.py, you will define some variables such as the names of your categorical and numeric features, as well as the features you need to encode. In our case, you'll use a selection of features, as shown below:

DENSE_FLOAT_FEATURE_KEYS = ['DailyTimeSpentOnSite', 'Age', 'AreaIncome', 'DailyInternetUsage']
VOCAB_FEATURE_KEYS = ['City', 'Male', 'Country']
# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10
# Keys
LABEL_KEY = 'ClickedOnAd'

def transformed_name(key):
   return key + '_xf'

DENSE_FLOAT_FEATURE_KEYS holds all numeric features, while VOCAB_FEATURE_KEYS contains all the string features you want to encode.

Also, you add a small helper function that will append _xf to each feature name. This is used in the transform module to differentiate transformed features from raw features. 

In advert-transform.py, you import your constants and then define the transformation steps. This is where all the code to process, clean, and fill missing values lives.

import tensorflow as tf
import tensorflow_transform as tft
from model import constants

_DENSE_FLOAT_FEATURE_KEYS = constants.DENSE_FLOAT_FEATURE_KEYS
_LABEL_KEY = constants.LABEL_KEY
_VOCAB_FEATURE_KEYS = constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = constants.VOCAB_SIZE
_OOV_SIZE = constants.OOV_SIZE
_transformed_name = constants.transformed_name

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(inputs[key])

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        inputs[key],
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  outputs[_transformed_name(_LABEL_KEY)] = inputs[_LABEL_KEY]
  return outputs

At the top of the advert-transform.py script, you import the constants defined in constants.py. Next, you define the preprocessing_fn function. This function is called by the Transform component, so the name (preprocessing_fn) must be preserved.

In the preprocessing_fn function, you will process each feature according to its type, rename it and then append it to the output dictionary. The Transform component expects the preprocessing_fn to return a dictionary of transformed features. 

Also, note that our preprocessing code is written in pure TensorFlow. This is recommended so that your operations can be optimally distributed and run on clusters. If you want to use pure Python code (though this is not recommended), you can trace it into a TensorFlow graph using the @tf.function decorator.
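
As a small illustration (a hypothetical helper, not part of this pipeline), logic traced with @tf.function looks like this:

import tensorflow as tf

@tf.function
def clip_age(age):
  # Hypothetical helper: keep ages inside a plausible range before scaling.
  return tf.clip_by_value(age, 18.0, 90.0)

print(clip_age(tf.constant([15.0, 42.0, 120.0])))  # -> [18. 42. 90.]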

Now head back to your notebook and add the following code in a new cell:

advert_transform = 'model/advert-transform.py'
transform = Transform(
   examples=example_gen.outputs['examples'],
   schema=schema_gen.outputs['schema'],
   module_file=advert_transform)
context.run(transform)

First, you define the path to the transform code, then you pass the examples and schema. When you run this, you should see a really long output showing some transformed features, and at the end, you can see the artifact widget as shown below: 

artifact widget
execution result

The Transform component generates two artifacts: a transform_graph and transformed_examples. The transform_graph defines all preprocessing steps as a directed acyclic graph (DAG) that can be applied to any newly ingested data, while transformed_examples contains the actual preprocessed training and evaluation data.

You can easily view this by calling the transform output as shown below:

transform.outputs

//output

TXF output 3
Transform component output
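
If you want to double-check what Transform produced, you can also load the transform graph with tensorflow_transform and print the transformed feature spec. A minimal sketch:

import tensorflow_transform as tft

# Load the transform graph artifact and inspect the transformed feature spec.
transform_graph_uri = transform.outputs['transform_graph'].get()[0].uri
tf_transform_output = tft.TFTransformOutput(transform_graph_uri)
print(tf_transform_output.transformed_feature_spec())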

Now that you have ingested, analyzed, and transformed your data, you can define the next component: the Trainer.

Trainer

The Trainer component is used to train a model defined in TensorFlow/Keras. The Trainer accepts the schema, the transformed data and transform graph, training parameters, and your model definition code.

In your model folder, create a new Python script called advert-trainer.py, and add the following code. 

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
from model import constants

_DENSE_FLOAT_FEATURE_KEYS = constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = constants.VOCAB_SIZE
_OOV_SIZE = constants.OOV_SIZE
_LABEL_KEY = constants.LABEL_KEY
_transformed_name = constants.transformed_name

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern, tf_transform_output, batch_size=100):
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    tf_transform_output: A TFTransformOutput.
    batch_size: number of consecutive elements of the returned dataset to
      combine in a single batch.

  Returns:
    A dataset that contains (features, indices) tuples, where features is a
      dictionary of Tensors and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())
  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))
  return dataset

def _build_keras_model(hidden_units):
  """Creates a DNN Keras model for classifying the advertising data."""
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]
  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 60, 50])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Returns a simple Keras wide-and-deep model."""
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })

  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)

  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)
  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.01),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN.
  first_dnn_layer_size = 150
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  model = _build_keras_model(
      # Construct layer sizes with exponential decay.
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

The code above is quite long, so we'll walk through each section:

  • In the import section, we import the constants we need from the constants module.
  • Next, you define three utility functions. The first, _transformed_names, simply returns the modified name for each feature. _gzip_reader_fn is used to read files in TFRecordDataset format, which is how our data is stored by ExampleGen. Lastly, _get_serve_tf_examples_fn returns a serving function that parses each serialized tf.Example and applies the transform function.
  • Next, in the _input_fn function, you generate a tf.data.Dataset from the transformed features. This is the efficient data format used for training our model.
  • The next two functions, _build_keras_model and _wide_and_deep_classifier, construct a Keras model using the functional API. The functional API is useful here because we are defining a static graph, so each feature input must be correctly defined before the model is compiled.

  • Next, and most importantly, you define the run_fn. This function is called by the Trainer component, so its name should not be changed. In this function, you initialize the train and evaluation datasets from the transformed output, initialize the model, define a logging directory for model outputs and TensorBoard, and finally fit the model.
  • Next, you define the serving signature, which is used when serving your model after the Pusher component exports it.
  • Finally, you save the model to the serving directory using the defined signature. 

When you're done defining the trainer script, go back to your notebook and add the Trainer component as shown below:

advert_trainer = 'model/advert-trainer.py'
trainer = Trainer(
   module_file=advert_trainer,
   custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
   examples=transform.outputs['transformed_examples'],
   transform_graph=transform.outputs['transform_graph'],
   schema=schema_gen.outputs['schema'],
   train_args=trainer_pb2.TrainArgs(num_steps=1000),
   eval_args=trainer_pb2.EvalArgs(num_steps=500))
context.run(trainer)
trainer component

The Trainer accepts the trainer module, transformed examples from the transform outputs, the transform graph, schema, as well as trainer arguments for training and evaluation steps. 
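
If you want to see where the SavedModel and the TensorBoard logs ended up, you can list the Trainer's model artifact directory. This is a quick, optional check; the exact sub-directory layout under the artifact URI may vary by TFX version:

# Locate the model artifact the Trainer just produced and list its contents.
model_artifact_uri = trainer.outputs['model'].get()[0].uri
!ls -R {model_artifact_uri}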

Now that you're done with training, you'll add the next component: the Evaluator.

Evaluator

The Evaluator component computes model performance metrics on the evaluation set. It can also be used to validate any newly trained models.

This is useful when you are improving and testing new models in production. To set up the Evaluator, you need to define a configuration. 

The configuration simply instructs the Evaluator on what metrics to report, what threshold to use when evaluating new models, and so on. See more about this here.

Add the following configuration code to your notebook:

eval_config = tfma.EvalConfig(
   model_specs=[tfma.ModelSpec(label_key='ClickedOnAd')],
   metrics_specs=[
       tfma.MetricsSpec(
           metrics=[
               tfma.MetricConfig(class_name='ExampleCount'),
               tfma.MetricConfig(class_name='BinaryAccuracy',
                 threshold=tfma.MetricThreshold(
                     value_threshold=tfma.GenericValueThreshold(
                         lower_bound={'value': 0.5}),
                     change_threshold=tfma.GenericChangeThreshold(
                         direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                         absolute={'value': -1e-10})))
           ]
       )
   ])
model_resolver = ResolverNode(
     instance_name='latest_blessed_model_resolver',
     resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
     model=Channel(type=Model),
     model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)
execution result 3

The ResolverNode above looks up the latest blessed model (if one exists) to use as the baseline for comparison. Next, you'll pass this config to the Evaluator, along with the examples and the trained model outputs, as shown below:

evaluator = Evaluator(
   examples=example_gen.outputs['examples'],
   model=trainer.outputs['model'],
   baseline_model=model_resolver.outputs['model'],
   eval_config=eval_config)
context.run(evaluator)

To visualize the output of the Evaluator, use the show method as shown below:

context.show(evaluator.outputs['evaluation'])
example count

Above, you can see the metrics reported by the Evaluator. If you train a new model, the performance is going to be compared with the baseline model, which in our case does not exist because this is our first model. 

Also, the Evaluator tells us whether a model is BLESSED or NOT BLESSED. A BLESSED model has passed all evaluation criteria and is better than the current baseline. It can then be pushed and served by the Pusher component; otherwise, the push step fails. This means you can easily automate model deployment.

To check if our current model has been BLESSED by the Evaluator, get the output from the Evaluator as shown below:

blessing_uri = evaluator.outputs.blessing.get()[0].uri
!ls -l {blessing_uri}

//output

TFX output 4

Note:

Our model is automatically blessed since it is the first model in our pipeline. If you train another model and rerun the Evaluator, it will be compared with the current model and marked BLESSED or NOT BLESSED.

Now that your model has been trained and blessed, you can easily export it to the serving model directory using the Pusher component.

Pusher

The Pusher component, which always comes last in the pipeline, is used to export a BLESSED model to the serving directory.

To add this component, you need to pass in the Trainer output, the Evaluator output, and the serving directory, as shown below:

serving_model_dir = 'serving_model/advert-pred'
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir)))
context.run(pusher)

First, you specify the serving model directory your trained model will be pushed to. This can be cloud storage or the local file system.
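
Once the Pusher has run, a quick smoke test is to load the pushed SavedModel and confirm it exposes the serving signature. A minimal sketch, assuming the Pusher writes timestamped version sub-directories under serving_model_dir:

import os
import tensorflow as tf

# Pick the most recent version directory the Pusher created.
latest_version = max(os.listdir(serving_model_dir))
loaded = tf.saved_model.load(os.path.join(serving_model_dir, latest_version))

# The serving signature accepts a batch of serialized tf.Example strings.
print(loaded.signatures['serving_default'].structured_input_signature)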

And that's it! You have successfully implemented all the TFX components, from data ingestion, statistics and schema generation, through transformation, model training, and evaluation, to model export. You can now export your notebook to run under an orchestrator like Apache Beam, Kubeflow, or Apache Airflow.

In the next section, I'm going to show you how to use Kubeflow to orchestrate your pipeline.

Setting up your Kubeflow pipeline

To run or orchestrate your pipeline with Kubeflow, you are going to write some configuration code. This helps set up Kubeflow as well as define the TFX pipeline components to add. Follow the steps below to achieve this:

  • First, create a new Python script in your project directory called pipeline.py. 

In your pipeline.py file, you will add all the components as we did during the interactive exploration phase. 

This file defines the TFX pipeline and various components in the pipeline.

from ml_metadata.proto import metadata_store_pb2
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.extensions.google_cloud_ai_platform.pusher import executor as ai_platform_pusher_executor
from tfx.extensions.google_cloud_ai_platform.trainer import executor as ai_platform_trainer_executor
from tfx.extensions.google_cloud_big_query.example_gen import component as big_query_example_gen_component
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
import tensorflow_model_analysis as tfma

def create_pipeline(pipeline_name,
                   pipeline_root,
                   data_path,
                   preprocessing_fn,
                   run_fn,
                   train_args,
                   eval_args,
                   eval_accuracy_threshold,
                   serving_model_dir,
                   metadata_connection_config=None,
                   beam_pipeline_args=None,
                   ai_platform_training_args=None,
                   ai_platform_serving_args=None):
  components = []

  example_gen = CsvExampleGen(input=external_input(data_path))
  components.append(example_gen)

  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
  components.append(statistics_gen)

  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=True)
  components.append(schema_gen)

  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])
  components.append(example_validator)

  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      preprocessing_fn=preprocessing_fn)
  components.append(transform)

  trainer_args = {
      'run_fn': run_fn,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }
  if ai_platform_training_args is not None:
    trainer_args.update({
        'custom_executor_spec':
            executor_spec.ExecutorClassSpec(
                ai_platform_trainer_executor.GenericExecutor),
        'custom_config': {
            ai_platform_trainer_executor.TRAINING_ARGS_KEY:
                ai_platform_training_args,
        }
    })
  trainer = Trainer(**trainer_args)
  components.append(trainer)

  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver)

  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='ClickedOnAd')],
      metrics_specs=[
          tfma.MetricsSpec(
              metrics=[
                  tfma.MetricConfig(class_name='ExampleCount'),
                  tfma.MetricConfig(
                      class_name='BinaryAccuracy',
                      threshold=tfma.MetricThreshold(
                          value_threshold=tfma.GenericValueThreshold(
                              lower_bound={'value': eval_accuracy_threshold}),
                          change_threshold=tfma.GenericChangeThreshold(
                              direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                              absolute={'value': -1e-10})))
              ])
      ])
  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)
  components.append(evaluator)

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if the check passed.
  pusher_args = {
      'model':
          trainer.outputs['model'],
      'model_blessing':
          evaluator.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir)),
  }
  if ai_platform_serving_args is not None:
    pusher_args.update({
        'custom_executor_spec':
            executor_spec.ExecutorClassSpec(ai_platform_pusher_executor.Executor),
        'custom_config': {
            ai_platform_pusher_executor.SERVING_ARGS_KEY:
                ai_platform_serving_args
        },
    })
  pusher = Pusher(**pusher_args)  # pylint: disable=unused-variable
  components.append(pusher)

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      metadata_connection_config=metadata_connection_config,
      beam_pipeline_args=beam_pipeline_args,
  )

As you may notice, the code above is similar to what you wrote during the interactive exploration stage; here you simply remove the InteractiveContext and append each component to a components list.

To create the pipeline, you import pipeline from tfx.orchestration and pass in the required parameters: the pipeline name, the root directory where all outputs will be stored, the components list, and the metadata configuration.
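
As an aside, the same create_pipeline function can also be run locally on Beam's direct runner, which is handy for testing before deploying to Kubeflow. A minimal sketch; the local paths and the SQLite metadata file are assumptions, not part of the original project:

# local_runner.py: hypothetical local run of the pipeline on Beam's direct runner.
from tfx.orchestration import metadata
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto import trainer_pb2

import pipeline

BeamDagRunner().run(
    pipeline.create_pipeline(
        pipeline_name='advert_pred_pipeline_local',
        pipeline_root='advert_pipeline_output',
        data_path='data',
        preprocessing_fn='model.advert-transform.preprocessing_fn',
        run_fn='model.advert-trainer.run_fn',
        train_args=trainer_pb2.TrainArgs(num_steps=100),
        eval_args=trainer_pb2.EvalArgs(num_steps=50),
        eval_accuracy_threshold=0.5,
        serving_model_dir='serving_model/advert-pred',
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            'metadata/metadata.db')))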

Next, create a Kubeflow runner script (kubeflow_dag_runner.py) and paste the code below. The script defines a KubeflowDagRunner that runs the pipeline on Kubeflow.

import os
from absl import logging
import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import trainer_pb2
from tfx.utils import telemetry_utils

# Retrieve your GCP project. You can choose which project
# to use by setting GOOGLE_CLOUD_PROJECT environment variable.
try:
  import google.auth
  try:
    _, GOOGLE_CLOUD_PROJECT = google.auth.default()
  except google.auth.exceptions.DefaultCredentialsError:
    GOOGLE_CLOUD_PROJECT = ''
except ImportError:
  GOOGLE_CLOUD_PROJECT = ''
PIPELINE_NAME = 'advert_pred_pipeline'
# Specify your GCS bucket name here. You have to use GCS to store output files
GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default'
PREPROCESSING_FN = 'model.advert-transform.preprocessing_fn'
RUN_FN = 'model.advert-trainer.run_fn'
TRAIN_NUM_STEPS = 1000
EVAL_NUM_STEPS = 500
EVAL_ACCURACY_THRESHOLD = 0.5
# TFX pipeline produces many output files and metadata. All output data will be
# stored under this OUTPUT_DIR.
OUTPUT_DIR = os.path.join('gs://', GCS_BUCKET_NAME)
# TFX produces two types of outputs, files and metadata.
# - Files will be created under PIPELINE_ROOT directory.
PIPELINE_ROOT = os.path.join(OUTPUT_DIR, 'advert_pred_pipeline_output',
                            PIPELINE_NAME)
# The last component of the pipeline, "Pusher" will produce serving model under
# SERVING_MODEL_DIR.
SERVING_MODEL_DIR = os.path.join(PIPELINE_ROOT, 'serving_model')
DATA_PATH = 'gs://{}/advert-pred/data/'.format(GCS_BUCKET_NAME)

def run():
  """Define a kubeflow pipeline."""
  metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
  tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)

  runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
      kubeflow_metadata_config=metadata_config, tfx_image=tfx_image)

  pod_labels = kubeflow_dag_runner.get_default_pod_labels()
  pod_labels.update({telemetry_utils.LABEL_KFP_SDK_ENV: 'advert-pred'})
  kubeflow_dag_runner.KubeflowDagRunner(
      config=runner_config, pod_labels_to_attach=pod_labels
  ).run(
      pipeline.create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          data_path=DATA_PATH,
          preprocessing_fn=PREPROCESSING_FN,
          run_fn=RUN_FN,
          train_args=trainer_pb2.TrainArgs(num_steps=TRAIN_NUM_STEPS),
          eval_args=trainer_pb2.EvalArgs(num_steps=EVAL_NUM_STEPS),
          eval_accuracy_threshold=EVAL_ACCURACY_THRESHOLD,
          serving_model_dir=SERVING_MODEL_DIR,
      ))

if __name__ == '__main__':
  logging.set_verbosity(logging.INFO)
  run()

This script is specific to Kubeflow but otherwise generic, so you can reuse it across your projects. Here, you define variables such as the data path, the storage location for outputs, the pipeline name, and the training and evaluation parameters.

This script also contains an important function called run. The run function defines the configuration for successfully executing a Kubeflow pipeline. It instantiates a pipeline object from the specified parameters and then executes it. 

Now, head over to your notebook instance again. In a new cell just after your interactive exploration, copy the advertising data to your Google Cloud Storage. 

Note:

If your data is already in GCS, ignore this and just specify the path in the pipeline.py file.

A Cloud Storage bucket was created for your pipeline by default when you instantiated Kubeflow. You'll specify this path when copying the dataset.

## copy data to cloud storage for easy access from Kubeflow
!gsutil cp data/advertising.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/advert-pred/data/data.csv

You can navigate to the Cloud Storage browser to confirm that the file has been uploaded. 
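
You can also verify the upload from the notebook itself with a quick listing:

!gsutil ls gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/advert-pred/data/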

Next, set the PIPELINE_NAME variable. This is the same as the one in the kubeflow_dag_runner.py script:

PIPELINE_NAME = 'advert_pred_pipeline'

Next, in a new cell, you'll create the pipeline using the tfx pipeline create command, as shown below. Notice that you specify the Kubeflow runner script, the endpoint of your Kubeflow instance, and the Docker image name.

!tfx pipeline create \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}

Running the command above will take a couple of minutes to complete. This is because TFX uses the Skaffold tool we installed earlier to build a Docker image for the pipeline.

After a successful build, you'll find two new files (Dockerfile and build.yaml). Next, you will submit this pipeline job to Kubeflow using the tfx run command, as shown below:

!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
kubeflow pipeline final

Your Kubeflow pipeline has been launched successfully! To monitor this, head over to your Kubeflow Instance page (where you copied the ENDPOINT URL from), click on Experiments, and then select your pipeline name.

You should see a graph of your pipeline as it executes. If there is any error, the failing node displays a red icon and the pipeline stops; otherwise, nodes turn green as each component completes.
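
You can also check on runs from the notebook with the TFX CLI; a quick sketch, assuming the run list sub-command is available in this TFX version:

!tfx run list --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}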

adclicked pipeline

You can also check the visualization of each output in Kubeflow. This is automatically generated from the outputs of each component. 

For instance, below we check the output of StatisticsGen: click on the statisticsgen node and select Visualization.

statistic

To see the generated output as well as the saved model, you can navigate to your GCS bucket. 

GCS bucket

If you make an update to your pipeline, you can easily update and rerun it using the commands below:

!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

And that's it! You have successfully orchestrated an end-to-end ML pipeline using TFX and Kubeflow. You can build your entire ML workflow easily and effectively with the combination of the tools used in this tutorial.

Summary

In this tutorial, you learned how to build ML pipeline components with TFX, how to create a Notebook instance on the Cloud AI Platform, how to run TFX components interactively, and finally how to orchestrate your pipeline with Kubeflow.

This is important knowledge that you can start applying to your next company or personal project.

Hope it helps you. I can't wait to see what you build!

The complete source code for this tutorial can be found here.
