
Training Models on Streaming Data [Practical Guide]

6 min
26th July, 2023

What comes to mind when you hear "streaming data"? Perhaps data generated by video streaming platforms like YouTube, but that is not the only thing that qualifies as streaming data. Many platforms and sources generate this kind of data.

In this article:

  • We will go through the basics of streaming data, what it is, and how it differs from traditional data.
  • We will also get familiar with tools that can help record this data and further analyze it.
  • And we will discuss its importance and how we can use machine learning for streaming data analysis with the help of a hands-on example.

What is streaming data?

“Streaming data is a continuous flow of information and a foundation of event-driven architecture software model” – Red Hat

Enterprises around the world depend on data more than ever. Some industries rely not only on traditional data but also on data from sources such as security logs, IoT sensors, and web applications to provide the best customer experience. For example, before video streaming services existed, users had to wait for video or audio files to download. These days, when you are listening to a song or watching a video with auto-play on, the platform builds a playlist for you based on your real-time streaming data.

Batch processing vs streaming processing

“With the complexity of today’s modern requirements, legacy data processing methods have become obsolete for most use cases, as they can only process data as groups of transactions collected over time. Modern organisations need to act on up-to-the-millisecond data, before the data becomes stale. This continuous data offers numerous advantages that are transforming the way businesses run.” – Upsolver

While developing an application or a system, it is important to understand how long your applications or users can wait for data to become available. This is where you have to choose between batch and streaming data processing. In this article, our focus is on streaming data, but before we deal with it, it is important to understand how it differs from batch data processing. This will also help us appreciate the importance of streaming data.

 
|                     | Batch Processing                                           | Streaming Processing                        |
|---------------------|------------------------------------------------------------|---------------------------------------------|
| Scope               | Used to run arbitrary queries over different datasets      | Most suited for event-driven systems        |
| Latency             | Minutes to hours                                           | Seconds or milliseconds                     |
| Processing          | Processes a large volume of data all at once               | Processes data in real time                 |
| Data in-flow & size | Input flow is static and usually of a finite size          | Input flow is dynamic and unknown in size   |
| Analysis            | Used for complex analytics                                 | Used for simple and rolling metrics         |
| Response time       | Response is received only after the batch job is completed | Response is received as soon as data arrives |
| Source              | Databases, files                                           | IoT, Event Hubs                             |
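The contrast between the two columns can be illustrated with a small sketch. This is not part of the original walkthrough; the function names and the sample readings are made up for illustration. The batch version needs the whole finite dataset before it can answer, while the streaming version emits an updated result as each value arrives and keeps no history.

```python
from typing import Iterable, Iterator, List


def batch_mean(values: List[float]) -> float:
    """Batch style: the whole finite dataset must be present before we answer."""
    return sum(values) / len(values)


def streaming_mean(stream: Iterable[float]) -> Iterator[float]:
    """Streaming style: emit an updated mean as soon as each value arrives."""
    count = 0
    mean = 0.0
    for value in stream:
        count += 1
        # Incremental update: earlier values do not need to stay in memory.
        mean += (value - mean) / count
        yield mean


# Hypothetical sensor readings used only for this illustration.
readings = [10.0, 20.0, 30.0, 40.0]

batch_result = batch_mean(readings)
running_results = list(streaming_mean(iter(readings)))

print("Batch mean:", batch_result)
print("Running means:", running_results)
```

The last running mean equals the batch mean, but the streaming version had a usable answer after every single reading.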


Streaming data processing architecture

“Traditionally, applications that needed real-time responses to events relied on databases and message processing systems. Such systems cannot keep up with the torrent of data produced today.” – Red Hat

[Figure: Basic I/O flow in streaming data processing]

The stream processing engine does not just move data from one place to another; it transforms the data as it passes through. This pipeline provides a smooth, automated flow of information and helps prevent problems that enterprises commonly face, such as data corruption, conflicts, and duplicate entries. A streaming data pipeline extends this idea to handle millions of events in real time at scale, so a large amount of information can be collected, analysed, and stored, and applications, analytics, and reporting can run in real time.
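As a rough sketch of "transforming data as it passes through", the stages of such a pipeline can be modelled as chained Python generators. Everything here is an assumption made for illustration: the event fields `id` and `temp_f`, the stage names, and the sample events do not come from any particular streaming engine.

```python
from typing import Dict, Iterable, Iterator

Event = Dict[str, object]


def deduplicate(events: Iterable[Event]) -> Iterator[Event]:
    """Drop events whose (hypothetical) 'id' field was already seen."""
    seen = set()  # grows without bound; a real pipeline would expire old ids
    for event in events:
        if event["id"] in seen:
            continue
        seen.add(event["id"])
        yield event


def to_celsius(events: Iterable[Event]) -> Iterator[Event]:
    """Transform each event in flight by adding a Celsius reading."""
    for event in events:
        celsius = (float(event["temp_f"]) - 32.0) * 5.0 / 9.0
        yield {**event, "temp_c": round(celsius, 1)}


# Made-up events; the third one is a duplicate delivery of event 1.
raw_events = [
    {"id": 1, "temp_f": 212.0},
    {"id": 2, "temp_f": 32.0},
    {"id": 1, "temp_f": 212.0},
]

# Each event flows through both stages one at a time; nothing is buffered.
processed = list(to_celsius(deduplicate(raw_events)))
print(processed)
```

Because generators are lazy, each event is deduplicated and transformed as soon as it is pulled from the source, which mirrors how a stream processor handles records individually instead of waiting for a full batch.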

The machine learning model is part of the Stream processing engine, and it provides the logic that helps the streaming data pipeline expose features within the stream and potentially within a historical data store.

A number of tools can help with streaming data collection and processing. Some popular ones include:

  • Apache Kafka: An open-source, distributed event streaming platform that can handle millions of events per second. It can be used to collect, store, and process streaming data in real-time.
  • Apache Flink: An open-source, distributed stream processing framework that can handle both batch and streaming data. It can be used to perform complex data processing tasks such as windowed aggregations, joins, and event-time processing.
  • Apache Spark: An open-source, distributed computing system that can handle big data processing tasks. It can be used to process both batch and streaming data and has built-in support for machine learning and graph processing.
  • Apache NiFi: An open-source tool that can be used to automate the collection, processing, and distribution of data. It provides a web-based interface for building data pipelines and can be used to process both batch and streaming data.
  • Azure Stream Analytics: A cloud-based service that can be used to process streaming data in real-time. It provides a variety of features, such as data ingestion, data transformation, and real-time processing.

These are just a few examples of the many tools available for streaming data collection and processing. The choice of tool will depend on the specific requirements of the application, such as the volume and velocity of the data, the complexity of the data processing, and the scalability and fault-tolerance needs.

Machine learning for streaming data: hands-on guide

Now that we have a fair understanding of what streaming data is, where it is used, and how it differs from batch data processing, let's get our hands dirty and learn how to set up stream processing with a few lines of code.

In this exercise, we will use TensorFlow, Keras, Scikit-learn, and Pandas to pre-process data and create machine learning models. To set up a streaming (continuous) flow of data, we will use Kafka and ZooKeeper.

First, let's install the necessary libraries:

!pip install tensorflow==2.7.1
!pip install tensorflow_io==0.23.1
!pip install kafka-python

Import all the functions and respective libraries:

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
# SKLearn libraries
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

We are using Kafka for stream processing because Kafka Streams provides true record-at-a-time processing. Kafka is a message broker from which messages (data) can be consumed easily. Tools like Spark can also read messages from Kafka and then break them into mini-batches for further processing. Which tool to choose depends on the use case. For this exercise, we are using Kafka because it is one of the most popular tools and the Python Kafka libraries are easy to use and understand.

Let's install and set up Kafka locally so we can easily simulate a streaming data environment:

!curl -sSOL https://downloads.apache.org/kafka/3.3.2/kafka_2.13-3.3.2.tgz
!tar -xzf kafka_2.13-3.3.2.tgz

!./kafka_2.13-3.3.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.3.2/config/zookeeper.properties
!./kafka_2.13-3.3.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.3.2/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

Create Kafka topics – for training and testing data:

!./kafka_2.13-3.3.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic cancer-train
!./kafka_2.13-3.3.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic cancer-test
Created topic cancer-train. 
Created topic cancer-test.

For this exercise, we will use a breast cancer dataset and feed it to the Kafka topics in the next few steps. The dataset itself is a batch dataset, but by storing it in Kafka, we simulate an environment that offers continuous data retrieval for training and inference.

cancer_df = pd.read_csv('breast-cancer-wisconsin.data.csv')
cancer_df.head()
[Figure: Dataset]

Replace the values in the 'Class' column with 0 and 1 (2, benign, becomes 0; 4, malignant, becomes 1):

cancer_df['Class'] = cancer_df['Class'].replace(2,0)
cancer_df['Class'] = cancer_df['Class'].replace(4,1)

Create train and test subsets: 

train_df, test_df = train_test_split(cancer_df,
                                     test_size=0.4,
                                     shuffle=True)

print("Number of training samples: ", len(train_df))
print("Number of testing samples: ", len(test_df))

x_train_df = train_df.drop(["Class"], axis=1)
y_train_df = train_df["Class"]

x_test_df = test_df.drop(["Class"], axis=1)
y_test_df = test_df["Class"]
Number of training samples: 419 
Number of testing samples: 280

The class labels are set as the keys of the Kafka messages, which are stored across multiple partitions. This enables efficient data retrieval using consumer groups.

# Convert each row to a CSV string, drop the header line and any empty trailing line
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
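To see why keyed messages matter for partitioned topics, here is a minimal sketch of how a key can be mapped to a partition. It is an illustration only: `choose_partition` is a hypothetical helper, `zlib.crc32` stands in for the hash (Kafka's default partitioner uses a different hash function, murmur2), and the two sample rows are made up.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition with a stable hash.

    Assumption: crc32 is used purely for illustration; it is not the hash
    that Kafka's default partitioner applies to keys.
    """
    return zlib.crc32(key) % num_partitions


# Made-up (message, key) pairs shaped like the CSV rows and labels above.
messages = [
    ("5,1,1,1,2,1,3,1,1", "0"),
    ("8,10,10,8,7,10,9,7,1", "1"),
]

# Every message with the same key is routed to the same partition.
assignments = {
    key: choose_partition(key.encode("utf-8"), num_partitions=2)
    for _, key in messages
}
print(assignments)
```

The useful property is determinism: a given key always lands in the same partition, so consumers in a group can divide the partitions among themselves and still see all messages for a key in order.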

Time to push the data to the Kafka topics we created earlier. 

def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))


def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        print(message.encode('utf-8'))
        # The class label is the message key; the CSV feature row is the value
        producer.send(topic_name,
                      key=key.encode('utf-8'),
                      value=message.encode('utf-8')).add_errback(error_callback)
        count += 1
    # Block until all buffered messages have been sent
    producer.flush()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("cancer-train", zip(x_train, y_train))
write_to_kafka("cancer-test", zip(x_test, y_test))
Wrote 419 messages into topic: cancer-train
Wrote 280 messages into topic: cancer-test

To read the data from a Kafka topic, we will need to decode the data and create a dataset that can be utilised for model training.

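# Number of feature columns per message; must match the model's Input(shape=(9,))
NUM_COLUMNS = len(x_train_df.columns)

def decode_kafka_item(item):
    # Parse the CSV-encoded feature row into NUM_COLUMNS float tensors
    message = tf.io.decode_csv(item.message,
                               [[0.0] for i in range(NUM_COLUMNS)])
    # The message key carries the class label as a string
    key = tf.strings.to_number(item.key)
    return (message, key)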

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64


train_ds = tfio.IODataset.from_kafka('cancer-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
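The decode-then-batch steps above can be mirrored in plain Python to make the data flow easier to follow. This is a simplified sketch, not how TensorFlow I/O implements it: `decode_item` and `batched` are hypothetical helpers, and the records are made up.

```python
from typing import Iterable, Iterator, List, Tuple

Example = Tuple[List[float], float]


def decode_item(message: bytes, key: bytes) -> Example:
    """Turn one raw Kafka-style record into (features, label)."""
    features = [float(field) for field in message.decode("utf-8").split(",")]
    label = float(key.decode("utf-8"))
    return features, label


def batched(examples: Iterable[Example], batch_size: int) -> Iterator[List[Example]]:
    """Group decoded examples into lists of at most batch_size items."""
    batch: List[Example] = []
    for example in examples:
        batch.append(example)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


# Made-up records: value is a CSV feature row, key is the class label.
records = [
    (b"5,1,1", b"0"),
    (b"8,10,10", b"1"),
    (b"1,1,2", b"0"),
    (b"7,9,8", b"1"),
    (b"2,1,1", b"0"),
]

decoded = (decode_item(value, key) for value, key in records)
batches = list(batched(decoded, batch_size=2))
print([len(b) for b in batches])
print(batches[0])
```

As in the `tf.data` version, decoding happens per record and batching happens afterwards, so the last batch may be smaller than the requested size.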

Let's prepare for model creation by setting the optimizer, loss, and metrics:

OPTIMIZER = "adam"
# The model's final layer uses a sigmoid activation, so it outputs probabilities, not logits
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS = ['accuracy']
EPOCHS = 10

Design and Build the model:

model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(9,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

model.summary()
[Figure: Model summary for the sequential model]

Now, we will compile the model:

model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

It’s time to use the Kafka topics to train the model. Online (or incremental) learning differs from the traditional way of training models, where you provide a fixed batch of data and let the model train on it. With streaming data, the model should instead keep updating its parameters (weights) incrementally as new data points arrive in the pipeline. In online learning, data points may no longer be available once they have been used for training (that is, once the messages have been read).

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["cancer-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000,  # in milliseconds; set to -1 to block indefinitely
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)
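The role of a stream timeout can be sketched with the standard library alone. This is only an analogy for the `stream_timeout` setting above, not a description of how `KafkaBatchIODataset` works internally: `poll_batches` is a hypothetical helper, and an in-memory `queue.Queue` stands in for the Kafka topic.

```python
import queue
from typing import Iterator, List


def poll_batches(source: "queue.Queue[str]",
                 max_batch: int,
                 timeout_s: float) -> Iterator[List[str]]:
    """Yield mini-batches until no new message arrives within timeout_s."""
    while True:
        batch: List[str] = []
        try:
            while len(batch) < max_batch:
                # Wait up to timeout_s for the next message.
                batch.append(source.get(timeout=timeout_s))
        except queue.Empty:
            # The stream went quiet: emit what we have and stop consuming.
            if batch:
                yield batch
            return
        yield batch


# Stand-in for a topic that already holds five messages.
topic: "queue.Queue[str]" = queue.Queue()
for i in range(5):
    topic.put("message-{0}".format(i))

mini_batches = list(poll_batches(topic, max_batch=2, timeout_s=0.05))
print(mini_batches)
```

A finite timeout lets a training loop end cleanly once the stream is idle, whereas waiting without a timeout keeps the consumer alive for data that may arrive later.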

We will incrementally train our model, which can also be saved in a periodic fashion, and later, we can utilise it to infer on the test data.

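def decode_kafka_online_item(raw_message, raw_key):
    # Parse the CSV feature row and convert the label key to a number
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)

# Each mini_ds holds the messages fetched from the topic in one poll
for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32)
    mini_ds = mini_ds.map(decode_kafka_online_item)
    mini_ds = mini_ds.batch(32)
    if len(mini_ds) > 0:
        # Continue training the same model on the newly arrived data
        model.fit(mini_ds, epochs=3)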

This is how we can keep data flowing in and keep training our model. To learn more about the TensorFlow I/O streaming API, see its documentation.
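The idea behind incremental training can also be shown without TensorFlow. The sketch below is a swapped-in technique, not the Keras model from this exercise: a tiny logistic regression whose weights are updated by one stochastic gradient step per arriving example. The class name, learning rate, and toy stream are all assumptions made for illustration.

```python
import math
from typing import List, Sequence


class OnlineLogisticRegression:
    """Logistic regression updated one example at a time with SGD."""

    def __init__(self, num_features: int, learning_rate: float = 0.5) -> None:
        self.weights: List[float] = [0.0] * num_features
        self.bias = 0.0
        self.learning_rate = learning_rate

    def predict_proba(self, x: Sequence[float]) -> float:
        z = self.bias + sum(w * xi for w, xi in zip(self.weights, x))
        return 1.0 / (1.0 + math.exp(-z))

    def learn_one(self, x: Sequence[float], y: float) -> None:
        # Gradient of the log loss for a single example is (prediction - label) * x.
        error = self.predict_proba(x) - y
        self.weights = [
            w - self.learning_rate * error * xi
            for w, xi in zip(self.weights, x)
        ]
        self.bias -= self.learning_rate * error


# Toy stream: one feature, label 1 when the feature is large.
stream = [([0.1], 0.0), ([0.9], 1.0), ([0.2], 0.0), ([0.8], 1.0)] * 200

model_sketch = OnlineLogisticRegression(num_features=1)
for features, label in stream:
    # Each example is used once, as it arrives, and is not stored.
    model_sketch.learn_one(features, label)

low = model_sketch.predict_proba([0.1])
high = model_sketch.predict_proba([0.9])
print("P(class 1 | 0.1):", low)
print("P(class 1 | 0.9):", high)
```

The model never sees the full dataset at once; its parameters simply drift toward a good solution as examples stream past, which is the behaviour the Kafka training loop above relies on.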

Importance and implications of streaming data

How data should be collected and processed depends on the use case. For many enterprise workloads, batch processing alone is no longer sufficient. Real-time data streams are now common, from fraud detection and stock market platforms to ride-share apps and e-commerce websites. Although streaming raises some concerns regarding privacy and security, the benefits often outweigh them.

Importance

As streaming data becomes more prevalent, applications can process, filter, analyse, and react to it in real time, as it is received. As a result, a variety of new opportunities become available, such as real-time fraud detection, Netflix recommendations, and seamless shopping across multiple devices. Industries that deal with big data benefit from continuous, real-time data.

  • Real-time decision making: Streaming data allows organisations to process and analyse data in real-time, which can be used to make quick and informed decisions. This is particularly useful for industries such as finance, healthcare, and transportation, where time is of the essence.
  • Improved customer experience: Streaming data can be used to monitor and analyse customer interactions in real-time, which can be used to improve customer service and provide personalised recommendations.
  • Predictive analytics: Streaming data can be used to train machine learning models in real-time, which can be used for predictive analytics and forecasting.
  • Operational efficiency: Streaming data can be used to monitor and analyse the performance of industrial equipment, which can be used to improve operational efficiency and reduce downtime.
  • Fraud detection: Streaming data can be used to detect and prevent fraudulent activities in real-time, which can help organisations to minimise financial losses.
  • Internet of Things: Streaming data is important for IoT device communication and data collection. It allows devices to send and receive data in real time and supports more accurate and efficient decision making.
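As one small illustration of the real-time use cases in the list above, a rolling-window check can flag unusual values as they arrive. This is a deliberately simple sketch, not a production fraud detector: `flag_outliers`, the window size, the threshold, and the transaction amounts are all made up for the example.

```python
from collections import deque
from statistics import mean, pstdev
from typing import Iterable, Iterator, Tuple


def flag_outliers(amounts: Iterable[float],
                  window_size: int = 5,
                  threshold: float = 3.0) -> Iterator[Tuple[float, bool]]:
    """Flag values far from the mean of the previous window_size values."""
    window: deque = deque(maxlen=window_size)
    for amount in amounts:
        is_outlier = False
        if len(window) == window_size:
            mu = mean(window)
            sigma = pstdev(window)
            # Only judge a value once the window is full and has some spread.
            is_outlier = sigma > 0 and abs(amount - mu) > threshold * sigma
        yield amount, is_outlier
        # Note: flagged values still enter the window in this simple version.
        window.append(amount)


# Made-up transaction amounts with one obvious spike.
transactions = [20.0, 22.0, 19.0, 21.0, 20.0, 500.0, 21.0]

flagged = [amount for amount, outlier in flag_outliers(transactions) if outlier]
print(flagged)
```

Each decision uses only the last few values, so the check can run on an unbounded stream with constant memory.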

Implications

Streaming data can have a variety of implications depending on the context in which it is being used. 

  • Real-time processing: Streaming data allows for real-time processing and analysis, which can be useful for a variety of applications, such as monitoring and control systems, financial transactions, and online customer interactions.
  • Scalability: Streaming data systems are designed to handle large amounts of data, making them well-suited for big data applications such as social media analytics and IoT data processing.
  • Latency: Streaming data systems often have low latency, which means that the time between data being generated and being processed is short. This can be important for applications that require quick response times, such as financial trading or autonomous vehicles.
  • Complexity: Streaming data systems can be complex to design, implement, and maintain, particularly when working with large volumes of data, multiple sources, and real-time requirements.
  • Security: Streaming data can also introduce security risks, since it increases the attack surface and the amount of data that is exposed, so it is important to have a strong security infrastructure in place.
  • Privacy: Streaming data systems may also raise privacy concerns, as they often collect and process large amounts of personal information. It is important to ensure that data is collected and used in compliance with relevant laws and regulations, and that appropriate measures are taken to protect user privacy.

The ability to process and analyse data in real time can give organisations a significant competitive advantage, improve customer satisfaction, and support more informed decisions.


Conclusion

A streaming data processing architecture can remove much of the need to run separate, scalable data engineering functions. It is also flexible and can be adapted to many use cases. As streaming data becomes more popular, we need to build ML-based systems that can use this real-time data and contribute to more sophisticated data analysis. In this article, we learned about streaming data and how it can be processed. We also saw how it differs from batch data processing.

We also got familiar with some of the tools that can help with streaming data collection, and in the hands-on exercise we used one of them – Kafka. We saw how Kafka topics can be set up and how data can be fed into them. Once the data is available on a Kafka topic, we can decode it and use it to train our machine learning models incrementally.

For future work, rather than using a CSV file, we could use the Twitter API and create a machine learning model for sentiment analysis.

Happy Learning!

