1 of 95

10-605 / 10-805: Map-Reduce and Spark: Sample Workflows

Machine Learning from Large Datasets

2 of 95

Recap 1/2

  • Administrivia
  • History of big data for ML
    • To learn something complex you need lots and lots of data – the more the better!
    • Deep learning in general, and LLMs in particular, need big data
      • Big models are not enough
  • Working with big data means understanding how to do ML efficiently
    • Starting with standard complexity analysis and the cost of operations
    • Best case for i/o bound tasks is sequential access to data (from disk or from a network)

3 of 95

Recap 2/2

  • Cost of operations
    • read from disk sequentially
    • often data size > RAM
  • Learning as counting
    • density estimation
    • naïve Bayes
      • words are independent of each other given the class
  • Distributed processing for big data
    • embarrassingly parallel tasks
    • map-reduce
      • abstract operations: map, flatMap, filter, reduceByKey, ….
      • Spark / pyspark

6 of 95

Recap 3/2: Spark WordCount


  1. Tokenization and converting to messages is done in parallel over each partition of the RDD lines
  2. reduceByKey does a shuffle-sort, doing a pairwise reduce of each “run” of messages with the same “key”
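A minimal PySpark sketch of this word count (the file path and the local SparkContext setup are assumptions, not the slide’s exact code):

from pyspark import SparkContext

sc = SparkContext("local[*]", "wordcount")                 # assumed local setup
lines = sc.textFile("data/brown_nolines.txt")              # RDD of lines, split into partitions
messages = (lines
            .flatMap(lambda line: line.lower().split())    # tokenize, in parallel per partition
            .map(lambda w: (w, 1)))                        # convert words to (word, 1) messages
counts = messages.reduceByKey(lambda a, b: a + b)          # shuffle-sort, then reduce each run of a key
print(counts.take(5))                                      # action: triggers execution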

7 of 95

Today’s Plan

  • Where did Map-Reduce come from?
    • wordcount with Unix tools
    • A simple implementation
  • From Map-Reduce to Spark
  • Example Workflows in Spark


8 of 95

MAP-REDUCE: AN ORIGIN STORY


How hacks to handle large datasets with small memory evolved into Hadoop, Spark, Flume, …

9 of 95


c. 1994

10 of 95

Some examples

tr -sc '[:alpha:]' '\n' < data/brown_nolines.txt | sort | uniq -c | sort -nr

  • tr -sc '[:alpha:]' '\n' – translate non-letters (-c complements the letter set) to newlines, and “squeeze” (-s) runs of newlines into a single one, giving one word per line:

    The
    Fulton
    County
    Grand
    Jury
    said
    Friday
    an
    investigation
    of
    …

  • sort – sort the words
  • uniq -c – dedup and count
  • sort -nr – sort in descending (-r) numeric (-n) order:

    62690 the
    36043 of
    27952 and
    25725 to
    22000 a
    19581 in
    10328 that
    9969 is
    9770 was
    8833 for

11 of 95


12 of 95

Unix poetry

  • Line-by-line processing of a file
    • tr (translate): e.g. line of text 🡪 list of words
    • awk: line of text 🡪 simple program output
      • e.g. bigram + statistics 🡪 MI score
  • Filtering
    • e.g., discard infrequent bigrams with awk
  • Sorting
    • and then counting runs of unique values


Line-by-line processing, filtering is memory efficient

So is sorting!

13 of 95


Merge sort


16 of 95

Unix Sort


  • Load as much as you can into memory (S) and do an in-memory sort [usually quicksort].

  • If you have more to do, then spill this sorted buffer out on to disk, and get more data…

  • Finally, merge your spill buffers.
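A toy Python sketch of the same external-sort idea (the buffer size, the use of temporary files, and the assumption that items are newline-free strings are simplifications):

import heapq, itertools, tempfile

def external_sort(lines, buffer_size=100_000):
    # Sort more lines than fit in memory: sort chunks, spill them to disk, then merge.
    spills = []
    it = iter(lines)
    while True:
        chunk = list(itertools.islice(it, buffer_size))   # load as much as fits in memory (S)
        if not chunk:
            break
        chunk.sort()                                      # in-memory sort of the buffer
        spill = tempfile.TemporaryFile(mode="w+")
        spill.writelines(s + "\n" for s in chunk)         # spill the sorted buffer to disk
        spill.seek(0)
        spills.append(s.rstrip("\n") for s in spill)
    return heapq.merge(*spills)                           # finally, merge the spill buffers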

17 of 95

Back to Map-Reduce…


map takes one input x and outputs any number of outputs

reduce takes a key (e.g. a word) and an iterator over the values associated with that key, and outputs some aggregation of those values
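In plain Python, the two user-supplied functions for word count could look like this (a sketch; the names wc_map and wc_reduce are illustrative, not a real API):

def wc_map(line):                    # map: one input, any number of outputs
    for word in line.lower().split():
        yield (word, 1)

def wc_reduce(word, values):         # reduce: a key plus an iterator over its values
    yield (word, sum(values))        # output some aggregation of the values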

18 of 95

Recap 3/2: Spark WordCount


19 of 95

A Bad Implementation


bash-3.2$ head -1 ../data/brown_nolines.txt

The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced "no evidence" that any irregularities took place.

bash-3.2$ py -m fire wc_nano.py WordCount map_reduce \
    --src ../data/brown_nolines.txt --dst wc.txt

bash-3.2$ head -5 wc.txt

('the', 69969)

('fulton', 17)

('county', 160)

('grand', 52)

('jury', 68)

wc_nano.py

hz_nano.py

python -m fire <script> <class> <method> --arg1 val1 …


21 of 95

Constant-Memory Word Count in Map-Reduce



Python generators 101

Each time g.__next__() is called:

  • run g until it ‘yields’ a value
  • save the state of g

Constant memory (state of g)

pair_generator() produces:

(‘0’, 1), … (‘aaa’, 1), … (‘aaron’, 1), …

ReduceReady(pair_generator()) returns pairs like:

(‘aaron’, <generator_aaron>)

where <generator_aaron> produces: 1, 1, …

22 of 95


or you could use itertools.groupby
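itertools.groupby groups consecutive items that share a key, so on an already-sorted stream it gives the same constant-memory grouping (a small sketch):

from itertools import groupby

pairs = [('aaa', 1), ('aaa', 1), ('aaron', 1)]              # already sorted by key
for word, group in groupby(pairs, key=lambda kv: kv[0]):    # group is a lazy iterator
    print(word, sum(count for _, count in group))           # aaa 2, aaron 1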

23 of 95

Constant-Memory Word Count in Map-Reduce


bash-3.2$ head -1 ../data/brown_nolines.txt

The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced "no evidence" that any irregularities took place.

bash-3.2$ py -m fire wc_micro.py WordCount map_reduce \
    --src ../data/brown_nolines.txt --dst wc.txt

bash-3.2$ head -5 wc.txt

('0', 108)

('00', 33)

('000', 427)

('001', 2)

('002', 1)

hz_micro

24 of 95

Parallel constant-memory wordcount

Diagram: Pass 1 runs counting logic over doc 1, doc 2, doc 3, … and emits messages of the form “ctr[w1] +=1”, “ctr[w2] +=1”, …. The messages are sorted, and Pass 2 runs logic that processes the messages (ctr[w1] += 1, ctr[w1] += 1, …).

Pass 1 is easy to parallelize. Pass 2 is easy to parallelize if we are careful, and if we also route messages to the “right” machine.

25 of 95

Parallel constant-memory wordcount

Diagram: Pass 1 (Mapper) runs counting logic over the docs and emits “ctr[x] += d” messages, which are sorted into spill files (Spill 1, 2, 3) and then merged. Pass 2 (Reducer) runs logic to combine the counter updates (ctr[x1] += d1, ctr[x1] += d2, …), grouped by the sort key.

Observation: you can “reduce” in parallel (correctly) if no sort key is split across machines.

26 of 95

Parallel constant-memory wordcount

Diagram: a mapper machine runs counting logic over the docs, emits “ctr[x] += d” messages, and sorts them into spill files (Spill 1–4), which are then merged. Reducer Machine 1 combines updates like ctr[“alice”] += d1, ctr[“alice”] += d2, …; Reducer Machine 2 combines ctr[“bob”] += d3, ctr[“joe”] += d4, ….

The sort needs to partition the data appropriately (into multiple “shards”).

27 of 95

Parallel constant-memory wordcount

Diagram: the same setup, but here Reducer Machine 1 combines ctr[“alice”] += d1, ctr[“bob”] += d2, … while Reducer Machine 2 combines ctr[“bob”] += d3, ctr[“joe”] += d4, … – updates for “bob” are split across reducers.

The sort needs to partition data appropriately, i.e., by the sort key.

28 of 95

Diagram: two mapper machines each run counting logic over their own docs (doc 1, doc 2, …) and partition/sort their output into spill files; two reducer machines each merge spill files and combine counter updates for their own share of the keys (ctr[x1] += d1, ctr[x1] += d2, … on Reducer Machine 1; ctr[x2] += d1, ctr[x2] += d2, … on Reducer Machine 2).

The same holds for mapper machines: you can count in parallel (correctly) if no sort key is split across multiple reducer machines.

Simple way to partition: hash the sort key
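A sketch of hash partitioning (the stable crc32 hash is an assumption; Python’s built-in hash is salted per process, which would break agreement across machines):

import zlib

def partition_of(sort_key, num_reducers):
    # the same key always hashes to the same reducer, so no sort key is split across machines
    return zlib.crc32(sort_key.encode("utf8")) % num_reducers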


30 of 95

Diagram: two mapper machines, each running counting logic over its own docs and partitioning/sorting its output into spill files.

Mapper/counter machines run the “Map phase” of map-reduce

  • Input different subsets of the total inputs
  • Output (sort key,value) pairs

  • The (key,value) pairs are partitioned based on the key
  • Pairs from each partition will be sent to a different reducer machine.

31 of 95

Diagram: two reducer machines, each merging its spill files and combining counter updates (ctr[x1] += d1, ctr[x1] += d2, … on Reducer Machine 1; ctr[x2] += d1, ctr[x2] += d2, … on Reducer Machine 2).

The shuffle-sort phase:

  • (key,value) pairs from each partition are sent to the right reducer
  • Each reducer sorts the pairs it receives, so that all pairs with the same key end up together.

The reduce phase:

  • Each reducer scans through its sorted key-value pairs.

32 of 95


Keys for partitioning and sorting are common elements of many map-reduce frameworks.

Ideally the partitions will be similar size – but you also need to assign keys to partitions quickly.

Random hashing is often fine, but not when some keys have lots of messages and some have few.

33 of 95

Map-Reduce is Distributed: Map, Shuffle-Sort, Reduce

Diagram: Map Process 1 and Map Process 2 each run counting logic over their own docs and partition/sort their output into spill files; a distributed shuffle-sort routes the spill files to the reducers; Reducer 1 and Reducer 2 each merge their spill files and run logic to combine counter updates (C[x1] += D1, C[x1] += D2, … on Reducer 1; C[x2] += D1, C[x2] += D2, … on Reducer 2).

34 of 95

Key ideas in distributed map reduce



37 of 95

Today’s Plan

  • Where did Map-Reduce come from?
    • wordcount with Unix tools
    • A simple implementation
  • From Map-Reduce to Spark
  • Example Workflows in Spark


38 of 95

More About Spark


39 of 95

Spark is MapReduce++

  • Data abstraction
    • Sharded files 🡪 Resilient Distributed Datasets
      • RDDs are immutable
      • They are partitioned / sharded
      • You access them with high level operations that can be implemented by short Map-Reduce pipelines
      • You can mark RDDs as ‘persistent’ (the data is saved for debugging, reuse, etc) and RDDs can persist on disk or in memory
        • Memory-cached RDDs use a lot of cluster memory but are very useful
    • Two types of operations on RDDs
      • Transformations – lazy, return a ‘plan’ to make some change
      • Actions – eager, usually force execution of a map-reduce pipeline, and return some Python object
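For example (a sketch, assuming an existing SparkContext sc and the Brown-corpus file used earlier):

lines = sc.textFile("data/brown_nolines.txt")          # transformation: returns a plan, nothing runs
words = lines.flatMap(lambda line: line.split())       # transformation: extends the plan
words.persist()                                        # mark as persistent; cached when first computed
n = words.count()                                      # action: forces the pipeline to run
sample = words.take(10)                                # action: reuses the cached RDD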


40 of 95

Many Things Are Built On Spark

  • DB-like data processing
    • SparkSQL
    • Spark DataFrames
  • ML libraries
    • MLlib
  • Graph libraries
    • GraphX

  • We’re mostly not covering these here!


41 of 95

Why is Caching in Memory Important?


42 of 95

Spark examples: logistic regression

  • Model: P(y = 1 | x) = 1 / (1 + exp(−w·x))

  • Learning algorithm:
    • define the “loss” of the model on the data
    • start with random w and iteratively adjust it to reduce loss
      • by gradient descent

43 of 95

Why is Caching in Memory Important?


44 of 95

Spark examples: batch logistic regression


w is defined outside the lambda function, but used inside it.

So Python builds a closure – the code together with the current value of w – and Spark ships it off to each worker. So w is copied, and must be treated as read-only.

45 of 95

Spark examples: batch logistic regression


dataset of points is cached in cluster memory to reduce i/o
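A sketch of the batch gradient-descent loop being described (parse_point, the dimension D, num_iterations, step_size, and the label convention y in {−1, +1} are assumptions, not the slide’s exact code):

import numpy as np

points = sc.textFile("points.txt").map(parse_point).cache()   # cache: read from disk only once
w = np.random.randn(D)                                        # initial weights, kept on the driver

for it in range(num_iterations):
    # w is captured in the closure, copied to every worker, and read-only there
    gradient = points.map(
        lambda p: -p[1] * p[0] / (1.0 + np.exp(p[1] * np.dot(w, p[0])))   # d(loss)/dw for one point
    ).reduce(lambda a, b: a + b)                                          # sum of pointwise gradients
    w -= step_size * gradient                                 # only w moves between iterations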

46 of 95

Diagram: Map Process 1 runs gradient logic over point 1, point 2, point 3, …; Map Process 2 runs gradient logic over point 7,356,701, point 7,356,702, point 7,356,703, …. Each map process partitions/sorts its pointwise gradients (d/dw for each point) into spill files; a distributed shuffle-sort routes them; Reducer 1 and Reducer 2 merge their spill files and add the pointwise gradients. The result is the gradient, which is used to update w.

47 of 95

Why is Caching in Memory Important?

  • Without caching the training data, you need to load all the data from disk in each iteration.
  • With caching, communication/disk storage is only needed for the weights.
  • ML is all about iterative optimization.


48 of 95

Why Transformations and Actions?

  • For large data it’s important to have an explicit representation of the “code” you want to run
    • For optimization
      • e.g. a sequence of .map() transformations
    • For distribution to workers
    • For recovery from errors
  • Spark’s transformations are like “code”
  • This happens over and over in different contexts
    • Databases have query languages and query optimizers
    • Tensorflow and PyTorch have “computation graphs”
  • Language-like packages embedded in Python are no different


49 of 95

Why Transformations and Actions?

  • Actions give you a way to use Spark interactively
    • Use them when you debug
    • Avoid them as intermediate steps in a “production” pipeline
      • They are bottlenecks!


50 of 95

Cost of Transformations

  • Some transformations can run as a single cheap “mapper” process
    • Each partition is transformed independently, so you don’t need to move data around
    • The data keys are unchanged, so you don’t need to re-partition the data
  • Examples:
    • filter(f), sample(…), mapValues(f), …
    • map(f, preservesPartitioning=True)
    • foreach(f) # only runs f, doesn’t update RDD
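A couple of these in use (a sketch, assuming a small pair RDD of (word, count)):

counts = sc.parallelize([("the", 3), ("of", 2), ("aardvark", 1)])
frequent = counts.filter(lambda kv: kv[1] >= 2)        # runs per partition, no data movement
doubled = counts.mapValues(lambda c: 2 * c)            # keys unchanged, partitioning preserved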


51 of 95

Cost of Transformations

  • Some transformations require an expensive shuffle-sort
    • reduceByKey, join, intersection, subtract, groupBy, sortBy,…
    • partitionBy, repartition, ..

    • It’s not always obvious when a shuffle-sort will be needed
      • But if you understand the map-reduce layer you can make a good guess


52 of 95

Communication without RDDs

  • Broadcast variables can be used to share constants, config information, and small data structures to all workers.
    • Like closures in a function call, they are copied to every worker
    • Worker-driven changes to a broadcast variable are not broadcast back
    • Helpful for implementing a “map-side” join

b = sc.broadcast(obj)        # on the driver

if b.value > 10: …           # on a worker, inside a map/filter function
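A sketch of a broadcast variable used for a “map-side” join (the small lookup table and the words RDD are assumed examples):

tags = {"the": "DET", "dog": "NOUN"}                   # small table that fits in memory
b = sc.broadcast(tags)                                 # shipped once to every worker

tagged = words.map(lambda w: (w, b.value.get(w, "UNK")))   # join against the broadcast copy,
                                                           # with no shuffle-sort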


53 of 95

Communication without RDDs

  • Accumulators are shared counters
    • Generally used for monitoring progress
    • You don’t know the order in which accumulators will be incremented
    • You can define your own accumulator classes
    • Workers can’t read the accumulator’s values
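A sketch (counting malformed lines while parsing is an assumed example; lines is an RDD of strings):

bad_lines = sc.accumulator(0)

def parse(line):
    fields = line.split("\t")
    if len(fields) != 2:
        bad_lines.add(1)              # workers can only add to it, not read it
    return fields

parsed = lines.map(parse)
parsed.count()                        # an action has to run before the total is complete
print("bad lines:", bad_lines.value)  # only the driver reads .value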



55 of 95

Communication without RDDs

  • Closures sent to workers as an argument of a transformation or action
    • note: lambdas in Python are sometimes surprising!
    • be careful and/or be explicit!
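One classic surprise: a lambda captures variables by reference, not by value, so it sees whatever value the variable has when the lambda finally runs (plain Python, no Spark needed to get bitten):

adders = [lambda x: x + i for i in range(3)]
print([f(0) for f in adders])            # [2, 2, 2] – every lambda sees the final i

adders = [lambda x, i=i: x + i for i in range(3)]   # be explicit: bind the current i as a default
print([f(0) for f in adders])            # [0, 1, 2]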


56 of 95

Communication without RDDs

  • Closures sent to workers as an argument of a transformation or action
    • Q: how does Spark determine what values to package up with the code that it sends to workers?

    • Good practice: don’t make this too complicated


57 of 95

A CONSTANT-MEMORY VERSION OF SPARK


58 of 95


59 of 95


60 of 95

PageRank


61 of 95

Google’s PageRank

Diagram: a small graph of web sites linking to each other.

  • Inlinks are good (“recommendations”)
  • Inlinks from a good site are better than inlinks from a bad site
  • but inlinks from sites with many outlinks are not as “good”…
  • “Good” and “bad” are relative.

62 of 95

Google’s PageRank

Diagram: the same graph of web sites.

Imagine a pagehopper that always either

  • follows a random link, or
  • jumps to a random page

63 of 95

Google’s PageRank (Brin & Page, http://www-db.stanford.edu/~backrub/google.html)

Diagram: the same graph of web sites.

Imagine a pagehopper that always either

  • follows a random link, or
  • jumps to a random page

PageRank ranks pages by the amount of time the pagehopper spends on a page:

  • or, if there were many pagehoppers, PageRank is the expected crowd size

64 of 95

PageRank in Memory

  • Let u = (1/N, …, 1/N)
    • dimension = #nodes N
  • Let A = adjacency matrix: [a_ij = 1 ⬄ i links to j]
  • Let W = [w_ij = a_ij / outdegree(i)]
    • w_ij is the probability of a jump from i to j
  • Let v_0 = (1, 1, …, 1)
    • or anything else you want
  • Repeat until converged:
    • Let v_{t+1} = c·u + (1 − c)·v_t·W
      • c is the probability of jumping “anywhere randomly”
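A direct numpy transcription (a sketch; it assumes every node has at least one outlink, and fixes the number of iterations rather than testing convergence):

import numpy as np

def pagerank_in_memory(A, c=0.15, iters=50):
    # A: N x N adjacency matrix with a_ij = 1 iff i links to j
    N = A.shape[0]
    u = np.ones(N) / N
    W = A / A.sum(axis=1, keepdims=True)     # w_ij = a_ij / outdegree(i)
    v = np.ones(N)                           # v_0
    for _ in range(iters):
        v = c * u + (1 - c) * (v @ W)        # v_{t+1} = c·u + (1−c)·v_t·W
    return v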


65 of 95

PageRank in Spark


66 of 95

A Slow Implementation

Labels on the code:

  • nodes: list of string ids
  • outlinks_by_node: RDD of (src, [out1, …])
  • v_0 = (1, 1, …, 1)
  • w_ij is the probability of a jump from i to j
  • the update v_{t+1} = c·u + (1 − c)·v_t·W is computed in pieces: the c·u term, the (1 − c)·v_t·W messages, and the (1 − c)·v_t·W values after the sum
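A sketch of what one such (slow) iteration might look like, using the names above (everything else is an assumption, not the slide’s exact code):

c = 0.15
N = len(nodes)
resets = sc.parallelize([(n, c / N) for n in nodes])         # the c·u term
scores = sc.parallelize([(n, 1.0) for n in nodes])           # v_0 = (1, 1, …, 1)

for _ in range(10):
    hops = (outlinks_by_node.join(scores)                    # (src, ([out1, …], v_t[src]))
            .flatMap(lambda kv: [(dst, (1 - c) * kv[1][1] / len(kv[1][0]))
                                 for dst in kv[1][0]])       # one message per outlink
            .reduceByKey(lambda a, b: a + b))                # the sum: (1−c)·v_t·W
    scores = resets.leftOuterJoin(hops).mapValues(
        lambda pair: pair[0] + (pair[1] or 0.0))             # v_{t+1} = c·u + (1−c)·v_t·W

Every iteration re-reads and re-joins the RDDs, which is part of what makes this version slow.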

67 of 95

A Slow Implementation

Labels on the code: the RDDs outlinks, scores, hops, resets, and the updated scores.

10 iterations on a small graph, run locally, take about 10 min; 5 iterations take about 30 sec.

68 of 95

A Slow Implementation

69 of 95

Fixing A Slow Implementation

70 of 95

Keeping v in memory

  • Convert values to “combiner” (accumulator type)
  • Merge value with accumulator
  • Separately merge two accumulators
  • Leads to important optimizations, because combining can happen on the mapper side (see the sketch below)

Explicit closure
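Those three functions are the shape of Spark’s combineByKey (a sketch, using plain counting as the example; pairs is assumed to be an RDD of (word, 1)):

counts = pairs.combineByKey(
    lambda v: v,                 # convert a value to a “combiner” (accumulator type)
    lambda acc, v: acc + v,      # merge a value with an accumulator
    lambda a, b: a + b)          # merge two accumulators – this can run on the mapper side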

71 of 95

COMPARING COUNTS IN TWO CORPORA

72 of 95

Classic PySpark examples: wordcount


73 of 95

 

Naïve Bayes:

 

 

74 of 95

75 of 95

Comparing Bigrams

Problem: many of the common bigrams don’t seem to have meaning as a phrase….

76 of 95

Scoring bigrams as phrases

Notation:

  • word_v – #distinct words
  • phrase_v – #distinct phrases
  • word_n – #words in corpus
  • phrase_n – #phrases in corpus
  • xy – the phrase
  • x – its first word
  • y – its second word

The score compares two models of the bigram: as a phrase “x y”, vs. as a random co-occurrence of x and y.
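One plausible reading of these quantities (an assumption, not necessarily the slide’s exact score) is a log-ratio of Laplace-smoothed probabilities:

import math

def phrasiness(c_xy, c_x, c_y, phrase_n, phrase_v, word_n, word_v):
    # guess: smoothed probability of the bigram as a phrase, versus the probability
    # of x and y co-occurring independently
    p_phrase = (c_xy + 1.0) / (phrase_n + phrase_v)
    p_indep = ((c_x + 1.0) / (word_n + word_v)) * ((c_y + 1.0) / (word_n + word_v))
    return math.log(p_phrase / p_indep)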

77 of 95

Comparing Phrases

Filtered for high “phrasiness”

78 of 95

Comparing bigrams and filtering by phrasiness

79 of 95

The pipeline – basic counts

80 of 95

The pipeline w/o filtering

81 of 95

Filtering phrases

Joins change shape of the data

This gets awkward…

82 of 95

Filtering phrases

83 of 95

An example from Ron Bekkerman: k-means in Map-Reduce (background)


84 of 95

Expectation-Maximization


EM is a kind of coordinate ascent:

  1. Split the variables w to be optimized into two sets: x, y
  2. Fix one set (x) and optimize the other (y)
  3. Then fix y and optimize x
  4. Then repeat…

85 of 95

Expectation-Maximization


In EM there is typically

  • instances X
  • a “soft assignment” of each x_i to a “cluster”, z_i
    • z_i,j = Pr(x_i in cluster j)
  • parameters w

Then do coordinate descent on w, Z to minimize

–log Pr(X | w, Z)

86 of 95

An example from Ron Bekkerman: k-means in Map-Reduce


87 of 95

Example: k-means clustering

  • An EM-like algorithm:
  • Initialize k cluster centroids
  • E-step: associate each data instance with the closest centroid
    • Find “expected values” of cluster assignments given the data and centroids
  • M-step: recalculate centroids as an average of the associated data instances
    • Find new centroids that “maximize that expectation”
  • And repeat…
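A compact in-memory version for reference (a sketch; random initialization, a fixed number of iterations, and the assumption that no cluster goes empty are simplifications):

import numpy as np

def kmeans(X, k, iters=20):
    centroids = X[np.random.choice(len(X), k, replace=False)]       # initialize k centroids
    for _ in range(iters):
        # E-step: associate each instance with the closest centroid
        dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
        assign = dists.argmin(axis=1)
        # M-step: recalculate centroids as the average of the associated instances
        centroids = np.array([X[assign == j].mean(axis=0) for j in range(k)])
    return centroids, assign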


88 of 95

k-means Clustering


centroids

89 of 95

Example: k-means clustering

  • An EM-like algorithm:
  • Initialize k cluster centroids
  • E-step: associate each data instance with the closest centroid
  • M-step: recalculate centroids as an average of the associated data instances


90 of 95

Parallelizing k-means



93 of 95

k-means on MapReduce

  • Mappers read data partitions + centroids
  • Mappers assign data instances to clusters
  • Mappers compute new local centroids and local cluster sizes
  • Reducers aggregate local centroids (weighted by local cluster sizes) into new global centroids
  • Reducers write the new centroids


94 of 95

Questions

  • Mappers read data portions and centroids
  • Mappers assign data instances to clusters
  • Mappers compute new local centroids and local cluster sizes
  • Reducers aggregate local centroids (weighted by local cluster sizes) into new global centroids
  • Reducers write the new centroids


  • What should be in memory and what should be in an RDD?
  • How many Shuffle-Sorts are needed for one pass?
    • Assume we can map through documents

95 of 95

Questions

  • Mappers read data portions and centroids
  • Mappers assign data instances to clusters
  • Mappers compute new local centroids and local cluster sizes
  • Reducers aggregate local centroids (weighted by local cluster sizes) into new global centroids
  • Reducers write the new centroids


Sketch:

  • Map only: With all centroids in mapper memory, assign each doc to a centroid
    • outputting docid 🡪 centroid index
  • (Shuffle-sort: Count number of docs per centroid)
  • Map-Reduce: join docs to centroids
  • Map-Reduce: average doc weights for each centroid
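A sketch of one distributed iteration in the spirit of the mapper/reducer description above. It keeps the (small) centroid list in driver memory and broadcasts it, rather than joining docs to centroids; reduceByKey’s map-side combining plays the role of the mappers computing local centroids and sizes. It assumes docs is an RDD of numpy vectors and that no cluster goes empty:

import numpy as np

for _ in range(num_iterations):
    b = sc.broadcast(centroids)                            # centroids go to every mapper
    assigned = docs.map(lambda x: (                        # map: assign each doc to its closest centroid
        int(np.argmin([np.linalg.norm(x - c) for c in b.value])),
        (x, 1)))                                           # emit (centroid index, (doc, 1))
    totals = assigned.reduceByKey(                         # one shuffle-sort per iteration:
        lambda p, q: (p[0] + q[0], p[1] + q[1]))           # sum local vectors and counts per centroid
    centroids = [s / n for _, (s, n) in sorted(totals.collect())]   # new global centroids on the driver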