Machine Learning with Large Datasets (Fall 2021)
Recitation 1
September 3rd, 2021
Special thanks to Tian Li and Prof. Heather Miller who developed some of the material covered today.
Agenda
1. Introduction to Databricks
2. Spark Basics
3. Programming Demo
Homework Setup
Programming Assignments
Written Assignments
More info on Course Website - https://10605.github.io/
To-Dos for students
Registration
Please only choose the community edition and NOT the free trial.
Login
Welcome Screen
Uploading/Importing the Homework
Creating a cluster
Installing third-party packages (Once Cluster is Created)
nose
Important notes about clusters
Interacting with Notebooks
Interacting with Notebooks
Exporting the Homework
PySpark
Spark Architecture
Execution of a Spark Program
Spark APIs
RDDs vs DataFrames
RDDs
DataFrames
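To make the distinction concrete, here is a minimal sketch (not from the slides) that holds the same data as an RDD and as a DataFrame; it assumes the sc (SparkContext) and spark (SparkSession) objects that a Databricks notebook provides.
rows = [("bob", 25), ("alice", 30), ("bill", 35)]
# RDD: an unstructured collection of Python objects; Spark knows nothing about their fields
rdd = sc.parallelize(rows)
# DataFrame: the same data with a named schema, which lets Spark optimize queries
df = spark.createDataFrame(rows, ["name", "age"])
df.filter(df.age > 28).show()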
Transformations vs Actions
Example: Map vs. Reduce
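A minimal sketch of map (a transformation) and reduce (an action), assuming the sc context the notebook provides:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x)          # transformation: lazily squares each element
total = squared.reduce(lambda a, b: a + b)   # action: sums the elements and returns 30 to the driver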
Narrow vs Wide Transformations
Narrow vs Wide Transformations
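As an illustrative sketch (not from the slides), narrow transformations such as map and filter stay within a partition, while wide transformations such as reduceByKey shuffle data across partitions:
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
doubled = pairs.mapValues(lambda v: v * 2)        # narrow: each partition is processed independently
counts = pairs.reduceByKey(lambda a, b: a + b)    # wide: values with the same key are shuffled together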
How can we create an RDD?
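Two common ways, sketched here with a placeholder file path:
rdd_from_list = sc.parallelize(["bob", "alice", "bill"])   # from an in-memory Python collection
rdd_from_file = sc.textFile("/path/to/file.txt")           # from a text file, one record per line (path is a placeholder)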
What can you do when you have an RDD?
A small cheat-sheet can be found here with some nice examples.
Interacting with RDDs
Lambda vs Regular Functions
Lambda Functions
Regular Function
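A minimal sketch contrasting the two forms when filtering an RDD (assuming the notebook's sc):
names = sc.parallelize(["bob", "alice", "bill"])
# Lambda: an anonymous, single-expression function defined inline
with_i = names.filter(lambda x: "i" in x)
# Regular function: defined with def, can contain multiple statements and be reused
def contains_i(x):
    return "i" in x
with_i = names.filter(contains_i)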
Example
peopleRdd = sc.parallelize(["bob", "alice", "bill"])
words_filter = peopleRdd.filter(lambda x: "i" in x)
filtered = words_filter.collect()
What does the following code snippet do?
parallelize creates the RDD.
filter does nothing yet, since it is a lazy transformation.
collect triggers execution of the DAG, since it is an action.
Example #2
peopleRdd = sc.parallelize(["bob", "alice", "bill"])
peopleRdd.foreach(print)
peopleRdd.take(2)
On the driver: Nothing. Why?
foreach is an action that doesn't return anything. It is executed eagerly on the executors, not the driver, so any calls to print write to the stdout of the worker nodes and are not visible in the stdout of the driver node.
What about when take is called? Where will the array of people end up?
When an action returns a result, it returns it to the driver node.
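As a small follow-up sketch, if you do want the elements printed on the driver, bring them back with take or collect first (again assuming the notebook's sc):
peopleRdd = sc.parallelize(["bob", "alice", "bill"])
peopleRdd.foreach(print)          # prints on the executors' stdout, not in the notebook
for name in peopleRdd.take(2):    # take returns a Python list to the driver
    print(name)                   # now the printing happens on the driver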
Example #3
peopleRdd = sc.parallelize(["bob", "alice", "bill"])
newRDD = peopleRdd.foreach(print)
print(type(newRDD))
newRDD = peopleRdd.map(print)
print(type(newRDD))
newRDD.take(2)
newRDD = peopleRdd.map(lambda x: "Dr. " + x)
newRDD.take(2)
What does the following code snippet do on the driver node?
<class 'NoneType'>
foreach is an action, which doesn't return anything, so the type of the returned value is NoneType.
<class 'pyspark.rdd.PipelinedRDD'>
[None, None]
map is a transformation, and it returns another RDD whose values are the results of the mapped function. However, since print() returns nothing, the collected output is [None, None].
['Dr. bob', 'Dr. alice']
map is a transformation, so it returns a new RDD with "Dr. " added before each name.
Spark Execution
Spark Execution
Cache/Persistence
Reference Link - https://data-flair.training/blogs/apache-spark-rdd-persistence-caching/
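A minimal caching sketch for an RDD that is reused across several actions (assuming the notebook's sc):
names = sc.parallelize(["bob", "alice", "bill"])
doctors = names.map(lambda x: "Dr. " + x).cache()   # mark the RDD for in-memory persistence
doctors.count()     # first action computes the RDD and caches its partitions
doctors.collect()   # later actions reuse the cached partitions instead of recomputing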
FAQs/Helpful Debugging Instructions
Appendix
Group By Key Example - PySpark
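A hedged sketch of what such an example might look like; the data below is illustrative, not taken from the recitation notebook.
pairs = sc.parallelize([("bob", 1), ("alice", 2), ("bob", 3)])
# groupByKey is a wide transformation: all values for a key are shuffled to the same partition
grouped = pairs.groupByKey()
print([(k, list(v)) for k, v in grouped.collect()])   # e.g. [('bob', [1, 3]), ('alice', [2])]
# reduceByKey is usually preferred for simple aggregates, since it combines values
# within each partition before shuffling
sums = pairs.reduceByKey(lambda a, b: a + b)
print(sums.collect())                                 # e.g. [('bob', 4), ('alice', 2)]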