1 of 42

Machine Learning with Large Datasets (Fall 2021)

Recitation 1

September 3rd, 2021

Special thanks to Tian Li and Prof. Heather Miller who developed some of the material covered today.

2 of 42

Agenda

1. Introduction to Databricks

  • Registration on Databricks (Community Edition)
  • Uploading Notebooks
  • Creation of Cluster

2. Spark Basics

  • Spark Architecture
  • Execution of Spark Program
  • Spark APIs
  • Lambda Functions
  • Examples
  • Cache/Persistence

3. Programming Demo

3 of 42

Homework Setup

Programming Assignments

  • 6 programming assignments: 4 using PySpark, 2 using TensorFlow (subject to change).
  • PySpark assignments can be completed on Databricks (Community Edition).
  • More details about the TensorFlow assignments will be shared later.
  • For autograding, submit assignments on Gradescope.
  • Detailed instructions on how to submit each assignment for grading will be in the writeup.

Written Assignments

  • Need to be directly submitted to Gradescope.

More info on Course Website - https://10605.github.io/

4 of 42

To-Dos for students

  • Register for a free community version of Databricks.
  • Download the assignment and import the Jupyter Notebooks on Databricks.
  • Configure the environment according to the instructions in the writeup.
    • Creating a cluster.
    • Installing a third-party package.
  • After all local tests pass, hand in the assignment on Gradescope for grading.
    • Please note that you only get points if the tests on the grader pass.
    • The local tests exist to help you develop and test your code.

5 of 42

Registration

Please only choose the community edition and NOT the free trial.

6 of 42

Login

  • Login to the community edition at

https://community.cloud.databricks.com/login.html

7 of 42

Welcome Screen

8 of 42

Uploading/Importing the Homework

9 of 42

Creating a cluster

10 of 42

Installing third-party packages (Once Cluster is Created)

nose

11 of 42

Important notes about clusters

  • Please make sure that the following versions are used:
    • Spark: 3.1.2
    • Scala: 2.12
    • Runtime: 8.4
  • For all coding, please use Python3 syntax.
  • Launching a cluster can take some time (a few minutes).
  • The cluster status should be “active” before running any cells.
  • The Community Edition only allows a single-node cluster, which should be sufficient for completing the homework.
  • Community Edition clusters automatically terminate after an idle period of two hours.

12 of 42

Interacting with Notebooks

  • Import notebook

13 of 42

Interacting with Notebooks

  • Attach notebook to the cluster

14 of 42

Exporting the Homework

  • After completing the assignment, export the notebook as an IPython Notebook (.ipynb) and submit it to Gradescope for grading.

15 of 42

16 of 42

PySpark

  • We are using the Python programming interface to Spark (PySpark).
  • Where can I run PySpark code?
    • Locally on your laptop!
    • Databricks
    • Zeppelin Notebooks
    • many more …

17 of 42

Spark Architecture

  • Driver program: This is the node you’re interacting with when you’re writing Spark programs. It creates the SparkContext to establish the connection to the Spark execution environment.

  • Cluster Manager: Allocates resources across the cluster and manages scheduling, e.g., YARN/Mesos.
  • Worker program: The program that runs on the worker nodes.
  • Executor: What actually does the work/tasks on each cluster node.

18 of 42

Execution of a Spark Program

  • The driver program runs the Spark application, which creates a SparkContext upon start-up.
  • The SparkContext connects to a cluster manager (e.g., Mesos/YARN), which allocates resources.
  • Spark acquires executors on nodes in the cluster. Executors are processes that run computations and store data for your application.
  • Next, the driver program sends your application code to these executors.
  • Finally, the SparkContext sends tasks for the executors to run.

19 of 42

Spark APIs

  1. RDDs (Resilient Distributed Datasets): Think distributed list.
  2. DataFrames: SQL-like structured datasets with query operations.
  3. Datasets: A mixture of RDDs and DataFrames in terms of the operations that are available on them.

  • The homeworks in this class will focus on using the RDD and the DataFrame APIs.
  • The DataFrame API has many parallels with pandas DataFrames.

20 of 42

RDDs vs Dataframes

RDDs

  • An RDD is an immutable distributed collection of elements of your data that can be stored in memory or on disk across a cluster of machines.
  • The data is partitioned across the machines in your cluster and can be operated on in parallel with a low-level API that offers transformations and actions.
  • RDDs are fault tolerant: they track data lineage information so that lost data can be rebuilt automatically on failure.

DataFrames

  • Like an RDD, a DataFrame is an immutable distributed collection of data.
  • Unlike an RDD, data is organized into named columns, like a table in a relational database.
  • DataFrames allow for a higher-level abstraction by imposing a structure onto a distributed collection of data.

21 of 42

Transformations vs Actions

  • Transformations: Return new RDDs as a result.
    • They are lazy: their result RDD is not immediately computed.
  • Actions: Compute a result based on an RDD. The result is either returned to the driver or saved to an external storage system (e.g., HDFS).
    • They are eager: their result is immediately computed.
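Python's built-in map is itself lazy, which gives a rough plain-Python analogy for the distinction above. This is not Spark code, and the names record_and_square and calls are made up for this sketch. Building the pipeline runs nothing; only consuming it, much as an action would, triggers the work.

```python
calls = []  # records every element the function is actually applied to


def record_and_square(x):
    """Square x and log the call so we can see when work happens."""
    calls.append(x)
    return x * x


data = [1, 2, 3]

# "Transformation": builds a lazy iterator; nothing is computed yet.
squared = map(record_and_square, data)
calls_before = list(calls)  # snapshot taken before any consumption

# "Action": consuming the iterator forces the computation.
result = list(squared)
calls_after = list(calls)

print(calls_before)  # []
print(result)        # [1, 4, 9]
print(calls_after)   # [1, 2, 3]
```

The analogy is limited: Spark additionally records the transformations as lineage and distributes the work across executors, which a plain iterator does not do.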

22 of 42

Example: Map vs. Reduce

23 of 42

Narrow vs Wide Transformations

Narrow transformations

  • Each partition of the parent RDD is used by at most one partition of the child RDD.
  • Fast: no data shuffling.

Wide transformations

  • Multiple child RDD partitions may depend on a single parent RDD partition.
  • Slow: data shuffling is required.
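The difference between narrow and wide dependencies can be sketched in plain Python by modelling partitions as lists. This is a toy simulation, not Spark: the helper names narrow_map and wide_group_by_key, and the partitioner that buckets keys by character code, are assumptions made for illustration.

```python
from collections import defaultdict

# Two "partitions" of (key, value) pairs, standing in for data on two machines.
parent_partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
]


def narrow_map(partitions, fn):
    """Narrow: each child partition is built from exactly one parent partition."""
    return [[fn(record) for record in part] for part in partitions]


def wide_group_by_key(partitions, num_child_partitions):
    """Wide: records are re-bucketed by key, so one parent partition
    can feed several child partitions (this movement is the shuffle)."""
    children = [defaultdict(list) for _ in range(num_child_partitions)]
    for part in partitions:
        for key, value in part:
            # Toy partitioner (an assumption): bucket by the key's character code.
            target = ord(key[0]) % num_child_partitions
            children[target][key].append(value)
    return [dict(child) for child in children]


doubled = narrow_map(parent_partitions, lambda kv: (kv[0], kv[1] * 2))
grouped = wide_group_by_key(parent_partitions, 2)

print(doubled)  # [[('a', 2), ('b', 4)], [('a', 6), ('c', 8)]]
print(grouped)  # [{'b': [2]}, {'a': [1, 3], 'c': [4]}]
```

In the narrow case every record stays in its original partition. In the wide case the first parent partition contributes to both child partitions, which is why data has to move between machines.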

24 of 42

Narrow vs Wide Transformations

25 of 42

How can we create an RDD?

  • From a SparkContext (or SparkSession)

  • By referencing an externally stored data file

  • By transforming an existing RDD

What can you do when you have an RDD?

A small cheat-sheet can be found here with some nice examples.

26 of 42

Interacting with RDDs

27 of 42

Lambda vs Regular Functions

Lambda Functions

  • Evaluates only a single expression
  • No name associated

Regular Function

  • Can have multiple expressions
  • Must have name associated
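A small plain-Python illustration of the contrast above; the function name describe and the sample names are chosen only for this sketch. The lambda is a single anonymous expression passed inline, while the regular function has a name and several statements.

```python
def describe(name):
    """Regular function: named, and may contain several statements."""
    cleaned = name.strip().lower()
    if "i" in cleaned:
        return cleaned + " contains an i"
    return cleaned + " has no i"


names = ["bob", "alice", "bill"]

# Lambda: one expression, no name, defined right where it is used.
with_i = list(filter(lambda name: "i" in name, names))

# The named function can be reused and can hold multi-step logic.
descriptions = [describe(n) for n in names]

print(with_i)        # ['alice', 'bill']
print(descriptions)  # ['bob has no i', 'alice contains an i', 'bill contains an i']
```

Lambdas are convenient for the short functions typically passed to operations such as filter or map; anything that needs intermediate variables or branching reads better as a regular function.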

29 of 42

Example

What does the following code snippet do?

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

  • Creates the RDD.

words_filter = peopleRdd.filter(lambda x: "i" in x)

  • Nothing yet, since filter is a transformation.

filtered = words_filter.collect()

  • The DAG is executed, since collect is an action.

31 of 42

Example #2

What does the following code snippet do on the driver node?

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

peopleRdd.foreach(print)

peopleRdd.take(2)

On the driver, foreach prints nothing. Why?

foreach is an action that returns nothing to the driver. It is executed eagerly on the executors, so any calls to print write to the stdout of the worker nodes and are not visible in the stdout of the driver node.

What about when take is called? Where will the array of people end up?

When an action returns a result, it returns it to the driver node.

35 of 42

Example #3

What does the following code snippet do on the driver node?

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

newRDD = peopleRdd.foreach(print)

print(type(newRDD))

<class 'NoneType'>

foreach is an action, which doesn't return anything. Therefore, the returned value is None and its type is NoneType.

newRDD = peopleRdd.map(print)

print(type(newRDD))

newRDD.take(2)

<class 'pyspark.rdd.PipelinedRDD'>

[None, None]

map is a transformation, so it returns another RDD in which the values are modified. However, since the print() function doesn’t return anything, the output of take(2) is [None, None].

newRDD = peopleRdd.map(lambda x: "Dr. " + x)

newRDD.take(2)

['Dr. bob', 'Dr. alice']

map is a transformation, so it returns a new RDD with “Dr. ” added before each name.

36 of 42

Spark Execution

  • Spark uses lazy evaluation!
    • Lazy evaluation means nothing executes until an action is called. Until then, Spark only saves the recipe for transforming the source.
    • Results are not computed right away. Instead, Spark remembers the set of transformations applied to the base dataset.
    • Spark “remembers” by building a DAG, which it then tries to optimize to make better use of resources (such as bandwidth and memory).

  • Lazy evaluation enables Spark to optimize the required operations.
  • It enables Spark to recover from failures and slow workers.
  • Spark supports in-memory computation, which makes it a lot faster for iterative workloads.

37 of 42

Spark Execution

38 of 42

Cache/Persistence

  • By default, RDDs are recomputed each time you run an action on them. This can be expensive (in time) if you need to use a dataset more than once.
  • Spark allows you to control what is cached in memory (using the .cache() or the .persist() API).
  • One of the most common performance bottlenecks arises from unknowingly re-evaluating several transformations. This will be tested in some of the assignments.

Reference Link - https://data-flair.training/blogs/apache-spark-rdd-persistence-caching/
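The cost of recomputation can be illustrated with a toy stand-in for an RDD written in plain Python. The ToyDataset class below is hypothetical and only mimics the idea behind .cache(); it is not how Spark implements caching. It counts how often the full computation runs when the result is requested twice.

```python
class ToyDataset:
    """Toy stand-in for an RDD (hypothetical): a source plus a function to apply."""

    def __init__(self, source, fn):
        self._source = source
        self._fn = fn
        self._use_cache = False
        self._cached = None
        self.evaluations = 0  # how many times the full computation ran

    def cache(self):
        """Mark the dataset so its first computed result is kept in memory."""
        self._use_cache = True
        return self

    def collect(self):
        """'Action': compute the result, or reuse the cached copy if available."""
        if self._use_cache and self._cached is not None:
            return list(self._cached)
        self.evaluations += 1
        result = [self._fn(x) for x in self._source]
        if self._use_cache:
            self._cached = result
        return list(result)


uncached = ToyDataset([1, 2, 3], lambda x: x * x)
uncached.collect()
uncached.collect()

cached = ToyDataset([1, 2, 3], lambda x: x * x).cache()
first = cached.collect()
second = cached.collect()

print(uncached.evaluations)  # 2
print(cached.evaluations)    # 1
print(first == second)       # True
```

Without caching, every action repeats the whole computation; with caching, the second action reuses the stored result. The same reasoning applies when an RDD is used by more than one action.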

39 of 42

FAQs/Helpful Debugging Instructions

  • In case tests don’t pass, try running all the cells in the notebook.
  • If you see a name-not-defined error, it could be because your code is timing out on the grader.
  • Timeouts can occur due to inefficient code (for example, calling collect() on the entire dataset).
  • Local tests are not comprehensive. They are there to assist you in building your code. You only get points if the assignment runs on the autograder.

40 of 42

Appendix

41 of 42

42 of 42

Group By Key Example - PySpark