  • Kejdi Tako

Distributed Machine Learning Model Training with Spark (PySpark)



GitHub repo: https://github.com/data-max-hq/pyspark-3-ways


What is Spark?

Apache Spark was designed to provide a simple API for distributed data processing in general-purpose programming languages. It reduces tasks that would otherwise take thousands of lines of code to a few dozen.


What is PySpark?

PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.


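In this blog post we are going to show how to:

  • Create the PySpark job

  • Run PySpark locally on Minikube

  • Run PySpark on Google Dataproc

  • Run PySpark locally with Airflow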


For this post, we have chosen to train a model based on KKBox's Churn Prediction Challenge.


The goal of this challenge is to predict whether a user will churn after their subscription expires. Specifically, we want to forecast whether a user makes a new service subscription transaction within 30 days after the current membership expiration date.


We won't get into too many details explaining this challenge but feel free to check out Kaggle for more.


Creating the PySpark Job

Data aggregation

We already have daily user logs describing the listening behaviors of a user within a month. Now we simply need to aggregate this data so that we have a single record with the sum of all fields for each user.
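
As a rough sketch of this aggregation step (not the code from the repo), the example below groups the daily log rows by user and sums each feature column. The user ID column name `msno` and the helper names are assumptions made for illustration; the feature column names are the ones listed in this post.

```python
# Feature columns to sum per user (names taken from the feature list in this post).
SUM_COLUMNS = (
    "num_25", "num_50", "num_75", "num_985", "num_100", "num_unq", "total_secs",
)


def sum_expressions(columns=SUM_COLUMNS):
    """Build one Spark SQL expression per column, e.g. 'sum(num_25) AS num_25'."""
    return [f"sum({name}) AS {name}" for name in columns]


def aggregate_user_logs(logs, user_col="msno"):
    """Collapse daily log rows into a single row per user.

    `logs` is a PySpark DataFrame of daily user logs. `user_col` is an
    assumed name for the user ID column; adjust it to your schema.
    """
    # Imported here so the helpers above can be used without Spark installed.
    from pyspark.sql import functions as F

    return logs.groupBy(user_col).agg(*[F.expr(e) for e in sum_expressions()])
```

With the daily logs loaded into a DataFrame called `logs`, `aggregate_user_logs(logs)` would return one row per user with the summed fields.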


Features

num_25: # of songs played less than 25% of the song length
num_50: # of songs played between 25% and 50% of the song length
num_75: # of songs played between 50% and 75% of the song length
num_985: # of songs played between 75% and 98.5% of the song length
num_100: # of songs played over 98.5% of the song length
num_unq: # of unique songs played
total_secs: total seconds played

Some Feature Engineering

We also add one more feature: the total number of songs played in a month. Creating new input features from existing ones is known as feature engineering.
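
One way to derive such a feature is sketched below. It assumes the total is the sum of the five play-count buckets, since each play falls into exactly one of them; the repo may compute it differently. The output column name `num_total` and the helper names are hypothetical.

```python
# Play-count columns from the feature list; each song play lands in exactly one bucket.
PLAY_COUNT_COLUMNS = ("num_25", "num_50", "num_75", "num_985", "num_100")


def total_songs_expression(columns=PLAY_COUNT_COLUMNS):
    """Return a Spark SQL expression that adds the play-count columns together."""
    return " + ".join(columns)


def add_total_songs(features, output_col="num_total"):
    """Append the engineered column to a PySpark DataFrame of per-user features.

    `output_col` is a hypothetical name chosen for this sketch.
    """
    # Imported here so the expression helper works without Spark installed.
    from pyspark.sql import functions as F

    return features.withColumn(output_col, F.expr(total_songs_expression()))
```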


Then we build a simple LogisticRegression model, evaluate it, and finally track the accuracy.
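
The training step could look roughly like the sketch below, which uses Spark MLlib's `VectorAssembler`, `LogisticRegression`, and `MulticlassClassificationEvaluator`. The label column name `is_churn`, the 80/20 split, and the function name are assumptions for illustration, not necessarily what job.py does.

```python
def train_and_evaluate(features, feature_cols, label_col="is_churn", seed=42):
    """Fit a logistic regression on a PySpark DataFrame and return test accuracy.

    `label_col` and the 80/20 split are assumptions made for this sketch.
    """
    # Imported here so the function can be defined without Spark installed.
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql import functions as F

    # Cast the label to double so the estimator and evaluator get a numeric column.
    data = features.withColumn(label_col, F.col(label_col).cast("double"))
    train, test = data.randomSplit([0.8, 0.2], seed=seed)

    # Pack the feature columns into the single vector column the model reads.
    assembler = VectorAssembler(inputCols=list(feature_cols), outputCol="features")
    model = LogisticRegression(featuresCol="features", labelCol=label_col)
    fitted = Pipeline(stages=[assembler, model]).fit(train)

    # Accuracy is the share of test rows whose prediction matches the label.
    evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(fitted.transform(test))
    print(f"accuracy: {accuracy}")  # printed so it shows up in the driver logs
    return accuracy
```

Note that `VectorAssembler` raises an error on null feature values by default, so the input is assumed to be free of missing values.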


Run PySpark locally on Minikube


Prerequisites

  • Docker

  • Minikube

Set up the local cluster

make all

You can take a look at the Makefile in the repo to see how it is done.

First, we create a Minikube cluster and mount our repo. Then we build a custom PySpark image that includes numpy, using the Dockerfile in the repo.


And lastly, we install the Spark operator using Helm.


Apply the PySpark job

kubectl apply -f job.yaml

Port-forward Spark UI

kubectl port-forward pyspark-job-driver 4040:4040

Check out logs for model accuracy

kubectl -n=default logs -f pyspark-job-driver | grep accuracy

Run PySpark on Google Dataproc


Dataproc is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks.


Dataproc is a great tool to modernize your open-source data processing systems. It can run on Compute Engine VMs, or on Google Kubernetes Engine (GKE) for containerized jobs that need portability and isolation.


In this section, we are going to cover:

  1. Dataproc cluster types and how to set Dataproc up

  2. How to submit a PySpark job to Dataproc

Cluster types


Dataproc has three cluster types: Standard, Single-Node, and High Availability. The one we are using in this article is Standard, which consists of 1 master and N worker nodes.


Set up Dataproc


Creating the Dataproc cluster is very easy: first enable Dataproc, then create the cluster on Compute Engine VMs with the default options.


When creating the cluster, make sure to include Anaconda, as our job requires numpy. If you get an error that Anaconda is not supported, try choosing an older version of Spark offered by Dataproc.


Submit Job

You can find the job.py file in our GitHub repo. Submitting the PySpark job is very straightforward.

  1. Create a GCS bucket

  2. Replace <bucket-name> with the name of your bucket

  3. Upload the GitHub repo, including the dataset needed for model training

  4. Submit your job by selecting PySpark as the job type and entering the gsutil URI of job.py in your GCS bucket as the main Python file.
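
The steps above use the Cloud Console. As an alternative, the same submission can be scripted with the `google-cloud-dataproc` Python client. The sketch below is one possible way to do that, not part of the repo; the function names are hypothetical, and it assumes job.py sits at the root of the bucket and that application default credentials are configured.

```python
def build_pyspark_job(cluster_name, bucket_name, main_file="job.py"):
    """Describe a Dataproc PySpark job whose main file lives in a GCS bucket.

    `main_file` is the object path inside the bucket (assumed to be at the root).
    """
    return {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": f"gs://{bucket_name}/{main_file}"},
    }


def submit_pyspark_job(project_id, region, cluster_name, bucket_name):
    """Submit the job to an existing cluster and block until it finishes."""
    # Requires the google-cloud-dataproc package; imported here so the
    # job description above can be built without it.
    from google.cloud import dataproc_v1

    # Dataproc job requests go to a regional endpoint.
    client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.submit_job_as_operation(
        request={
            "project_id": project_id,
            "region": region,
            "job": build_pyspark_job(cluster_name, bucket_name),
        }
    )
    return operation.result()
```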

Run PySpark locally with Airflow


Apache Airflow is an open-source workflow management platform for data engineering pipelines.


You can run Spark jobs using the SparkSubmitOperator from Airflow's Apache Spark provider. We have already created a DAG in our repository, so all you have to do to run it is:


Install requirements

pip install -r requirements_airflow.txt

Run Airflow

AIRFLOW_HOME=$(pwd) airflow standalone

Create a new connection

To connect to a local Spark cluster, create a connection with ID spark, type spark, and host local[*].


Remove example DAGs (optional)

In the airflow.cfg file, change load_examples to False.


Log in to Airflow UI

URL: http://localhost:8080

username: admin

password: <password shown in terminal or standalone_admin_password.txt>
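
For reference, a DAG built around SparkSubmitOperator might look like the sketch below. It is not the DAG from the repository: the DAG ID, task ID, and application path are hypothetical, and it assumes Airflow 2.4 or newer (older releases use `schedule_interval` instead of `schedule`) with the Apache Spark provider installed. The connection ID matches the `spark` connection described above.

```python
from datetime import datetime

SPARK_CONN_ID = "spark"   # the Airflow connection pointing at local[*]
APPLICATION = "job.py"    # hypothetical path to the PySpark script


def build_dag():
    """Create a manually triggered DAG with one task that spark-submits the job."""
    # Airflow is imported inside the function so this sketch can be read and
    # imported on a machine where Airflow is not installed.
    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import (
        SparkSubmitOperator,
    )

    with DAG(
        dag_id="pyspark_job",            # hypothetical DAG ID
        start_date=datetime(2023, 1, 1),
        schedule=None,                   # run only when triggered
        catchup=False,
    ) as dag:
        SparkSubmitOperator(
            task_id="submit_pyspark_job",
            application=APPLICATION,
            conn_id=SPARK_CONN_ID,
        )
    return dag
```

In a real DAG file you would assign `dag = build_dag()` at module level so that Airflow discovers it.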


Conclusions

There is no magic out-of-the-box solution to run PySpark jobs, or any data processing pipeline for that matter. And that is exactly what this article is meant to show you.


For any questions, feel free to reach out to us at hello@data-max.io. We would love to hear from you.