Apache Spark for Distributed Machine Learning
Dan Zaratsian
AI/ML Architect, Gaming Solutions @ Google
d.zaratsian@gmail.com
https://github.com/zaratsian
TOPICS
Why do we need Distributed ML & Distributed Compute?
A few of the reasons:
Process Large Datasets
When Speed is Required
Storage, Compute, and/or I/O Requirements
Data Redundancy and/or Scale
Frameworks for Distributed ML & Distributed Compute
torch.distributed
tf.distribute.Strategy
TFX
[Diagram: where Spark sits in the stack]
User → Editor / Interface (Notebooks, Shell) → Code Library (Spark)
Apps → Visualization / UI (Tableau / Looker / Power BI, REST API) → JDBC / ODBC → Spark
Spark reads from Data Source(s): databases; gcs, s3; json, csv; …
Starting a Spark Session
Apache Zeppelin (web-based notebook)
./bin/spark-shell (Scala REPL)
./bin/pyspark (Python shell)
./bin/sparkr (R shell)
./bin/spark-sql (SQL command-line interface)
./bin/spark-submit (run a packaged application on a cluster)
./bin/spark-submit --master yarn --deploy-mode cluster --executor-memory 20G --num-executors 50 /path/to/my_pyspark.py
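spark-submit launches a packaged job; inside a notebook or script, a session can instead be created programmatically. A minimal PySpark sketch (the app name and memory setting are illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("my_app")                        # illustrative app name
    .config("spark.executor.memory", "4g")    # example setting
    .getOrCreate())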
What happens when you submit a Spark Job
[Diagram: submitting a Spark job in YARN client mode]
The Client Node runs the Driver Program, which creates a Spark Context and submits the job to the Resource Manager (YARN, the cluster master). YARN launches Executors on Worker Nodes; each Executor holds a Cache and runs Tasks in parallel.
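For comparison with the cluster-mode spark-submit example on the earlier slide, a client-mode submission keeps the driver on the client node, as in the diagram (path illustrative):

./bin/spark-submit --master yarn --deploy-mode client /path/to/my_pyspark.py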
Spark Machine Learning
Vectors
Dense Vectors: backed by a double array holding all of the values.
Sparse Vectors: backed by two parallel arrays, indices and values (used when many values are zero).
Example: the vector (1.0, 0.0, 3.0)
Dense: Vectors.dense(1.0, 0.0, 3.0)
Sparse: Vectors.sparse(3, [0, 2], [1.0, 3.0])
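A runnable sketch of both representations using pyspark.ml.linalg:

from pyspark.ml.linalg import Vectors

# Both represent the vector (1.0, 0.0, 3.0)
dense = Vectors.dense(1.0, 0.0, 3.0)
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])   # size, indices of non-zeros, their values
print(sparse.toArray())                          # [1. 0. 3.]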
Spark ML Pipelines
Make it easier to combine multiple algorithms into a single workflow.
Sequence of stages, and each stage is either a Transformer or an Estimator.
Core components of a Pipeline: DataFrames, Transformers, Estimators, and Parameters.
Spark Machine Learning: Training and Serving
[Diagram: training vs. serving flow]
Training: Historical Training Dataset(s) → Data Exploration and QA → Preprocessing and Feature Engineering → Train/Tune Model → Model Evaluation → Trained Model Object.
Serving: a Client sends live data to the Deployed Model, e.g. {"text": "Mountain biking is awesome!"}. The request is preprocessed into the same numeric form the model was trained on, e.g. array([[0.41, 0.59, ..., 0.22, 0.89], ..., [0.20, 0.45, ..., 0.25, 0.34]]), the data is passed into the model, and the response carries the prediction, e.g. {"sentiment": 0.96}.
The model was trained on arrays, tensors, and vectors; the deployed model expects the same.
Spark ML Pipeline
In machine learning, it is common to run a sequence of algorithms. We can bundle these algorithms and data processing steps into a Spark ML Pipeline.
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. The stages are specified as an ordered array (or DAG).
It's often a best practice to save a model or a pipeline to disk for later use.
Reference Code (in Colab Notebook): Spark ML Pipeline Example
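A minimal sketch of such a pipeline, loosely following the Spark docs' text-classification example (training_df is an assumed DataFrame with "text" and "label" columns):

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Stages run in order: tokenize text -> hash tokens into a feature vector -> fit a classifier
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])

model = pipeline.fit(training_df)                             # returns a fitted PipelineModel
model.write().overwrite().save("/tmp/spark-pipeline-model")   # save to disk for later use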
Spark Pipeline Demo
Spark ML Feature Engineering
Extracting, transforming, and selecting features.
Spark VectorAssembler
Transformer that combines a list of columns into a single vector column.
Useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models.
VectorAssembler(inputCols=["hour", "mobile", "userFeatures"], outputCol="features")
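A usage sketch, following the VectorAssembler example from the Spark docs (assumes an existing SparkSession named spark; the values are illustrative):

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# One row of raw features: an hour of day, a mobile flag, and a precomputed feature vector
df = spark.createDataFrame(
    [(18, 1.0, Vectors.dense(0.0, 10.0, 0.5))],
    ["hour", "mobile", "userFeatures"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# "features" holds the concatenation: [18.0, 1.0, 0.0, 10.0, 0.5]
assembler.transform(df).select("features").show(truncate=False)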
ML Algorithms
Classification Techniques
Regression Techniques
Clustering Techniques
Collaborative Filtering / Recommendation Techniques
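As one example from the recommendation family, a minimal ALS sketch (toy data; column names and hyperparameters are illustrative):

from pyspark.ml.recommendation import ALS

# Toy (user, item, rating) triples
ratings = spark.createDataFrame(
    [(0, 10, 4.0), (0, 11, 1.0), (1, 10, 5.0), (1, 12, 2.0)],
    ["userId", "movieId", "rating"])

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          rank=5, maxIter=5, regParam=0.1)
model = als.fit(ratings)
model.recommendForAllUsers(2).show(truncate=False)   # top-2 items per user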
Model Selection and Hyperparameter Tuning
Techniques: CrossValidator (k-fold cross-validation) and TrainValidationSplit, as sketched below.
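A minimal CrossValidator sketch, reusing the pipeline, lr, and training_df names from the pipeline example above (grid values are illustrative):

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid over the regularization strength of the LogisticRegression stage
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1, 1.0])
        .build())

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)
cv_model = cv.fit(training_df)   # fits each grid point per fold; cv_model.bestModel is the winner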
Best Practices: Dev
Why is my code slow…
Memory Intensive:
Require a large amount of RAM to hold large datasets, intermediate results, or complex data structures in memory for quick access.
CPU Intensive:
Require significant computational power to perform calculations, simulations, or process data.
I/O Intensive:
Require significant read/write operations to disk or network, often bottlenecked by the speed of these operations.
Best Practices: Tuning
Demos
Spark Environments
Colab Notebooks or local notebooks with custom installation (Dev)
Docker (Dev): docker run -it -p 8888:8888 --name notebook jupyter/all-spark-notebook
Cloud Environments (Dev and Prod): Google Dataproc ● Apache Spark on Amazon EMR ● Apache Spark on Azure Databricks
Assignment
Advanced Big Data
Dan Zaratsian
AI/ML Architect, Gaming Solutions @ Google
d.zaratsian@gmail.com
https://github.com/zaratsian
Thanks!
Misc Slides - Appendix
Apache Spark - Example
# Read a CSV file from HDFS into a DataFrame
mm_season = spark.read.load(
    "hdfs://sandbox.hortonworks.com:8020/tmp/marchmadness/SeasonResults/SeasonResults.csv",
    format="csv",
    header=True)

mm_season.show()    # display the first rows
mm_season.count()   # number of rows
mm_season.dtypes    # column names and types

# Register as a temp view and query it with Spark SQL
mm_season.createOrReplaceTempView('mm_season_sql')
spark.sql('''SELECT * FROM mm_season_sql''').show(10)
Apache Spark - Recommendations
1. Caching:
 • MEMORY_ONLY: (default/recommended) Store RDD as deserialized objects in the JVM heap
 • MEMORY_ONLY_SER: (2nd option) Store RDD as serialized Kryo objects; trades CPU time for memory savings
 • MEMORY_AND_DISK: Spill to disk if the RDD can't fit in memory
 • MEMORY_AND_DISK_SER: Spill the serialized RDD to disk if it can't fit in memory

2. Data Serialization Performance:
 • Reduces data size, so less data transfer
 • Use Kryo over Java serialization (Kryo is up to 10x faster)
 • conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 • sparkConf.set("spark.sql.tungsten.enabled", "true")
 • sparkConf.set("spark.io.compression.codec", "snappy")
 • sparkConf.set("spark.rdd.compress", "true")

3. Memory and Garbage Collection Tuning:
 • GC is a problem for Spark apps that churn RDDs
 • Measure time spent in GC by logging: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 • If there is excessive GC per task, use the MEMORY_ONLY_SER storage level to limit each RDD partition to a single object (one byte array), and reduce spark.storage.memoryFraction from 0.6 to 0.5 or less

4. Set the Correct Level of Parallelism:
 • Set spark.default.parallelism to 2-3 tasks per CPU core in your cluster
 • Normally 3-6 executors per node is reasonable, depending on the CPU cores and memory per executor
 • sparkConf.set("spark.cores.max", "4")
 • Use 5 or fewer cores per executor (e.g., a 24-core node could run 24 / 4 cores = 6 executors)
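A sketch of applying a few of these settings when building a session (the app name and parallelism value are illustrative and cluster-dependent):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("tuned_app")   # illustrative name
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.rdd.compress", "true")
    .config("spark.default.parallelism", "200")   # aim for ~2-3 tasks per CPU core
    .getOrCreate())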
Best Practices: Tuning
Configuring Executors
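A typical command-line executor configuration, echoing the spark-submit example earlier in the deck (sizes and counts are illustrative):

./bin/spark-submit --master yarn --deploy-mode cluster \
  --num-executors 50 --executor-cores 4 --executor-memory 20G \
  /path/to/my_pyspark.py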
Which Storage Level to Choose?
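A minimal sketch of picking a non-default storage level in PySpark (assumes an existing SparkSession named spark):

from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(1000))
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk when memory is full
rdd.count()                                 # the first action materializes the cache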
When things go wrong…