1 of 52

Machine Learning with Large Datasets (Fall 2025)

Recitation 1

August 29th, 2025

Special thanks to Tian Li and Prof. Heather Miller who developed some of the material covered today.

2 of 52

Agenda

1. Brief Introduction to Google Colab

2. Spark Basics

  • Spark Architecture
  • Execution of Spark Program
  • Spark APIs
  • Lambda Functions
  • Examples
  • Cache/Persistence

3. Programming Demo

3 of 52

Homework Setup

Programming Assignments

  • 5 programming assignments - 3 using PySpark, 2 using PyTorch (Subject to Change)
  • PySpark assignments can be completed on Google Colab.
  • More details about the PyTorch assignments will be shared later.
  • For autograding, submit assignments on Gradescope.
  • Detailed instructions on how to submit assignments for grading will be in the writeup.

Written Assignments

  • Need to be directly submitted to Gradescope.

More info on Course Website - https://10605.github.io/

4 of 52

To-Dos for students

  • Register for a free community version of Databricks.
  • Download the assignment and import the Jupyter Notebooks on Databricks.
  • Configure the environment according to the instructions in the writeup.
    • Creating a cluster.
    • Installing a third-party package.
  • After all local tests pass, hand in the assignment on Gradescope for grading.
    • Please note that you only get points if the tests on the grader pass.
    • The local tests exist to help you develop and test your code.

5 of 52

To-Dos for students

  • Download the assignment and import the Jupyter Notebooks on Google Colab.
  • Configure the environment according to the instructions in the writeup.
  • After all local tests pass, hand in the assignment on Gradescope for grading.
    • Please note that you only get points if the tests on the grader pass.
    • The local tests exist to help you develop and test your code.

6 of 52

Registration and Uploading Notebooks

  1. Go to https://colab.research.google.com and login with your Google account.
  2. Click: Upload > Browse. Then upload the notebook on your local computer, which was downloaded from the course website or Piazza.

7 of 52

Exporting the Homework

  • After completing the assignment, export the notebook as an IPython Notebook (.ipynb file) and submit it to Gradescope for grading.
  • Click: File > Download > Download .ipynb

8 of 52

Registration

9 of 52

Registration

Please only choose the community edition and NOT a cloud provider.

10 of 52

Login

Login to the community edition at

https://community.cloud.databricks.com/login.html

11 of 52

Creating a Cluster

12 of 52

Creating a Cluster

13 of 52

Installing Required Packages

14 of 52

Uploading/Importing the Homework

15 of 52

Interacting with Notebooks

  • Attach the notebook to the cluster.

16 of 52

Exporting the Homework

  • After completing the assignment, export the notebook as an IPython Notebook (.ipynb file) and submit it to Gradescope for grading.

17 of 52

Important notes about clusters

  • Please make sure that the following versions are used:
    • Runtime: 14.3 LTS
    • Spark: 3.5.0
    • Scala: 2.12
  • For all coding, please use Python3 syntax.
  • Launching a cluster can take some time (a few minutes).
  • The cluster status should be “active” before running any cells.
  • The Community Edition only allows a single-node cluster, which should be sufficient for completing the homeworks.
  • Community Edition clusters automatically terminate after an idle period of two hours.
  • When you close out of Databricks, the cluster terminates and you have to create a new one.

18 of 52

19 of 52

PySpark

  • We are using the Python programming interface to Spark (PySpark).
  • Where can I run PySpark code?
    • Locally on your laptop!
    • Google Colab
    • Zeppelin Notebooks
    • many more …

20 of 52

PySpark (local installation)

  • To get HWs running locally, you’ll need to install the following (a minimal setup sketch follows this list):
    • VSCode
      • Jupyter notebook extension
    • Python3
      • pip install ipykernel
      • pip install pyspark==3.5.0
      • pip install findspark
    • Java8
  • Note: displaying math equations might be buggy
  • We won’t be able to support / debug local installations
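
A minimal sketch of a local setup, assuming Java and the packages listed above are installed (the master string and app name below are just illustrative, not required values):

# Minimal local-setup sketch (unsupported): assumes Java, pyspark==3.5.0, and findspark are installed.
import findspark
findspark.init()  # locate the local Spark installation

from pyspark import SparkConf, SparkContext

# "local[*]" runs Spark on this machine using all available cores; the app name is arbitrary.
conf = SparkConf().setMaster("local[*]").setAppName("10605-local-test")
sc = SparkContext(conf=conf)

print(sc.version)  # should print 3.5.0 to match the homework environment
sc.stop()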

21 of 52

Spark Architecture

  • Driver program: This is the node you’re interacting with when you’re writing Spark programs. It creates the SparkContext to establish the connection to the Spark execution environment.

  • Cluster Manager: Allocates resources across the cluster and manages scheduling, e.g., YARN/Mesos.
  • Worker program: The program that runs on the worker nodes.
  • Executor: What actually does the work/tasks on each cluster node.

22 of 52

Execution of a Spark Program

  • The driver program runs the Spark application, which creates a SparkContext upon start-up.
  • The SparkContext connects to a cluster manager (e.g., Mesos/YARN), which allocates resources.
  • Spark acquires executors on nodes in the cluster; these are processes that run computations and store data for your application.
  • Next, the driver program sends your application code to these executors.
  • Finally, the SparkContext sends tasks for the executors to run (a minimal sketch of this start-up is shown below).
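
As a rough sketch of what this start-up looks like in code (the master URL below is a placeholder; on Databricks/Colab the session is typically created for you):

from pyspark.sql import SparkSession

# Sketch of what the driver does at start-up. The master URL is a placeholder:
# it could be "local[*]", "yarn", or a specific cluster manager address.
spark = (SparkSession.builder
         .appName("ExampleApp")   # shows up in the cluster UI
         .master("local[*]")      # placeholder; normally set by the environment
         .getOrCreate())

sc = spark.sparkContext  # the SparkContext that talks to the cluster manager

# From here on, transformations and actions submitted through sc (or spark)
# are turned into tasks that run on the executors.
rdd = sc.parallelize(range(10))
print(rdd.sum())  # 45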

23 of 52

Spark APIs

  1. RDD (Resilient Distributed Datasets): Think distributed list.
  2. DataFrames: SQL-like structured datasets with query operations.
  3. Datasets: A mixture of RDDs and DataFrames in terms of the operations that are available on them.

  • The homeworks in this class will focus on the RDD and DataFrame APIs.
  • The DataFrame API has many parallels with pandas DataFrames.

24 of 52

RDDs vs Dataframes

RDDs

  • RDDs are immutable distributed collections of elements of your data that can be stored in memory or on disk across a cluster of machines.
  • The data is partitioned across the machines in your cluster and can be operated on in parallel with a low-level API that offers transformations and actions.
  • RDDs are fault tolerant: they track lineage information to rebuild lost data automatically on failure.

DataFrames

  • Like an RDD, a DataFrame is an immutable distributed collection of data.
  • Unlike an RDD, data is organized into named columns, like a table in a relational database.
  • DataFrames allow for a higher-level abstraction by imposing a structure on a distributed collection of data (see the sketch below).
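
A rough sketch of the difference, assuming a SparkSession `spark` and its SparkContext `sc` already exist (as they do in the homework environments):

# The same small dataset as an RDD and as a DataFrame (sketch; assumes `spark` and `sc` exist).
rows = [("alice", 34), ("bob", 36), ("bill", 29)]

# RDD: a distributed list of Python tuples, manipulated with low-level
# transformations such as map and filter.
rdd = sc.parallelize(rows)
adults = rdd.filter(lambda row: row[1] >= 30).collect()

# DataFrame: the same data with named columns, manipulated with SQL-like
# operations that Spark can optimize.
df = spark.createDataFrame(rows, ["name", "age"])
adults_df = df.filter(df.age >= 30).collect()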

25 of 52

Transformations vs Actions

  • Transformations: Return new RDDs as a result.
    • They are lazy: their result RDD is not immediately computed.

  • Actions: Compute a result based on an RDD, which is either returned to the driver or saved to an external storage system (e.g., HDFS).
    • They are eager: their result is immediately computed (see the sketch below).
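
A small sketch of the distinction (assuming `sc` exists): the transformations below build up the computation but do no work until an action forces it.

# Sketch: transformations are lazy, actions are eager (assumes `sc` exists).
nums = sc.parallelize(range(1000000))

evens = nums.filter(lambda x: x % 2 == 0)   # transformation: nothing runs yet
doubled = evens.map(lambda x: x * 2)        # still nothing runs

print(doubled.count())   # action: the whole pipeline executes now -> 500000
print(doubled.take(3))   # another action -> [0, 4, 8]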

26 of 52

Example: Map vs. Reduce

27 of 52

Narrow vs Wide Transformations

  • Narrow: Each partition of the parent RDD is used by at most one partition of the child RDD.
  • Fast; no data shuffling.

  • Wide: Multiple child RDD partitions may depend on a single parent RDD partition.
  • Slow; data shuffling required (see the sketch below).
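
For intuition, a narrow transformation such as map stays within each partition, while a wide transformation such as reduceByKey has to shuffle data between partitions (sketch, assuming `sc` exists):

# Sketch: narrow vs. wide transformations (assumes `sc` exists).
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], numSlices=2)

# Narrow: each output partition depends on exactly one input partition, so no shuffle.
upper = pairs.map(lambda kv: (kv[0].upper(), kv[1]))

# Wide: all values for the same key must be brought together, so data is shuffled.
counts = pairs.reduceByKey(lambda x, y: x + y)

print(counts.collect())  # e.g., [('a', 2), ('b', 1)] (partition order may vary)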

28 of 52

Narrow vs Wide Transformations

29 of 52

How can we create an RDD?

  • From a SparkContext (or SparkSession)

  • Referencing an external data file in storage

  • Transforming an existing RDD

What can you do when you have an RDD?

A small cheat-sheet can be found here with some nice examples. A short sketch of these creation methods follows below.
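
A short sketch of the three creation methods (assuming `sc` exists; the file path is a placeholder):

# 1. From an in-memory collection via the SparkContext.
rdd_from_list = sc.parallelize([1, 2, 3, 4])

# 2. By referencing an external data file (each line becomes one element).
rdd_from_file = sc.textFile("path/to/some_file.txt")  # placeholder path

# 3. By transforming an existing RDD.
rdd_transformed = rdd_from_list.map(lambda x: x * 10)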

30 of 52

Interacting with RDDs

31 of 52

Lambda vs Regular Functions

Lambda Functions

  • Evaluates only a single expression
  • No name associated

Regular Function

  • Can have multiple expressions
  • Must have a name associated with it (see the sketch below)
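
A quick sketch of the two styles being passed to the same Spark transformation (assuming `sc` exists):

# Sketch: a lambda vs. a regular named function used with map (assumes `sc` exists).
names = sc.parallelize(["bob", "alice", "bill"])

# Lambda: a single anonymous expression, defined inline.
upper1 = names.map(lambda name: name.upper())

# Regular function: can contain multiple statements and must have a name.
def shout(name):
    greeting = name.upper()
    return greeting + "!"

upper2 = names.map(shout)

print(upper1.collect())  # ['BOB', 'ALICE', 'BILL']
print(upper2.collect())  # ['BOB!', 'ALICE!', 'BILL!']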

32 of 52

Spark Execution

  • Spark uses lazy evaluation!
    • Lazy evaluation means nothing executes right away; Spark saves a recipe for transforming the source data.
    • Results are not computed immediately – instead, Spark remembers the set of transformations applied to the base dataset.
    • The way Spark “remembers” is by building a DAG, which it then optimizes to make better use of resources such as bandwidth and memory (see the sketch below).

  • Lazy evaluation enables Spark to optimize the required operations.
  • It enables Spark to recover from failures and slow workers.
  • Spark also supports in-memory computation, which makes it much faster for iterative workloads.
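
One way to peek at the “recipe” Spark remembers is to inspect an RDD’s lineage before running an action (sketch, assuming `sc` exists; in PySpark, toDebugString returns bytes):

# Sketch: chaining transformations only records lineage; nothing runs yet.
rdd = (sc.parallelize(range(100))
         .map(lambda x: x * 2)
         .filter(lambda x: x % 3 == 0))

# Inspect the recorded lineage/DAG.
print(rdd.toDebugString().decode())

# Only an action actually triggers execution.
print(rdd.count())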

33 of 52

Spark Execution

34 of 52

Cache/Persistence

  • By default, RDDs are recomputed each time you run an action on them. This can be expensive (in time) if you need to use a dataset more than once.
  • Spark allows you to control what can be cached into memory (using the .cache() or the .persist() API).
  • One of the most common performance bottlenecks arises from unknowingly re-evaluating several transformations. This will be tested in some of the assignments (see the sketch below).

Reference Link - https://data-flair.training/blogs/apache-spark-rdd-persistence-caching/
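
A small sketch of when caching helps (assuming `sc` exists): without cache(), each action below would recompute the `expensive` pipeline from scratch.

# Sketch: caching an RDD that is reused by multiple actions (assumes `sc` exists).
raw = sc.parallelize(range(1000000))

expensive = raw.map(lambda x: x * x).filter(lambda x: x % 7 == 0)
expensive.cache()          # or expensive.persist(); marks the RDD to be kept in memory

print(expensive.count())   # first action: computes the pipeline and caches the result
print(expensive.sum())     # second action: reuses the cached partitions instead of recomputing

expensive.unpersist()      # release the cached data when it is no longer needed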

35 of 52

Example

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

words_filter = peopleRdd.filter(lambda x: "i" in x)

filtered = words_filter.collect()

What does this code snippet do?

36 of 52

Example

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

words_filter = peopleRdd.filter(lambda x: "i" in x)

filtered = words_filter.collect()

What does this code snippet do?

sc.parallelize(...) creates the RDD.

filter(...) does nothing yet, since it is a transformation.

collect() executes the DAG, since an action is called.

37 of 52

Example #2

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

peopleRdd.foreach(print)

peopleRdd.take(2)

What does this code snippet do on the driver node?

38 of 52

Example #2

peopleRdd = sc.parallelize(["bob", "alice", "bill"])

peopleRdd.foreach(print)

peopleRdd.take(2)

What does this code snippet do on the driver node?

On the driver: Nothing. Why?

foreach is an action that doesn't return anything, so it is eagerly executed on the executors, not the driver. Any calls to print therefore go to the stdout of the worker nodes and are not visible in the stdout of the driver node.

What about when take is called? Where will the array of people end up?

When an action returns a result, it returns that result to the driver node.

39 of 52

Example #3

peopleRDD = sc.parallelize(["bob", "alice", "bill"])

newRDD = peopleRDD.foreach(print)

print(type(newRDD))

newRDD = peopleRDD.map(print)

print(type(newRDD))

newRDD.take(2)

newRDD = peopleRDD.map(lambda x: "Dr. " + x)

newRDD.take(2)

What does this code snippet do on the driver node?

40 of 52

Example #3

peopleRDD = sc.parallelize(["bob", "alice", "bill"])

newRDD = peopleRDD.foreach(print)

print(type(newRDD))

newRDD = peopleRDD.map(print)

print(type(newRDD))

newRDD.take(2)

newRDD = peopleRDD.map(lambda x: "Dr. " + x)

newRDD.take(2)

What does this code snippet do on the driver node?

<class 'NoneType'>

foreach is an action that doesn't return anything, so the type of the returned value is NoneType.

41 of 52

Example #3

peopleRDD = sc.parallelize(["bob", "alice", "bill"])

newRDD = peopleRDD.foreach(print)

print(type(newRDD))

newRDD = peopleRDD.map(print)

print(type(newRDD))

newRDD.take(2)

newRDD = peopleRDD.map(lambda x: "Dr. " + x)

newRDD.take(2)

What does this code snippet do on the driver node?

<class 'NoneType'>

foreach is an action that doesn't return anything, so the type of the returned value is NoneType.

<class 'pyspark.rdd.PipelinedRDD'>

[None, None]

map is a transformation, and it returns another RDD with the values modified. However, since the print() function doesn't return anything, the output is [None, None].

42 of 52

Example #3

peopleRDD = sc.parallelize(["bob", "alice", "bill"])

newRDD = peopleRDD.foreach(print)

print(type(newRDD))

newRDD = peopleRDD.map(print)

print(type(newRDD))

newRDD.take(2)

newRDD = peopleRDD.map(lambda x: "Dr. " + x)

newRDD.take(2)

What does this code snippet do on the driver node?

<class 'NoneType'>

foreach is an action that doesn't return anything, so the type of the returned value is NoneType.

<class 'pyspark.rdd.PipelinedRDD'>

[None, None]

map is a transformation, and it returns another RDD with the values modified. However, since the print() function doesn't return anything, the output is [None, None].

['Dr. bob', 'Dr. alice']

map is a transformation, so it returns a new RDD with "Dr. " prepended to each name.

43 of 52

Example #4

data = ["Project Gutenberg's", "Alice's Adventures in Wonderland","Project Gutenberg's"]

rdd = sc.parallelize(data)

rdd_flattened = rdd.flatMap(lambda x: x.split(" "))

result = rdd_flattened.collect()

What does this code snippet output?

44 of 52

Example #4

data = ["Project Gutenberg's", "Alice's Adventures in Wonderland","Project Gutenberg's"]

rdd = sc.parallelize(data)

rdd_flattened = rdd.flatMap(lambda x: x.split(" "))

result = rdd_flattened.collect()

What does this code snippet output?

# result = ["Project", "Gutenberg's", "Alice's", "Adventures", "in", "Wonderland", "Project", "Gutenberg's"]

45 of 52

Example #5

word_pairs = [
    ("apple", 1), ("banana", 1), ("apple", 1), ("apple", 1), ("banana", 1)
]

rdd = sc.parallelize(word_pairs)

reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)

What does reduced_rdd look like?

46 of 52

Example #5

word_pairs = [
    ("apple", 1), ("banana", 1), ("apple", 1), ("apple", 1), ("banana", 1)
]

rdd = sc.parallelize(word_pairs)

reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)

# Result: [("apple", 3), ("banana", 2)]

47 of 52

Example #5

word_pairs = [
    ("apple", 1), ("banana", 1), ("apple", 1), ("apple", 1), ("banana", 1)
]

rdd = sc.parallelize(word_pairs)

grouped_rdd = rdd.groupByKey()

result = grouped_rdd.mapValues(list)

What happens here?

48 of 52

Example #5

word_pairs = [
    ("apple", 1), ("banana", 1), ("apple", 1), ("apple", 1), ("banana", 1)
]

rdd = sc.parallelize(word_pairs)

grouped_rdd = rdd.groupByKey()

result = grouped_rdd.mapValues(list)

# Result: [("apple", [1, 1, 1]), ("banana", [1, 1])]

  • Use groupByKey when you need to preserve the individual values but just group them together.
    • This is slower, as the data needs to be shuffled.
  • Use reduceByKey to compute aggregated values like sums, counts, etc.
    • The aggregation is first computed locally within each partition and then across machines, making it more efficient (see the sketch below).
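
Putting the two side by side on the same data (sketch, reusing word_pairs and sc from above): both produce the same counts, but reduceByKey combines values within each partition before shuffling.

# Sketch: word count with reduceByKey vs. groupByKey (reuses word_pairs and sc from above).
rdd = sc.parallelize(word_pairs)

# reduceByKey: partial sums are computed locally per partition, then shuffled and merged.
counts_reduce = rdd.reduceByKey(lambda x, y: x + y)

# groupByKey: all individual values are shuffled to one place per key, then summed.
counts_group = rdd.groupByKey().mapValues(sum)

print(sorted(counts_reduce.collect()))  # [('apple', 3), ('banana', 2)]
print(sorted(counts_group.collect()))   # [('apple', 3), ('banana', 2)]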

49 of 52

FAQs/Helpful Debugging Instructions

  • In case tests don’t pass, try running all the cells in the notebook.
  • If you see a name-not-defined error, it could be because your code timed out on the grader.
  • Timeouts can occur due to inefficient code (for example, calling collect() on the entire dataset); see the sketch below.
  • Local tests are not comprehensive. They are there to assist you in building your code. You only get points if the assignment runs on the autograder.
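
For instance, a hedged sketch of the difference (the RDD below is just an illustration): prefer actions that bring back a small sample or a summary rather than the whole dataset.

# Sketch: avoid pulling an entire large RDD back to the driver (illustrative only).
big_rdd = sc.parallelize(range(10000000))

# Risky on real datasets: collect() materializes everything on the driver
# and can time out or run out of memory on the grader.
# all_data = big_rdd.collect()

# Safer alternatives: inspect a few elements or compute an aggregate on the cluster.
print(big_rdd.take(5))   # [0, 1, 2, 3, 4]
print(big_rdd.count())   # 10000000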

50 of 52

Appendix

51 of 52

52 of 52

Group By Key Example - PySpark