Deploy Machine Learning Models with Spark

Vibhor Nigam

        For the last few months, I have been working on a side project to develop a machine learning application on streaming data. It was a great learning experience with numerous challenges, some of which I have tried to share here. This post explains how to deploy machine learning models with Spark and covers all three areas necessary for a successful production application: infrastructure, technology, and monitoring.

Deploy Machine Learning Models with Spark Structured Streaming

The first step for any successful application is to determine the technology stack in which it should be written, based on the business requirements. As a general rule of thumb, when the amount of data is huge, Spark is a strong choice.

The primary benefits of using Spark are its proven ability in big data processing and the availability of built-in distributed machine learning libraries. The initial challenge in using Spark was the RDD API, which was unintuitive and very different from the dataframes that data scientists are used to working with. However, with the introduction of the Dataset API in Spark 2.0, it has become much easier to code up a machine learning algorithm.

In my experience, working with machine learning models becomes much easier with proper use of the Pipeline framework. A Pipeline provides a structure to include all the steps required for processing and cleaning data, training a model, and then writing it out as an object.

This object can then be imported directly to process new data and produce predictions or forecasts, freeing the developer from re-writing, for the new data, the processing steps that were followed to build the model on the training data.

In the snippets below I have tried to cover how to use this API to build, save, and use models for prediction. For building and saving a model, one can follow this code structure.
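A rough sketch of that structure, assuming a simple classification task (the column names, algorithm choice, and S3 paths here are illustrative placeholders):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ModelTraining").getOrCreate()

// Hypothetical training data and column names; adapt to your own schema
val trainingDF = spark.read.parquet("s3://my-bucket/training-data/")

// Processing/cleaning stages plus the estimator, chained into one Pipeline
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val assembler = new VectorAssembler()
  .setInputCols(Array("categoryIndex", "feature1", "feature2"))
  .setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(indexer, assembler, lr))

// Fit on the training data and write the fitted model out as an object
val model = pipeline.fit(trainingDF)
model.write.overwrite().save("s3://my-bucket/models/my-pipeline-model")
```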

Once a model is saved, it can easily be used for prediction on streaming data with the following steps:

1. Read data from a Kafka topic

2. Load a saved ML model and use it for prediction

3. Save the results to S3 or other locations
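Steps 1 and 2 might look roughly like the following (broker addresses, topic name, message schema, and model path are all placeholders, and from_json assumes Spark 2.1 or later):

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("StreamingPrediction").getOrCreate()
import spark.implicits._

// 1. Read data from a Kafka topic
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "events")
  .load()

// Parse the Kafka message value into the columns the pipeline expects
val schema = new StructType()
  .add("category", StringType)
  .add("feature1", DoubleType)
  .add("feature2", DoubleType)
val parsed = kafkaDF
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")

// 2. Load the saved ML model and use it for prediction
val model = PipelineModel.load("s3://my-bucket/models/my-pipeline-model")
val predictions = model.transform(parsed)
```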

In CSV format:
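A minimal sketch, using the predictions dataframe from the previous snippet (the bucket and checkpoint paths are placeholders; the file sink cannot serialize vector columns, so only plain columns are selected):

```scala
// Write predictions out to S3 as CSV, with checkpointing for fault tolerance
val csvQuery = predictions
  .select("category", "prediction")
  .writeStream
  .format("csv")
  .option("path", "s3://my-bucket/output/csv/")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/csv/")
  .outputMode("append")
  .start()
```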

In Parquet format:
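Similarly for Parquet, again with placeholder paths:

```scala
// Write predictions out to S3 as Parquet
val parquetQuery = predictions
  .select("category", "prediction")
  .writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/output/parquet/")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/parquet/")
  .outputMode("append")
  .start()
```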

Or, if we want to send the results to a database or some other external system:
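Hooking up a custom writer for the database case might look like this (the JDBCSink class used here is sketched in the next section, and the connection details are placeholders):

```scala
// Route each predicted row to a relational database through a custom ForeachWriter
val dbQuery = predictions.writeStream
  .foreach(new JDBCSink("jdbc:postgresql://dbhost:5432/mydb", "user", "password"))
  .option("checkpointLocation", "s3://my-bucket/checkpoints/jdbc/")
  .outputMode("append")
  .start()
```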

Implementing these three steps leads to the successful deployment of machine learning models with Spark.

In some cases, however, a separate writer needs to be implemented for writing results out to a database, queue, or some other format. This can be achieved by extending the ForeachWriter interface provided with Spark Structured Streaming. A sample approach for JDBC is shown below, adapted from https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html
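A sketch of such a writer, loosely adapted from that example (the table name, columns, and error handling are simplified placeholders):

```scala
import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row}

// A minimal JDBC sink: one connection per partition, one INSERT per row
class JDBCSink(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: Statement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, password)
    statement = connection.createStatement()
    true
  }

  override def process(row: Row): Unit = {
    // Table name and column positions are placeholders; adapt to your schema
    statement.executeUpdate(
      s"INSERT INTO predictions VALUES ('${row.getString(0)}', ${row.getDouble(1)})")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
```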

MONITORING, LOGGING, AND ALERTS

The next step is to integrate monitoring, alerting, and logging services into the application, so as to get instantaneous alerts and keep tabs on how the application has been performing. The AWS stack offers many tools for this; two used frequently are CloudWatch for monitoring and Elasticsearch for logging. These can be readily integrated via their APIs, which are available in different languages.
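As a small illustration, a custom metric could be pushed to CloudWatch from the driver using the AWS SDK for Java; the namespace, metric name, and value below are made up:

```scala
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.{MetricDatum, PutMetricDataRequest, StandardUnit}

// Hypothetical metric: number of records processed in the last micro-batch
val recordsProcessed = 1024L

val cloudWatch = AmazonCloudWatchClientBuilder.defaultClient()
val datum = new MetricDatum()
  .withMetricName("RecordsProcessed")
  .withUnit(StandardUnit.Count)
  .withValue(recordsProcessed.toDouble)
val request = new PutMetricDataRequest()
  .withNamespace("StreamingMLApp")
  .withMetricData(datum)
cloudWatch.putMetricData(request)
```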

A sample monitoring dashboard looks something like this:

Image courtesy: https://github.com/amazon-archives/cloudwatch-logs-subscription-consumer

INFRASTRUCTURE

Once the code is ready for deployment, it is time to choose the appropriate infrastructure for running it. What I found to work best is Kafka as the messaging layer (mainly because of its multi-publisher/consumer architecture and the ability to set retention periods per topic), with AWS EMR as the core infrastructure for running the application.

AWS EMR became the obvious choice owing to the availability of clusters with pre-installed Spark and internal resource management. The ability to spin up a fully deployed new cluster in a short time is also a major plus.

A simplified architecture diagram looks like this:

Image courtesy — https://dmhnzl5mp9mj6.cloudfront.net/bigdata_awsblog/images/Spark_SQL_Ben_Image_1.PNG

TUNING A SPARK JOB

Lastly, as with any other Spark job, tuning is necessary for a streaming job as well to reach maximum efficiency. The first step in tuning a Spark job is to choose appropriate instances for it. After several experiments comparing M4 (general purpose) and C4 (compute optimized) instance types, I found M4 to perform better, primarily because of its ability to provide virtual cores as well.

Likewise, I found Spark's dynamic allocation (spark.dynamicAllocation.enabled=true) extremely useful in maximizing utilization in a stable way. There are a number of other parameters I found useful for tweaking performance, some of which are mentioned below:

a) --conf spark.yarn.executor.memoryOverhead=1024: The amount of off-heap memory overhead (in MB) allocated per executor.

b) --conf spark.yarn.maxAppAttempts=4: The maximum number of attempts that will be made to submit the application. It is quite useful when multiple Spark jobs are being submitted to a single cluster and submissions sometimes fail because of a lack of available resources.

c) --conf spark.task.maxFailures=8: The maximum number of times a single task can fail before the Spark job itself fails. The default value is 4; it is generally a good idea to keep this number higher.

d) --conf spark.speculation=false: When this property is set to true, Spark speculatively re-launches tasks that are running much slower than the rest of their stage. In our case we did not find it to contribute much to performance, but it is a good property to look at when processing skewed data sets.

e) --conf spark.yarn.max.executor.failures=15: The maximum number of executor failures before the application fails. Always set it to a higher number.

f) --conf spark.yarn.executor.failuresValidityInterval=1h: The time interval over which executor failures are counted. Combined with the property above, this means at most 15 executors can fail within any one-hour window before the job dies.

g) --driver-memory 10g: Provide sufficiently high driver memory so that the job does not fail when a burst of messages has to be processed.
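Putting these together, a spark-submit invocation for such a job might look roughly like the following (the class name, jar, and executor sizing are placeholders; dynamic allocation also requires the external shuffle service, which EMR enables by default):

```
spark-submit \
  --class com.example.StreamingPrediction \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 10g \
  --executor-memory 8g \
  --executor-cores 4 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.task.maxFailures=8 \
  --conf spark.speculation=false \
  --conf spark.yarn.max.executor.failures=15 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  streaming-prediction.jar
```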

I hope this material proves useful to people who are starting out with Structured Streaming. It is a pleasure to contribute back to the open source community from which I have learned a lot.

For a more detailed technical overview, please visit https://spark.apache.org/docs/2.0.0/structured-streaming-programming-guide.html


If you enjoyed this piece, please leave a comment and let us know about it.  You can also follow me on Twitter, Medium or find me on LinkedIn.
