Building high performance data pipelines to train machine learning models
Deloitte on Cloud Blog
Considering building a data ingestion and preprocessing pipeline to train a machine learning model?
Did you know that there are specific design considerations that we need to think about when we are building a data pipeline to train a Machine Learning model? Supervised machine learning (ML) models need to be trained with labeled datasets before the models can be used for inference.
If we are building a fraud detection model, we must start with a dataset in which each transaction is labeled with an attribute that tells us whether it is fraudulent. Once the model has trained on the dataset and learned the patterns, it can look at a new transaction and infer whether that transaction is fraudulent.
In most cases, the dataset needs to be read from a file and preprocessed before being fed into the training process. If we are dealing with small datasets, the ingestion (such as opening the file, and reading the content) and preprocessing do not require any special consideration. But if the dataset is large, we need to think carefully about how best to build an optimal data pipeline.
In this article, we will discuss concepts and considerations for building such data ingestion and preprocessing pipelines with TensorFlow, a popular open-source ML software library.
Wait, I already have a data pipeline in my Cloud Data Lake!
The architecture of a modern data lake on the cloud has been standardized over the past few years. Data in the lake is usually stored in a columnar, splittable, compressed file format such as Parquet. Parquet is a very popular file format, especially if Apache Spark is used as the distributed data processing framework.
In a typical data lake, we create a “raw” data layer that ingests batch data feeds or streaming data as-is. We process the data residing in the “raw” zone by cleansing, standardizing, and possibly deduplicating it, and ultimately place it in a “curated” zone. We typically use a distributed data processing framework such as Apache Spark, made available through Amazon EMR clusters or Google Cloud Dataproc, to perform the data processing task. We generally store the data in the “curated” layer in compressed, splittable file formats (like Parquet) so that subsequent processing and querying of the curated data perform better.
Once curated data is stored in a data lake, one can process it to serve multiple use cases, such as populating a datastore like BigQuery or Redshift or even training an ML model. The specific downstream use case that we want to discuss is the data ingestion and preprocessing needed to train an ML model in TensorFlow.
TensorFlow Records and tf.data
If one of those downstream processing use cases is training a deep learning model on very large datasets, the above data lake setup may be inefficient. If we run into performance problems when training our ML models with TensorFlow, we should consider building our pipeline using TensorFlow Records and tf.data, the data module of the TensorFlow API library.
TensorFlow Records, or TFRecords for short, is the recommended binary file format for high-throughput data ingestion when training ML models with TensorFlow.
Writing and Reading TFRecords
How do we create TensorFlow records? TensorFlow has a module named “io” that provides a TFRecord writer class.
tf.io.TFRecordWriter(path, options=None)
Data needs to be serialized before it is written to a TFRecord file.
Data serialization is the process of taking a data structure and converting it into a byte stream that can be stored on disk, held in memory, or transmitted over the wire for processing or remote storage.
The serialization is done in the tf.train.Example message format, which is a protocol buffer (protobuf for short). From Google’s documentation:
Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data–think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
We take each data element, create a tf.train.Example message, serialize it, and write it into a TFRecord file. When we need to ingest TFRecord files, we read the file(s) and deserialize the content back to a regular data structure.
Deserialization is the reverse process of reconstructing the data structure from the serialized byte strings.
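The write and read steps described above can be sketched as follows. This is a minimal illustration, not a definitive implementation: the fraud-detection rows, the feature names "amount" and "is_fraud", the helper to_example, and the temporary file path are all assumptions made up for the example.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical fraud-detection rows: (transaction amount, is_fraud label).
rows = [(12.50, 0), (9800.00, 1), (43.10, 0)]


def to_example(amount, is_fraud):
    """Wrap one row in a tf.train.Example protocol buffer message."""
    feature = {
        "amount": tf.train.Feature(float_list=tf.train.FloatList(value=[amount])),
        "is_fraud": tf.train.Feature(int64_list=tf.train.Int64List(value=[is_fraud])),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


# Serialize each message to bytes and append it to a TFRecord file.
path = os.path.join(tempfile.mkdtemp(), "transactions.tfrecords")
with tf.io.TFRecordWriter(path) as writer:
    for amount, is_fraud in rows:
        writer.write(to_example(amount, is_fraud).SerializeToString())

# Deserialize: describe the expected features, then parse each record.
feature_spec = {
    "amount": tf.io.FixedLenFeature([], tf.float32),
    "is_fraud": tf.io.FixedLenFeature([], tf.int64),
}
decoded = [
    tf.io.parse_single_example(record, feature_spec)
    for record in tf.data.TFRecordDataset(path)
]
labels = [int(d["is_fraud"]) for d in decoded]
```

The feature specification passed to the parser has to match what was written, because the file itself stores only serialized bytes and carries no schema.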
The “tf.data” module in the TensorFlow API library is designed for creating optimal data input pipelines. The very first step in using “tf.data” is to create a dataset object. If we have two TFRecord files, tf1.tfrecords and tf2.tfrecords, in our Cloud storage bucket, we can pass their names as a list to create a dataset out of them:
mydataset = tf.data.TFRecordDataset(["tf1.tfrecords", "tf2.tfrecords"])
Once we have our dataset, we can apply various data transformations to preprocess it and eventually feed the data into the training pipeline. The dataset is processed as a stream, so it does not need to fit in memory before processing can start.
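As a rough sketch of creating a dataset from two files and transforming it, the example below first writes two tiny TFRecord files to a local temporary directory, which stands in for the Cloud storage bucket. The feature name "x", the parse helper, and the doubling transformation are assumptions chosen only to keep the example small.

```python
import os
import tempfile

import tensorflow as tf

# Write two tiny TFRecord files so the sketch is self-contained.
# Local temp files stand in for the Cloud storage bucket.
tmp_dir = tempfile.mkdtemp()
paths = [os.path.join(tmp_dir, name) for name in ("tf1.tfrecords", "tf2.tfrecords")]
for file_index, path in enumerate(paths):
    with tf.io.TFRecordWriter(path) as writer:
        for i in range(3):
            value = file_index * 3 + i  # values 0..2 in file 1, 3..5 in file 2
            example = tf.train.Example(features=tf.train.Features(feature={
                "x": tf.train.Feature(int64_list=tf.train.Int64List(value=[value])),
            }))
            writer.write(example.SerializeToString())

# Multiple files are passed as a list; records are read file after file.
mydataset = tf.data.TFRecordDataset(paths)


def parse(record):
    """Deserialize one record and return its integer feature."""
    parsed = tf.io.parse_single_example(
        record, {"x": tf.io.FixedLenFeature([], tf.int64)})
    return parsed["x"]


# Transformations are lazy: nothing is read until we iterate.
doubled = mydataset.map(parse).map(lambda x: x * 2)
values = [int(v) for v in doubled]
```

Because the transformations are only applied as elements are requested, the same code shape works for files far larger than memory.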
Optimizing the pipeline
TensorFlow Records are not a requirement for using TensorFlow. We should consider them when we observe or anticipate performance bottlenecks in our data ingestion pipeline. So what are the possible bottlenecks in a scenario where we are loading a large dataset from files, preprocessing the data row by row, and feeding the rows into the model for training?
- When the training dataset is large, we typically use CPUs for data loading and preprocessing and accelerated hardware (GPU, TPU) for training. If we perform the open->read->preprocess->train sequence synchronously, the GPUs sit idle while data is being loaded and the CPUs sit idle while the model is training.
- If the data is in remote storage, the “time-to-first-byte” (TTFB) when reading a file may be several orders of magnitude longer than for local storage.
- If we are loading files in a sequential manner, we may not be optimizing for the throughput available to us.
How can we optimize our data pipeline?
Our starting point is a set of large TFRecord files sitting on Cloud storage. Ideally, we would like to load multiple such files independent of one another and have the ability to parallelize each individual load to maximize our read throughput. If we go back to the “dataset” object, it does provide a method named “interleave” to do just that.
interleave(map_func, cycle_length=AUTOTUNE, block_length=1, num_parallel_calls=None, ...)
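The cycle_length parameter determines the number of files that are processed concurrently, the block_length parameter sets how many consecutive records are taken from one file before moving on to the next, and the num_parallel_calls parameter sets the number of threads used to fetch records from those files in parallel.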
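A minimal sketch of interleaved reading is shown below. The four shard files, their names, and their "shard:index" contents are hypothetical and written locally only to make the example runnable; the fixed cycle_length of 2 is an arbitrary choice for illustration.

```python
import os
import tempfile

import tensorflow as tf

# Write four small TFRecord files (hypothetical shards), three records each.
tmp_dir = tempfile.mkdtemp()
paths = []
for shard in range(4):
    path = os.path.join(tmp_dir, f"shard-{shard}.tfrecords")
    with tf.io.TFRecordWriter(path) as writer:
        for i in range(3):
            writer.write(f"{shard}:{i}".encode("utf-8"))
    paths.append(path)

# Start from a dataset of file names, then interleave the per-file readers.
files = tf.data.Dataset.from_tensor_slices(paths)
records = files.interleave(
    tf.data.TFRecordDataset,   # map_func: file name -> dataset of records
    cycle_length=2,            # read from two files at a time
    block_length=1,            # take one record from each file per turn
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

result = [r.numpy().decode("utf-8") for r in records]
```

With these settings the output alternates between the two files currently in the cycle, and a new file joins the cycle as soon as one is exhausted.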
The next idea is to batch the dataset and, while the model is training on step n, fetch (open, read) the batch for step n+1 in the background. The dataset object provides the “batch” and “prefetch” methods for this.
The prefetch step reduces latency (the GPU spends less time idle between training steps) and improves throughput, so the total time to ingest data and train a model goes down. The price we pay is the additional memory needed to buffer the prefetched data.
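Batching and prefetching can be sketched in a few lines. The ten-integer dataset below is a stand-in for real training data, and the batch size of 4 is an arbitrary assumption.

```python
import tensorflow as tf

# A stand-in dataset of 10 integers; a real pipeline would read TFRecords.
dataset = tf.data.Dataset.range(10)

pipeline = (
    dataset
    .batch(4)  # group elements into batches of 4 (the last batch is smaller)
    # Prepare upcoming batches in the background while the current one is
    # consumed; AUTOTUNE lets the runtime pick the buffer size.
    .prefetch(tf.data.experimental.AUTOTUNE)
)

batches = [batch.numpy().tolist() for batch in pipeline]
```

Placing prefetch last in the chain is a common choice, since it lets everything upstream of it overlap with the consumer of the batches.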
Now it is time to optimize the preprocessing step. Data preprocessing is necessary in most cases before we can use the dataset for training.
If we are using a set of images to train a Convolutional Neural Network, we may need to do heavy processing of each image (scaling, normalizing, reducing noise, and reducing dimensionality, among many other possibilities).
We use the map method of the dataset object to perform these preprocessing tasks.
map(map_func, num_parallel_calls=None, deterministic=None)
These preprocessing tasks are performed on an element by element basis and consequently are independent of one another. We can use the number of parallel calls parameter of the map function to perform the preprocessing in parallel taking advantage of the CPU cores available.
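The sketch below applies a simple preprocessing function in parallel. The synthetic random images, the preprocess function, the 16x16 target size, and the choice of four parallel calls are all assumptions for illustration, not a recommended configuration.

```python
import tensorflow as tf

# Eight synthetic 32x32 RGB "images" with pixel values in [0, 255].
images = tf.random.uniform([8, 32, 32, 3], maxval=256, dtype=tf.int32)
dataset = tf.data.Dataset.from_tensor_slices(tf.cast(images, tf.uint8))


def preprocess(image):
    """Resize to 16x16 and scale pixel values to the range [0, 1]."""
    image = tf.image.resize(image, [16, 16])  # returns float32
    return image / 255.0


# Each image is independent, so the calls can run in parallel on CPU cores.
processed = dataset.map(preprocess, num_parallel_calls=4)

shapes = [tuple(img.shape) for img in processed]
```

The output order is unchanged by default; only the work of producing each element is spread across threads.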
Automatic Tuning of the parameters
How do we decide on the right values for these tuning parameters to optimize our pipeline? How many parallel calls, how long a cycle length? The brute-force method is to try various settings manually and converge on good numbers by trial and error. TensorFlow offers a better solution.
The “tf.data” runtime will tune these parameters dynamically if we set any or all of them to “tf.data.experimental.AUTOTUNE” (also available as “tf.data.AUTOTUNE” in TensorFlow 2.4 and later).
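Putting the pieces together, a pipeline that leaves every tuning parameter to the runtime might look like the sketch below. The read_file helper is hypothetical and generates integers in place of reading real TFRecord files, and the doubling map stands in for real preprocessing.

```python
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

# Stand-in for a list of files: "file" i yields the values 10*i .. 10*i+4.
file_ids = tf.data.Dataset.range(3)


def read_file(i):
    """Hypothetical per-file reader; a real one would return a TFRecordDataset."""
    return tf.data.Dataset.from_tensor_slices(i * 10 + tf.range(5, dtype=tf.int64))


pipeline = (
    file_ids
    .interleave(read_file, cycle_length=AUTOTUNE, num_parallel_calls=AUTOTUNE)
    .map(lambda x: x * 2, num_parallel_calls=AUTOTUNE)
    .batch(5)
    .prefetch(AUTOTUNE)
)

# The element order depends on the cycle length the runtime picks,
# so we sort before inspecting the values.
values = sorted(int(v) for batch in pipeline for v in batch)
```

Autotuned values depend on the machine, so the order in which interleaved elements arrive can differ between environments even though the set of elements is the same.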
Can Spark Deal with TensorFlow Records?
If we choose TensorFlow as our ML software library and want to build an optimal data pipeline using TFRecords and the “tf.data” module, then, going back to our data lake and our favorite data processing tool, Apache Spark, a natural question to ask is: “How do we generate TFRecords using Apache Spark?”
There are two open-source libraries, Spark-TensorFlow-Connector and Spark-TFRecord, that allow us to save Apache Spark dataframes and datasets as TFRecords and to read TFRecords into Spark dataframes. Spark-TFRecord is based on Spark-TensorFlow-Connector and has the additional capability of partitioning datasets by specific attributes, which improves performance through partition elimination. Curated datasets intended for a TensorFlow-based ML platform can be converted to TFRecords this way when ingestion performance calls for it.
Looking ahead: Distributed Training
For a large training dataset, optimizing the data pipeline is not enough to reduce the time to train a model. We must also consider how to distribute the training process across multiple processors and nodes. We will discuss distributed training with TensorFlow in our next article. Stay tuned!