In the previous article (Production ML: Data Engineering Pipeline – E-commerce Example. Part 1.), we designed a solution to help us migrate our company data from the old Customer Data Platform (CDP) to the new one.
The data has a complicated structure, so we must implement data transformations.
In this article, we will start the implementation, beginning with the batch-processing solution.
Prerequisites
I assume that you understand the services below. If not, please read the guidelines I link to in the list.
To follow along, you will need a few things:
- Data: we will use the BigQuery sample dataset for Google Analytics 4 e-commerce web implementation. This dataset is realistic and should allow us to build a nice transformation pipeline.
- Databricks account. We will use Databricks to build a highly scalable transformation ETL pipeline. We could have used Google Cloud Dataproc or AWS EMR, for example. But on the one hand, I did not want to use too many tools (and create too many IAM settings just for this demo). On the other hand, I did not want to use a single cloud, because you might think they pay me (which, unfortunately, they do not). Use the Get Started manual to set up your account and workspace on Databricks. Then, try Running your first ETL pipeline on Databricks. These two manuals will bring you up to speed with Databricks to the extent needed to follow this article.
- Google Cloud subscription. We will use BigQuery, Cloud Functions, and Pub/Sub from GCP. Again, it could have been AWS Lambda and Amazon MQ. But because the data is in BigQuery and I am lazy, we will stick to GCP. If you are new to Google Cloud, I strongly suggest you take a break and study it a bit. At least read this Getting Started guide, this IAM guideline, the BigQuery Getting Started guide, the Creating a Google Cloud Function guide, and this nice Getting Started with Pub/Sub quest from Google.
I hope you are still reading this article! I promise it will not be too complicated.
Please prepare the prerequisites from the list above. Also, explore the data a bit, try running some queries, and get an understanding of the data structure.
Extract
In the first stage of our pipeline, we retrieve the e-commerce data from a temporary BigQuery table. The data arrives in this table via the GA4 export, which is very simple to set up. You can read the description here – Export from GA4 to BigQuery.
This is where we first run into the complications of large data volumes: we cannot simply send API requests from the ETL pipeline and get the data back in the response.
Instead, we embed the retrieval logic into a dedicated data retrieval layer. This layer forms and triggers an export from BigQuery to Cloud Storage. Such exports happen entirely on Google Cloud premises, which is a more secure way of ingesting data. Further down the road, our pipeline will work with Cloud Storage and will have permissions to access the temporary buckets there.
The data retrieval layer will also authorize requests and ensure the requestor can access the requested data. In our simple case this does not seem critical, but in production an ETL can have many requestors and use cases, and the security mechanisms embedded in the data retrieval layer become very important.
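As a reference point, such a server-side export can be expressed with BigQuery's EXPORT DATA statement, which writes the query result straight to a Cloud Storage bucket without pulling the rows through the client. Below is a minimal sketch of that idea; the project, dataset, and bucket names are placeholders, and the Cloud Function we build next takes the simpler query-then-upload route instead:
from google.cloud import bigquery

bq_client = bigquery.Client()

# EXPORT DATA runs entirely inside BigQuery and writes newline-delimited JSON to Cloud Storage
export_sql = """
EXPORT DATA OPTIONS (
  uri = 'gs://<your_temp_bucket_name>/ga4_export/events_202101-*.jsonl',
  format = 'JSON',
  overwrite = true
) AS
SELECT *
FROM `<your_project>.<your_temporary_export_dataset>.events_202101*`
LIMIT 100000
"""

bq_client.query(export_sql).result()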
We will use a fairly simple Cloud Function (CF) as our data retrieval layer. This function will be triggered via an API request and receive three parameters:
- an authentication key,
- the period for which we are requesting the data,
- the maximum number of events to retrieve.
The CF will query the BigQuery table and save the result to the Cloud Storage bucket.
Let’s now create an API-triggered 2nd gen Cloud Function and paste the following code into it:
import os
from datetime import datetime
import random
from flask import request, make_response, jsonify
from google.cloud import bigquery, storage

REGION = 'us-central1'  # change to the region of your bucket and BigQuery dataset

def save_data(request):
    # handle CORS preflight requests first (they carry no JSON body)
    if request.method == 'OPTIONS':
        headers = {
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Origin, Content-Type, X-Auth-Token",
            "Access-Control-Allow-Methods": "POST, GET, OPTIONS",
            "Access-Control-Max-Age": 86400,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        return ('', 204, headers)

    headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Origin, Content-Type, X-Auth-Token",
        "Access-Control-Allow-Methods": "POST, GET, OPTIONS",
        "Access-Control-Max-Age": 86400,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    request_json = request.get_json(silent=True)
    print(request_json)

    # check if a key was provided
    if request_json and request_json.get('key'):
        request_key = request_json['key']
    else:
        request_key = False

    env_key = os.environ.get("key", False)

    # if a key was provided
    if request_key:
        # check that it is the right key
        if request_key != env_key:
            # and if it is wrong, return the corresponding message
            response = make_response(
                jsonify({"res": 'Invalid key'}),
                403,
            )
            response.headers = headers
            return response

        # if the key is ok, move on to the retrieval phase
        # TODO: implement sanitization of the incoming request parameters
        request_date = request_json['yyyymm']
        limit = request_json['limit']
        process_name = f'events_time_{request_date}_limit_{limit}'

        # initialize the BigQuery client
        bq_client = bigquery.Client()
        bucket_name = 'test_dataproc_bucket'

        # form the query
        query = f'SELECT * FROM `<your_project>.<your_temporary_export_dataset>.events_{request_date}*` LIMIT {limit}'

        # get the data into a dataframe
        df = bq_client.query(query).to_dataframe()

        # initialize the Cloud Storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        filename = f'dataset_{process_name}.jsonl'
        blob = bucket.blob(filename)

        # save the data to the bucket
        print('started saving df to bucket')
        if bucket.exists():
            blob = bucket.blob(filename)
            print('bucket exists')
            try:
                print('uploading dataframe to jsonl file in the bucket')
                blob.upload_from_string(df.to_json(orient='records', lines=True, force_ascii=False), 'jsonl')
                print('dataset was created')
                resultsSend = 'OK'
            except Exception as e:
                if hasattr(e, "response"):
                    status_code = e.response.status_code
                    status_desc = e.response.json()['error']['message']
                    print('something went wrong: ', status_code, ". ", status_desc)
                else:
                    print('something went wrong: ', e)
                print("Dataset was not created")
                resultsSend = 'NOT_OK'
        else:
            print('creating new bucket')
            new_bucket = storage_client.create_bucket(bucket, location=REGION)
            blob = new_bucket.blob(filename)
            try:
                print('uploading dataframe to jsonl file in the bucket')
                blob.upload_from_string(df.to_json(orient='records', lines=True, force_ascii=False), 'jsonl')
                print('dataset was created')
                resultsSend = 'OK'
            except Exception as e:
                if hasattr(e, "response"):
                    status_code = e.response.status_code
                    status_desc = e.response.json()['error']['message']
                    print('something went wrong: ', status_code, ". ", status_desc)
                else:
                    print('something went wrong: ', e)
                print("Dataset was not created")
                resultsSend = 'NOT_OK'

        response = make_response(
            jsonify({"res": resultsSend}),
            200,
        )
        response.headers = headers
        return response
    else:
        # no key provided
        response = make_response(
            jsonify({"res": 'Invalid key'}),
            403,
        )
        response.headers = headers
        return response
There are a few packages that this CF needs to run. Add the code below to your CF’s ‘requirements.txt’ file.
google-cloud-bigquery
google-cloud-storage
pandas
db-dtypes
Try testing the Cloud Function and see if it saves the data correctly into the Cloud Storage bucket.
And now, nothing can stop us from building our cool ETL pipeline.
Transform
If you have not set up your workspace in Databricks yet, now is a good time to do so. Please open the Databricks control panel, set up your workspace, and create a notebook.
We will now start writing the code. It assumes you have added a few environment variables when launching your cluster:
- AUTH_KEY: used to authenticate the request to the data retrieval layer,
- SERVICE_ACCOUNT: the service account JSON key your pipeline will use to access BigQuery and Cloud Storage. Here is a nice Google Cloud service account guideline. A quick sanity check for these variables is sketched right after this list.
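Before running anything, it may be worth verifying that both variables are actually visible to the notebook; a minimal, optional check:
import os

# optional sanity check: fail early if the cluster was started without the
# environment variables this notebook relies on
for var in ('AUTH_KEY', 'SERVICE_ACCOUNT'):
    assert os.getenv(var), f'Environment variable {var} is not set on this cluster'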
Now, we will start crafting the pipeline.
First of all, we will request the data from our data layer:
import json
import requests
import os

time = "202101"    # set this parameter (YYYYMM)
limit = "100000"   # set this parameter

api = 'https://us-central1-alexostrovskyycom.cloudfunctions.net/<name_of_your_cloud_function>'

request = {
    "key": os.getenv('AUTH_KEY'),
    "yyyymm": time,
    "limit": limit
}

process_name = f'events_time_{time}_limit_{limit}'

response = requests.post(api, json=request)
response.status_code
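If you want the notebook to stop immediately when the data layer rejects the request, rather than continuing with stale data, you can add an optional guard like this:
# optional guard: stop the notebook early if the data layer rejected the request
if response.status_code != 200:
    raise RuntimeError(f'Data layer returned {response.status_code}: {response.text}')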
Once we receive status code 200 from the data layer, we can load the data into our cluster storage and, later, into memory.
import json
from google.oauth2 import service_account
from google.cloud import bigquery, storage

# the SERVICE_ACCOUNT environment variable is expected to contain the service account JSON key
SERVICE_ACCOUNT_CREDENTIALS = json.loads(os.getenv('SERVICE_ACCOUNT'))
REGION = "us-central1"  # change if you have a region-specific BigQuery dataset

bucket_name = '<your_temp_bucket_name>'
filename = f'dataset_{process_name}.jsonl'
filename_save = f'dataset_saved_{process_name}.jsonl'

credentials_in = service_account.Credentials.from_service_account_info(SERVICE_ACCOUNT_CREDENTIALS)

# init the Cloud Storage client
storage_client = storage.Client(credentials=credentials_in)
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(filename)

# download the data
blob.download_to_filename(filename_save)
Load the data into a list:
with open(filename_save, 'r') as f:
    lines = f.readlines()
Check if you have all the records:
len(lines)
Next, we load the data into memory and transform it into the correct schema.
Here are the functions that perform the transformation:
import pandas as pd
from pandas import json_normalize
import json

# function performing the recursive flattening operation
def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[str(name[:-1])] = str(x)

    flatten(y)
    return out

# function dealing with the additional complexity of the GA4 data schema,
# namely that part of the event properties are stored under one key ('key')
# while their values are stored under another key ('value')
def get_event_json(json):
    out = {}
    for param in json:
        if json[param] != 'null':
            if 'key' in param:
                if 'event_params' in param:
                    # 'event_params_N_key' -> look up the matching 'event_params_N_value_*'
                    value_str = param[:14] + '_value_string_value'
                    value_int = param[:14] + '_value_int_value'
                    if json[value_str] == 'null':
                        out[json[param]] = json[value_int]
                    else:
                        out[json[param]] = json[value_str]
                else:
                    # 'user_properties_N_key' -> look up the matching 'user_properties_N_value_*'
                    value_str = param[:17] + '_value_string_value'
                    value_int = param[:17] + '_value_int_value'
                    if json[value_str] == 'null':
                        out[json[param]] = json[value_int]
                    else:
                        out[json[param]] = json[value_str]
            elif 'event_params' not in param and 'user_properties' not in param:
                out[param] = json[param]
    return out
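To make the two functions less abstract, here is what they do to a tiny hand-made event written in the spirit of the GA4 export schema (not real data):
# a tiny, hand-made event, just to illustrate what the two functions do
sample_event = {
    'event_name': 'page_view',
    'event_timestamp': '1609675287323406',
    'event_params': [
        {'key': 'page_title', 'value': {'string_value': 'Home', 'int_value': 'null'}},
        {'key': 'ga_session_id', 'value': {'string_value': 'null', 'int_value': '123456'}},
    ],
}

flat = flatten_json(sample_event)
# flat now contains keys like 'event_params_0_key' and 'event_params_0_value_string_value'
print(flat)

# get_event_json() folds each key/value pair back into a single flat property
print(get_event_json(flat))
# -> {'event_name': 'page_view', 'event_timestamp': '1609675287323406',
#     'page_title': 'Home', 'ga_session_id': '123456'}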
Initialize the PySpark session and context (on Databricks, a spark session already exists in the notebook; explicit initialization like this is mainly useful if you run the code elsewhere):
import os
import sys
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "15g") \
    .appName('data_processing_app') \
    .getOrCreate()

sc = spark.sparkContext
spark
Create and run the transformation pipeline:
import json

# main processing function
def get_flat_event_spark(event):
    flat_object = flatten_json(event)
    return get_event_json(flat_object)

# creating the pipeline
df_spark = sc.parallelize(lines).map(lambda event: get_flat_event_spark(json.loads(event)))

# running the pipeline and collecting the results
processed_list = df_spark.collect()
Check if all of the data points survived the transformation:
len(processed_list)
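Besides counting the records, it can help to eyeball a few of them and make sure the flat schema looks reasonable; a quick, optional check:
import pandas as pd

# optional: preview a few transformed records as a table
pd.DataFrame(processed_list).head()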
Processing 100,000 events usually takes 4-7 minutes, though this depends on the environment and other factors.
Once the data is processed, we can save it to the temporary Cloud Storage bucket, from where it will be imported into the Digital Marketing Data Warehouse.
Load
Let’s import the necessary packages:
from google.cloud import bigquery, storage
from google.oauth2 import service_account

# the SERVICE_ACCOUNT environment variable holds the service account JSON key
SERVICE_ACCOUNT_CREDENTIALS = json.loads(os.getenv('SERVICE_ACCOUNT'))
Save processed data to the file:
with open(f'load_to_bq_{process_name}.jsonl', 'w') as f:
    for line in processed_list:
        f.write(json.dumps(line) + '\n')
Load the local file into the temporary Cloud Storage bucket:
bucket_name = '<your_temp_bucket_name>'

credentials_save_jsonl = service_account.Credentials.from_service_account_info(SERVICE_ACCOUNT_CREDENTIALS)
save_storage_client = storage.Client(credentials=credentials_save_jsonl)

bucket = save_storage_client.bucket(bucket_name)
filename = f'load_to_bq_{process_name}.jsonl'
blob = bucket.blob(filename)

# if_generation_match=0 makes the upload fail if an object with this name already exists
generation_match_precondition = 0

print('started saving data to bucket')
if bucket.exists():
    blob = bucket.blob(filename)
    print('bucket exists')
    try:
        print('uploading data to jsonl file in the bucket')
        blob.upload_from_filename(filename, if_generation_match=generation_match_precondition)
        print('uploaded')
    except Exception as e:
        if hasattr(e, "response"):
            status_code = e.response.status_code
            status_desc = e.response.json()['error']['message']
            print('something went wrong: ', status_code, ". ", status_desc)
        else:
            print('something went wrong: ', e)
        print("Data was not uploaded")
else:
    print('creating new bucket')
    new_bucket = save_storage_client.create_bucket(bucket, location=REGION)
    blob = new_bucket.blob(filename)
    try:
        print('uploading data to jsonl file in the bucket')
        blob.upload_from_filename(filename, if_generation_match=generation_match_precondition)
        print('data was uploaded')
    except Exception as e:
        if hasattr(e, "response"):
            status_code = e.response.status_code
            status_desc = e.response.json()['error']['message']
            print('something went wrong: ', status_code, ". ", status_desc)
        else:
            print('something went wrong: ', e)
        print("Data was not uploaded")
And finally, load the data into the BigQuery Data Warehouse:
credentials_save_bq = service_account.Credentials.from_service_account_info(SERVICE_ACCOUNT_CREDENTIALS)
save_bq_client = bigquery.Client(credentials=credentials_save_bq)

table_id = "<your_project>.<your_dataset>.<your_table>"

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

uri = f"gs://{bucket_name}/{filename}"

load_job = save_bq_client.load_table_from_uri(
    uri, table_id, job_config=job_config
)
load_job.result()

destination_table = save_bq_client.get_table(table_id)
print("Loaded {} rows to BQ.".format(destination_table.num_rows))
Conclusion
I hope this was not too complicated for you. I understand that this time my explanations were not very detailed; that is because I do not want to bore people who already know and have worked with some of these elements.
If you did not understand something, please try to read the guidelines in the links I provided.
Also, this pipeline is not production-ready and needs some improvements before it can be used in production.
Some of the immediate required improvements that I can think of are:
- Event deduplication at the level of the streaming pipeline. Currently, when events come into the streaming pipeline, there is no check whether an event with the same combination of event type and timestamp already exists in the database. For the analytics to work properly, it is important to have an event deduplication mechanism (a minimal batch-side sketch follows this list).
- Session analysis. In e-commerce analytics, so-called ‘session analysis’ plays a very important role. A session is a period of time during which a user interacts with the application. Specific sessions can then serve as the unit to which profits and other metrics are attributed during analysis. So, ideally, each event should carry a property that helps attribute it to a specific user session.
- Batch pagination. It is useful to have ‘paginated’ processing when batch-processing historical events. That way, we do not have to wait until the entire batch is retrieved and processed, but can instead process it in smaller pages. This makes it easier to control the retrieval process and the fault-tolerance logic.
- Failure management and retry logic. At the moment, no failure management has been implemented in either the batch or the streaming processing. Such logic is vital for production-grade pipelines.
- Security. At the moment, security is limited to the native Google Cloud security controls (encryption in transit and at rest) and the API token we use to authenticate requests in the stream-processing part of the solution. In the future, these security controls must also include network controls (CF-level authentication, IP whitelisting, etc.) and other measures.
- Checkpoints and artifacts. During pipeline execution, it is very useful to store checkpoints and processing artifacts (metadata, versioning, etc.). These artifacts help with testing and analyzing the pipeline’s work and with restoring lost or corrupted data.
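To make the first point a bit more concrete, here is a minimal sketch of what deduplication could look like on the batch side of the notebook above, assuming the flattened records carry event_name and event_timestamp (as the GA4 sample data does). A production version would also check against events already stored in the warehouse and would likely include a user identifier in the fingerprint:
# a minimal deduplication sketch over the processed batch:
# keep only the first occurrence of each (event type, timestamp) combination
seen = set()
deduped_list = []
for event in processed_list:
    fingerprint = (event.get('event_name'), event.get('event_timestamp'))
    if fingerprint not in seen:
        seen.add(fingerprint)
        deduped_list.append(event)

print(len(processed_list), '->', len(deduped_list))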