This article is part of a series describing Production ML – Production ML Series of Articles.
Data Transformation
In this article, we will talk about exciting things at the core of Production ML.
Data is often compared with the blood of Machine Learning. Data Transformation is an integral and vital part of any Production ML system. Every time we train a model or ask a model to make predictions, we use data. And not just any data: we need it in specific formats, and we scale, normalize, and clean it. Model accuracy depends directly on the quality of the data we feed into the model.
The transformation required to prepare data usually depends on the type of data and the model we’re using. Today, we will talk about a DNN (Dense Neural Network) trained on a structured dataset. For this kind of data, the transformation pipeline can include:
- Data ingestion and preliminary preparation (removing commas/spaces, concatenating/splitting, etc.).
- Encoding string categories to their number representation (simple encoding, one-hot encoding, bucketizing, etc.).
- Scaling and normalizing (scaling to zero mean and unit variance, reducing skewness, etc.).
- Feature Optimization (combining, eliminating features).
- Creating, updating, saving, and retrieving metadata about the transformation process.
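To make a few of the steps above more tangible, here is a minimal plain-Python sketch of bucketizing, scaling to zero mean and unit variance, and reducing skewness with a logarithm. It only illustrates the ideas and is not how TFX implements them; the helper names (`bucketize`, `standardize`), the bucket boundaries, and the sample values are all made up for this example.

```python
import math
import statistics


def bucketize(value, boundaries):
    """Return the index of the bucket that `value` falls into."""
    return sum(value >= b for b in boundaries)


def standardize(values):
    """Shift and scale values to zero mean and unit variance."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]


# Made-up sample values, used only for illustration.
ages = [25, 38, 52, 61]
capital_gain = [0, 0, 2174, 99999]

# Bucketizing: turn a number into the index of its range (<30, 30-49, >=50).
age_buckets = [bucketize(a, boundaries=[30, 50]) for a in ages]

# Standardizing: zero mean and unit variance.
scaled_ages = standardize(ages)

# Reducing skewness: log1p compresses the long tail of large values.
log_gain = [math.log1p(v) for v in capital_gain]

print(age_buckets)
```

The logarithm is only one of several ways to reduce skewness; it is used here because it works for zero values when written as `log1p`.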
We spoke about Feature Optimization in one of the previous articles (Optimizing Features in Production Machine Learning. Structured Dataset.). I’m also planning on dedicating a separate article to the topic of metadata in Production ML. Therefore, this article will review a practical implementation of data ingestion, encoding, scaling, and normalizing to prepare the data for training.
When designing a data transformation pipeline, we should consider two essential factors:
- scalability: the system should be highly scalable and capable of parallelizing the workload between many workers;
- reproducibility: the same transformation we applied to the data in training should be applied to the data during inference.
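The reproducibility requirement can be sketched in plain Python as follows. This is a simplified illustration of the idea, not the TFX implementation: statistics are computed once over the training data, saved, and then reused unchanged at inference time. The function names `analyze` and `transform`, the JSON string standing in for the saved transformation graph, and the sample values are assumptions made for this example.

```python
import json


def analyze(values):
    """Full pass over the training data: collect the statistics we need."""
    return {"min": min(values), "max": max(values)}


def transform(value, stats):
    """Apply previously computed statistics to a single value."""
    return (value - stats["min"]) / (stats["max"] - stats["min"])


# Made-up training values for a feature such as hours-per-week.
train_hours = [20.0, 40.0, 60.0]

# Training time: analyze once, transform the training data, save the statistics.
stats = analyze(train_hours)
train_scaled = [transform(v, stats) for v in train_hours]
saved = json.dumps(stats)  # stands in for the saved transformation graph

# Inference time: reload the statistics instead of recomputing them,
# so a new data point is scaled exactly like the training data was.
restored = json.loads(saved)
served_scaled = transform(50.0, restored)

print(train_scaled, served_scaled)
```

If the statistics were recomputed on the inference batch instead, the same raw value could map to a different scaled value than the model saw during training.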
In our case, we are using TFX and Apache Beam. Because the pipeline is written with Apache Beam, we can run it on distributed data processing systems such as Spark, Flink, etc. We will use the same Census dataset we used in "Data Ingestion and Validation in Production Machine Learning. Structured Dataset." and "Optimizing Features in Production Machine Learning. Structured Dataset."
Preparation
First of all, we will install TFX and Apache Beam:
!pip install -U --quiet tfx
!pip install --quiet apache-beam
If you are following along, you might want to restart your runtime after running the commands above. Without a restart, some of the newly installed packages might not take effect, especially in Google Colab.
Next, we will import the necessary tools:
import tensorflow as tf
print('TF: {}'.format(tf.__version__))
import apache_beam as beam
print('Beam: {}'.format(beam.__version__))
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
print('Transform: {}'.format(tft.__version__))
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples
import os
import tempfile
Most of them you saw in the previous articles, so I will describe them only briefly:
- TensorFlow: Google's Machine Learning framework, one of the most popular choices when it comes to production Machine Learning;
- TFX: TensorFlow Extended is a library built on top of TensorFlow that provides extra capabilities for designing and building Production ML;
- TensorFlow Transform: a library providing many useful tools for data transformations and connecting TFX with Apache Beam;
- TFXIO: part of the TFX Basic Shared Libraries, it provides data ingestion helper methods;
- os: a Python library that helps us work with the filesystem and files;
- tempfile: a Python library for creating temporary files and directories.
Loading data
After importing all necessary libraries and tools, we can download the dataset:
train = './adult.data'
test = './adult.test'
if not os.path.exists(train):
    !wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
    !wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test
In the dataset, we have categorical and numerical features. We will need their names to arrange the processing:
CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]
NUMERIC_FEATURE_KEYS = [
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]
OPTIONAL_NUMERIC_FEATURE_KEYS = [
    'education-num',
]
ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]
LABEL_KEY = 'label'
We will specify the expected Schema for the data like so:
RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string)) for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUMERIC_FEATURE_KEYS] +
    [(name, tf.io.VarLenFeature(tf.float32)) for name in OPTIONAL_NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]
)
SCHEMA = tft.tf_metadata.dataset_metadata.DatasetMetadata(
    tft.tf_metadata.schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)
).schema
The schema is needed when we read data from files.
Below we specify some variables we will use during transformations:
# number of out-of-vocabulary (OOV) buckets - we will have one bucket where we place all OOV tokens
NUM_OOV_BUCKETS = 1
# names of the files which will contain the results of the transformation
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'
# label keys
LABEL_KEYS_TABLE = ['>50K', '<=50K']
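To show what a single out-of-vocabulary bucket means in practice, here is a small plain-Python sketch. It imitates the idea behind vocabulary encoding (more frequent values get smaller ids, unknown values share one extra id) and is not the TensorFlow Transform implementation. The helper names `build_vocabulary` and `apply_vocabulary` and the sample values are hypothetical.

```python
from collections import Counter


def build_vocabulary(values):
    """Map each distinct string to an integer id, most frequent first."""
    counts = Counter(values)
    return {token: idx for idx, (token, _) in enumerate(counts.most_common())}


def apply_vocabulary(value, vocab):
    """Return the id of a known token, or the single OOV bucket id otherwise."""
    # With one OOV bucket, every unknown token shares the id right after the vocabulary.
    oov_id = len(vocab)
    return vocab.get(value, oov_id)


# Made-up training values for the 'workclass' feature.
train_workclass = [
    "Private", "Private", "Private",
    "State-gov", "State-gov",
    "Local-gov",
]
vocab = build_vocabulary(train_workclass)

# "Never-seen" was not present in the training data, so it lands in the OOV bucket.
encoded = [apply_vocabulary(v, vocab) for v in ["Private", "Local-gov", "Never-seen"]]
print(vocab, encoded)
```

The benefit of the OOV bucket is that a value the model never saw during training still gets a valid id at inference time instead of causing an error.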
Preprocessing function
Now we will build the heart of our transformation pipeline: the preprocessing function. This part of our algorithm is responsible for:
- encoding categorical features,
- scaling numerical features,
- filling absent values in sparse features,
- encoding labels.
Let’s take a look at our preprocessing function:
def preprocessing_fn(inputs):
    # copy inputs because we will be changing some features
    outputs = inputs.copy()

    # encode each categorical feature as an integer index into a vocabulary
    # computed over all values of that feature
    for key in CATEGORICAL_FEATURE_KEYS:
        outputs[key] = tft.compute_and_apply_vocabulary(
            tf.strings.strip(inputs[key]),
            num_oov_buckets=NUM_OOV_BUCKETS,
            vocab_filename=key)

    # scale numerical features between 0 and 1
    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(inputs[key])

    # for numerical features that can be empty, fill missing values with 0
    # and scale them between 0 and 1
    for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
        sparse = tf.sparse.SparseTensor(
            inputs[key].indices,
            inputs[key].values,
            [inputs[key].dense_shape[0], 1])
        dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
        dense = tf.squeeze(dense, axis=1)
        outputs[key] = tft.scale_to_0_1(dense)

    # the block below generates a hash table for our labels
    # tf.init_scope() ensures we create only one hash table instead of a separate one for every runner;
    # this one table is saved in the transformation graph and used everywhere
    with tf.init_scope():
        initializer = tf.lookup.KeyValueTensorInitializer(
            keys=LABEL_KEYS_TABLE,
            values=tf.cast(tf.range(len(LABEL_KEYS_TABLE)), tf.int64),
            key_dtype=tf.string,
            value_dtype=tf.int64)
        table = tf.lookup.StaticHashTable(initializer, default_value=-1)

    # remove periods and surrounding whitespace from the labels
    label_str = tf.strings.regex_replace(inputs[LABEL_KEY], r'\.', '')
    label_str = tf.strings.strip(label_str)
    # encoded labels
    data_labels = table.lookup(label_str)
    # one-hot encoding labels
    encoded_label = tf.one_hot(
        indices=data_labels,
        depth=len(LABEL_KEYS_TABLE),
        on_value=1.,
        off_value=0.)
    outputs[LABEL_KEY] = tf.reshape(encoded_label, [-1, len(LABEL_KEYS_TABLE)])

    return outputs
The preprocessing function takes raw data as an input and produces preprocessed data which we can feed into the model during either training or inference phases.
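The label handling and the filling of absent values can be easier to follow on single values. Below is a minimal plain-Python sketch of those two steps. It mirrors the logic of the preprocessing function as I understand it, but it works on one value at a time rather than on tensors; the helper names `encode_label` and `fill_missing` are hypothetical, and an optional feature is represented here as a list with zero or one element.

```python
LABEL_KEYS_TABLE = ['>50K', '<=50K']


def encode_label(raw_label):
    """Remove periods and whitespace, look the label up, and one-hot encode it."""
    cleaned = raw_label.replace('.', '').strip()
    # Unknown labels get index -1, like the default value of the hash table.
    index = LABEL_KEYS_TABLE.index(cleaned) if cleaned in LABEL_KEYS_TABLE else -1
    # An index of -1 matches no position, so the result is a vector of zeros.
    return [1.0 if i == index else 0.0 for i in range(len(LABEL_KEYS_TABLE))]


def fill_missing(values, default=0.0):
    """Return the value of an optional feature, or the default if it is absent."""
    return values[0] if values else default


print(encode_label(' >50K'))
print(encode_label('<=50K.'))
print(encode_label('unknown'))
print(fill_missing([13.0]), fill_missing([]))
```

Note that an unrecognized label silently becomes an all-zero vector, so it is worth checking the transformed data for such rows before training.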
Transformation Pipeline
It is time to design a complete Transformation Pipeline with Apache Beam and TFX. Our pipeline will read the train (train_data_file) and test (test_data_file) files from the disk and save the transformed datasets to the filesystem (working_dir). Together with the transformed data, TFX Transform will save the transformation graph (which will later be used to transform data while serving the model) and metadata.
def transform_data(train_data_file, test_data_file, working_dir):
    # initialize the Beam pipeline
    # the pipeline is prepared and run when the `with` block exits
    with beam.Pipeline() as pipeline:
        # initiate the TFX Transform context
        # we need TFX Transform to create the transformation graph and metadata that will be saved in working_dir
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            # initiate the reader that will parse the data read from the files
            # in this case we could use CsvTFXIO or BeamRecordCsvTFXIO
            # the difference is that BeamRecordCsvTFXIO accepts a PCollection[bytes],
            # which we need because we fix the spaces after commas before parsing
            csv_tfxio = tfxio.BeamRecordCsvTFXIO(
                physical_format='text',
                column_names=ORDERED_CSV_COLUMNS,
                schema=SCHEMA)

            # start working with the train dataset:
            # read data from the train file,
            # fix extra spaces,
            # and parse data to TFT inputs
            raw_data = (
                pipeline
                | 'ReadTrainData' >> beam.io.ReadFromText(train_data_file, coder=beam.coders.BytesCoder())
                | 'FixExtraSpacesTrainData' >> beam.Map(lambda row: row.replace(b', ', b','))
                | 'DecodeTrainData' >> csv_tfxio.BeamSource()
            )

            # gather the dataset and the data schema into a tuple
            # we need the schema to interpret the dataset
            raw_data_metadata = (raw_data, csv_tfxio.TensorAdapterConfig())

            # and now comes the main transformation, done by TFX Transform:
            # we get the transformed dataset and the transformation graph
            transformed_dataset, transform_fn = (
                raw_data_metadata | tft_beam.AnalyzeAndTransformDataset(
                    preprocessing_fn, output_record_batches=True
                )
            )

            # the dataset contains data and metadata
            transformed_data, transformed_metadata = transformed_dataset

            # at this final stage we save the data to the working directory
            _ = (
                transformed_data
                | 'EncodeTrainData' >> beam.FlatMapTuple(lambda batch, meta: RecordBatchToExamples(batch))
                | 'WriteTrainData' >> beam.io.WriteToTFRecord(os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE))
            )

            # start working with the test dataset:
            # read data from the test file (skipping its header line),
            # fix extra spaces, remove the trailing period,
            # and parse data to TFT inputs
            raw_test_data = (
                pipeline
                | 'ReadTestData' >> beam.io.ReadFromText(test_data_file, coder=beam.coders.BytesCoder(), skip_header_lines=1)
                | 'FixExtraSpacesTestData' >> beam.Map(lambda row: row.replace(b', ', b','))
                | 'RemoveTrailingPeriodsTestData' >> beam.Map(lambda row: row[:-1])
                | 'DecodeTestData' >> csv_tfxio.BeamSource()
            )

            # gather the dataset and the data schema into a tuple
            # we need the schema to interpret the dataset
            raw_test_data_metadata = (raw_test_data, csv_tfxio.TensorAdapterConfig())

            # reuse the transformation graph computed on the train data,
            # so the test data gets exactly the same transformation
            transformed_test_dataset = (
                (raw_test_data_metadata, transform_fn)
                | tft_beam.TransformDataset(output_record_batches=True)
            )

            # the dataset contains data and metadata
            transformed_test_data, transformed_test_metadata = transformed_test_dataset

            # at this final stage we save the data to the working directory
            _ = (
                transformed_test_data
                | 'EncodeTestData' >> beam.FlatMapTuple(lambda batch, meta: RecordBatchToExamples(batch))
                | 'WriteTestData' >> beam.io.WriteToTFRecord(os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE))
            )

            # we also save the transformation graph to the working directory
            _ = (
                transform_fn
                | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir)
            )
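The row-cleaning steps in the pipeline operate on raw bytes, so they are easy to try outside of Beam. The sketch below applies the same two operations to sample rows written in the format of the Census files; the function names `clean_train_row` and `clean_test_row` are hypothetical wrappers around the lambdas used in the pipeline.

```python
def clean_train_row(row: bytes) -> bytes:
    """Remove the space that follows every comma."""
    return row.replace(b', ', b',')


def clean_test_row(row: bytes) -> bytes:
    """Remove spaces after commas, then drop the last byte (the trailing period)."""
    return row.replace(b', ', b',')[:-1]


# Sample rows in the format of the train and test files.
train_row = (b'39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, '
             b'Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K')
test_row = (b'25, Private, 226802, 11th, 7, Never-married, Machine-op-inspct, '
            b'Own-child, Black, Male, 0, 0, 40, United-States, <=50K.')

train_fields = clean_train_row(train_row).split(b',')
test_fields = clean_test_row(test_row).split(b',')

print(train_fields[1], train_fields[-1])
print(test_fields[1], test_fields[-1])
```

Dropping the last byte assumes that every test row ends with a period. A more defensive variant, if we wanted one, could use `row.rstrip(b'.')` so that rows without a period stay intact.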
It is time to run our pipeline:
import tempfile
temp = os.path.join(tempfile.gettempdir(), 'keras')
train = './adult.data'
test = './adult.test'
transform_data(train, test, temp)
Shortly after running the cell above, we will see the results saved to our temp folder:
We can see the transformed train and test data, the transformation model, and the metadata. We can use the transformed train and test data to train our model (we will cover the training process in the following article). The transformation model will come in handy when we serve the model; we will use it to transform an inference batch or a single data point. As for the metadata, we can save it as historical data. We can also use our Schema to validate our train and inference data in the future (you can read more about data validation here – Data Ingestion and Validation in Production Machine Learning. Structured Dataset.).
Conclusion
In this article, we saw one of the ways to design and implement a scalable Production ML data transformation pipeline.
There are many others, and we will cover them in further articles. But the option we’ve learned today is very efficient, scalable, and reliable.
I would strongly encourage you to use the combination of TFX, Apache Beam, and Apache Spark should you need to build production-grade pipelines. TFX can also work with Kubeflow and Airflow, and both are wonderful orchestrators. We will cover them in further articles as well.
Thank you for working with me all the way through. I know some concepts might seem complicated, and you will probably want to read some of the Apache Beam documentation to better understand the tool.
May the Force be with You!