Distributed Machine Learning Model Training with Spark (PySpark)
GitHub repo: https://github.com/data-max-hq/pyspark-3-ways
What is Spark?
Apache Spark was designed to function as a simple API for distributed data processing in general-purpose programming languages. It lets tasks that would otherwise require thousands of lines of code be written in dozens.
What is PySpark?
PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.
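In this blog post, we are going to show how to create a PySpark job and run it in three ways: locally on Minikube, on Google Dataproc, and locally with Airflow.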
For this post, we have chosen to train a model based on KKBox's Churn Prediction Challenge.
The goal of this challenge is to predict whether a user will churn after their subscription expires. Specifically, we want to forecast whether a user makes a new service subscription transaction within 30 days after the current membership expiration date.
We won't get into too many details explaining this challenge but feel free to check out Kaggle for more.
Creating the PySpark Job
We already have daily user logs describing the listening behaviors of a user within a month. Now we simply need to aggregate this data so that we have a single record with the sum of all fields for each user.
num_25: number of songs played for less than 25% of the song length
num_50: number of songs played for between 25% and 50% of the song length
num_75: number of songs played for between 50% and 75% of the song length
num_985: number of songs played for between 75% and 98.5% of the song length
num_100: number of songs played for over 98.5% of the song length
num_unq: number of unique songs played
total_secs: total seconds played
Some Feature Engineering
We also add one more feature: the total number of songs played in a month. Creating new input features from existing ones is known as feature engineering.
Then we build a simple LogisticRegression model, evaluate it, and finally track the accuracy.
Run PySpark locally on Minikube
Set up the local cluster
You can take a look at the snippet below to see how it is done.
First, we create a Minikube cluster and mount our repo. Then we build our custom PySpark image to include NumPy using the Dockerfile below.
Lastly, we install the Spark operator using Helm.
Apply the PySpark job
kubectl apply -f job.yaml
Port-forward Spark UI
kubectl port-forward pyspark-job-driver 4040:4040
Check out logs for model accuracy
kubectl -n=default logs -f pyspark-job-driver | grep accuracy
Run PySpark on Google Dataproc
Dataproc is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks.
Dataproc is a great tool to modernize your open-source data processing systems. It can be set up on Compute Engine VMs or on Google Kubernetes Engine (GKE), where containerized jobs provide job portability and isolation.
In this section, we are going to cover:
Dataproc cluster types and how to set Dataproc up
How to submit a PySpark job to Dataproc
Dataproc has three cluster types: Standard, Single-Node, and High Availability. The one we are using in this article is Standard, which consists of 1 master and N worker nodes.
Set up Dataproc
Creating the Dataproc cluster is very easy. First enable Dataproc, then create the cluster on Compute Engine VMs with the default options.
When creating the cluster, make sure to include Anaconda, as our job requires NumPy. If you get an error that Anaconda is not supported, try choosing an older version of Spark offered by Dataproc.
You can find the job.py file in our GitHub repo. Submitting the PySpark job is very straightforward.
Create a GCS bucket
Replace <bucket-name> with the name of your bucket
Upload the GitHub repo, including the dataset needed for the model training
Submit your job by selecting PySpark as the job type and the gsutil URI of job.py in your GCS bucket as the main Python file.
Run PySpark locally with Airflow
You can run Spark jobs using the SparkSubmitOperator offered by Airflow. We have already created a DAG in our repository, so all you have to do to run it is:
pip install -r requirements_airflow.txt
AIRFLOW_HOME=$(pwd) airflow standalone
Create a new connection
If you want to connect to a local Spark cluster, create a connection with the ID spark, the type spark, and the hostname local[*]
(Optional) Remove example DAGs
In the airflow.cfg file, set load_examples to False
Log in to Airflow UI
password: <password shown in terminal or standalone_admin_password.txt>
There is no magic out-of-the-box solution for running PySpark jobs, or any data processing pipeline for that matter. And that is exactly what this article is meant to show you.
For any questions, feel free to reach out to us at email@example.com. We would love to hear from you.