10-605 / 10-805: Map-Reduce and Spark
Machine Learning from Large Datasets
Monday’s recap
Today’s plan
COST OF OPERATIONS
What computers act like
Store | Size | Size vs. previous | Operation | Typical Access Time | Time vs. previous
L1 Cache | 128 KB | | reference | 1 ns |
L2 Cache | 16 MB | 100x | reference | 4 ns | 4x
Main memory | 8 GB (up to 192 GB) | 500x | reference | 100 ns | 25x
Main memory | | | read 1 MB sequentially | 1,000 ns | 10x
Disk | 320 GB | 40x | | |
Disk | | | read 1 MB sequentially | 500,000 ns | 500x
Disk | | | seek | 2,000,000 ns | 4x
(small and fast at the top of the table; big and slow at the bottom)
What computers act like
Store | Size | Size vs. previous | Operation | Typical Access Time | Time vs. previous
L1 Cache | 128 KB | | reference | 1 ns |
L2 Cache | 16 MB | 100x | reference | 4 ns | 4x
Main memory | 8 GB (up to 192 GB) | 500x | reference | 100 ns | 25x
Main memory | | | read 1 MB sequentially | 1,000 ns | 10x
SSD | | | read 1 MB sequentially | 20,000 ns | 20x
SSD | | | read 4 KB random-access | (similar) |
Disk | 320 GB | 40x | | |
Disk | | | read 1 MB sequentially | 500,000 ns | 25x
Network | | | send 1 MB within the same datacenter | (similar) | ~1x
Disk | | | seek | 2,000,000 ns | 4x
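A quick worked example using the numbers above: scanning 1 GB sequentially costs roughly 1,000 × 1,000 ns ≈ 1 ms from main memory, 1,000 × 20,000 ns ≈ 20 ms from SSD, and 1,000 × 500,000 ns ≈ 0.5 s from disk. A single disk seek (2,000,000 ns) costs as much as reading about 4 MB sequentially from the same disk, which is why random access patterns on disk are so expensive.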
Visualize the cost of operations
But memory gets cheaper over time, right?
Yes, but…
Visualize the cost of operations
Summary: What do we count?
Today’s plan
DENSITY ESTIMATES AND NAÏVE BAYES
The Joint Density
The Joint Can Be Used To Implement A Classifier
The joint density:
A classifier for C based on the joint density estimator:
and predict
A “brute force” joint density estimate is a big table based on max likelihood estimates (MLEs)
or just
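The equations on this slide did not survive the export; a standard reconstruction of what they express (notation assumed, not copied from the slide) is:

\hat{P}(X_1 = x_1, \dots, X_d = x_d) = \frac{\#\{\text{records with } X_1 = x_1, \dots, X_d = x_d\}}{\#\{\text{records}\}}

\hat{P}(C = c \mid x_1, \dots, x_d) = \frac{\hat{P}(x_1, \dots, x_d, C = c)}{\hat{P}(x_1, \dots, x_d)}
\qquad \hat{y} = \arg\max_c \hat{P}(C = c \mid x_1, \dots, x_d)
\quad \text{or just} \quad \hat{y} = \arg\max_c \hat{P}(x_1, \dots, x_d, C = c)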
An experiment: how useful is that classifier?
5% error
Of 723 examples:
Q: Is that good or bad performance?
An experiment: how useful is that classifier?
Pattern | Used | Errors |
P(C|A,B,D,E) | 101 | 1 |
P(C|A,B,D) | 157 | 6 |
P(C|B,D) | 163 | 13 |
P(C|B) | 244 | 78 |
P(C) | 58 | 31 |
17.8% error overall
A “naïve” estimate of joint probability
The (brute force) joint density estimator uses MLEs:
The naïve estimator assumes independence:
A “naïve” but useful estimate of joint probability
If you want to use this to predict C you can’t assume everything is independent of C… so:
and predict
or just
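The equations are again missing from the export; a standard reconstruction (notation assumed) is:

\text{brute force MLE: } \hat{P}(x_1, \dots, x_d) = \frac{\#\{\text{matching records}\}}{\#\{\text{records}\}}
\qquad
\text{naïve: } \hat{P}(x_1, \dots, x_d) = \prod_{j=1}^{d} \hat{P}(x_j)

\text{assume independence only given the class: } \hat{P}(x_1, \dots, x_d \mid C = c) = \prod_{j=1}^{d} \hat{P}(x_j \mid C = c)

\text{and predict } \hat{y} = \arg\max_c \hat{P}(C = c) \prod_{j=1}^{d} \hat{P}(x_j \mid C = c)
\quad \text{or just the argmax of the joint } \hat{P}(x_1, \dots, x_d, C = c)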
Naïve Bayes refinement - 1
In n-grams we look at Pr(b)=Pr(second word is “b”), etc.
For long documents you usually don’t keep track of where a word appears, so:
This is a really simple classifier: you just need to count (co-occurrences of words with classes, and number of examples in each class)
Naïve Bayes refinement - 2
For numeric stability it’s best to compute with logs:
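The log-space rule the slide refers to is, in standard form:

\hat{y} = \arg\max_y \Big[ \log \hat{P}(Y = y) + \sum_{w \in \text{doc}} \log \hat{P}(X = w \mid Y = y) \Big]

which avoids underflow from multiplying many probabilities that are each close to zero.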
Naïve Bayes refinements
You also usually smooth the estimates, e.g. with add-one (Laplace) smoothing or a variant of it, where V is the number of words.
This is usually a terrible density estimator but often a pretty good classifier!
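A minimal end-to-end sketch of this classifier, assuming training documents arrive as (label, list-of-words) pairs; the class name, counter names, and add-one smoothing constant are illustrative, not taken from the slides:

import math
from collections import defaultdict

class NaiveBayes:
    """Count-based naive Bayes with add-one (Laplace) smoothing and log-space scoring."""
    def __init__(self):
        self.docs_y = defaultdict(int)   # number of documents with label y
        self.n_y = defaultdict(int)      # n[Y=y]: word tokens seen with label y
        self.n_wy = defaultdict(int)     # n[X=w ^ Y=y]: word/label co-occurrence counts
        self.vocab = set()

    def train(self, labeled_docs):
        for y, words in labeled_docs:
            self.docs_y[y] += 1
            for w in words:
                self.n_y[y] += 1
                self.n_wy[(w, y)] += 1
                self.vocab.add(w)

    def predict(self, words):
        V, n_docs = len(self.vocab), sum(self.docs_y.values())
        best_y, best_score = None, float("-inf")
        for y in self.docs_y:
            # log prior + smoothed log likelihood of each word in the document
            score = math.log(self.docs_y[y] / n_docs)
            for w in words:
                score += math.log((self.n_wy[(w, y)] + 1) / (self.n_y[y] + V))
            if score > best_score:
                best_y, best_score = y, score
        return best_y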
Classification vs Density Estimation
Classification
Density Estimation
Classification vs density estimation
Today’s plan
DISTRIBUTED PROCESSING FOR LARGE FILES
Linear-time Tasks
>>> list(map(lambda x: x*2, range(0, 20, 2)))
[0, 4, 8, 12, 16, 20, 24, 28, 32, 36]
>>> list(filter(lambda x: x % 8 == 0, range(0, 20, 2)))
[0, 8, 16]
Embarrassingly Parallel Tasks
…
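A minimal single-machine sketch of exploiting an embarrassingly parallel task, assuming a pure per-record function (the score function and the data here are illustrative):

from multiprocessing import Pool

def score(x):
    # any per-record work that needs no data from other records
    return x * x

if __name__ == "__main__":
    data = range(1_000_000)
    with Pool(processes=8) as pool:              # 8 workers, each handles a chunk
        results = pool.map(score, data, chunksize=10_000)
    print(sum(results))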
Embarrassingly parallel (or not)?
Similarity and duplicate detection
Compute Scaling Laws = Data Scaling Laws
Llama 1 (2023) training mixture
Another nearly-linear operation
Map-reduce is more powerful
Embarrassingly parallel plus one additional operation: shuffle-sort
Small-vocabulary word count: high level view
[Diagram: counting logic streams thru the docs and emits "messages" like "ctr[w1] += 1", "ctr[w2] += 1", … into a counter data structure held in memory.]
Aside: we’re moving to a parallel setting, where thinking of these updates as "messages" makes more sense.
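A sketch of this in-memory version, assuming the vocabulary fits in RAM (the file name is illustrative):

from collections import Counter

ctr = Counter()                        # the in-memory counter data structure
with open("docs.txt") as docs:         # stream thru docs, one document per line
    for doc in docs:
        for w in doc.split():          # each occurrence is a "ctr[w] += 1" message
            ctr[w] += 1
for w, n in ctr.most_common(10):
    print(w, n)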
Small-vocabulary word count: high level view
[Diagram: the same pipeline, but the counter data structure now lives on disk.]
Disk seeks will make this very slow!!
Large-vocabulary word count: distributed counters
[Diagram: a streaming machine runs the counting logic and emits messages like "ctr[w1] += 1", "ctr[w2] += 1", …; a message router forwards each message to one of machines 1,…,P, each holding part of the counter data structure in its own memory.]
Workable but expensive
Large-vocabulary word count with sorting
[Diagram: Pass 1: counting logic emits messages "ctr[w1] += 1", "ctr[w2] += 1", …, which go through a sort. Pass 2: logic to process the sorted messages.]
It’s easy to process messages without a big memory – because all the messages for a single word have been put together
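A sketch of the two passes in Unix stream-and-sort style; the script names are illustrative, wired together as: python map.py < docs.txt | sort | python reduce.py

# map.py -- Pass 1: emit one message per word occurrence
import sys
for line in sys.stdin:
    for w in line.split():
        print(f"{w}\t1")

# reduce.py -- Pass 2: input arrives sorted, so all messages for a word are adjacent
import sys
prev, total = None, 0
for line in sys.stdin:
    w, n = line.rstrip("\n").split("\t")
    if prev is not None and w != prev:
        print(f"{prev}\t{total}")      # finished one word; no big in-memory table needed
        total = 0
    prev = w
    total += int(n)
if prev is not None:
    print(f"{prev}\t{total}")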
Map-Reduce is Really Map, Shuffle-Sort, Reduce
[Diagram: Map Process 1 and Map Process 2 each run the counting logic and partition/sort their output into temporary files tmp 1, tmp 2, tmp 3, …, tmp n. A distributed shuffle-sort delivers the files so that Reducer 1 and Reducer 2 can each merge their tmp files and combine the counter updates.]
Partitioning Messages for Word Count
[Diagram: messages are partitioned across Reducer Machine 1 and Reducer Machine 2, each running logic to combine the counter updates it receives.]
The shuffle/sort phase: route each message to a reducer based on its key (e.g., by hashing the word), and sort each reducer’s messages by key.
The reduce phase: combine the counter updates for each key.
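A minimal sketch of the routing decision made during shuffle-sort, assuming hash partitioning over a fixed number of reducer machines (the helper name is illustrative):

import hashlib

def reducer_for(word, num_reducers):
    # use a stable hash (Python's built-in hash() is salted per process),
    # so every message for the same word goes to the same reducer
    h = int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16)
    return h % num_reducers

assert reducer_for("aardvark", 2) == reducer_for("aardvark", 2)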
MAP-REDUCE FRAMEWORKS
Frameworks
Frameworks
What else is really needed in practice?
Frameworks
What is really needed in practice?
Frameworks
More comments on debugging:
ABSTRACTIONS FOR MAP-REDUCE
Abstractions On Top Of Map-Reduce
Example: stem words in a stream of word-count pairs:
(“aardvarks”,1) 🡺 (“aardvark”,1)
Proposed syntax:
table2 = MAP table1 TO λ row : f(row)
f(row)🡺row’
Example: apply stop words
(“aardvark”,1) 🡺 (“aardvark”,1)
(“the”,1) 🡺 deleted
Proposed syntax:
table2 = FILTER table1 BY λ row : f(row)
f(row)🡺 {true,false}
Abstractions On Top Of Map-Reduce
Example: tokenizing a line
“I found an aardvark” 🡺 [“i”, “found”, “an”, “aardvark”]
“We love zymurgy” 🡺 [“we”, “love”, “zymurgy”]
…but the final table is one word per row
“i”
“found”
“an”
“aardvark”
“we”
“love”
…
Proposed syntax:
table2 = FLATMAP table1 TO λ row : f(row)
f(row)🡺list of rows
This can be done with one Mapper, since a Map process can send any number of messages downstream
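A sketch of how the three proposed operations behave, written over plain Python iterables (Spark's map, filter, and flatMap have the same row-level semantics); the function names mirror the proposed syntax and are otherwise illustrative:

def MAP(table, f):         # one row in, one row out
    return (f(row) for row in table)

def FILTER(table, f):      # one row in, zero or one row out
    return (row for row in table if f(row))

def FLATMAP(table, f):     # one row in, any number of rows out
    return (out for row in table for out in f(row))

lines = ["I found an aardvark", "We love zymurgy"]
words = list(FLATMAP(lines, lambda line: line.lower().split()))
# ['i', 'found', 'an', 'aardvark', 'we', 'love', 'zymurgy']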
flashback: Naïve Bayes
Predicted y for doc x1,…,xd
where
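The formula on this slide did not survive the export; in terms of the event counts n[X=w ^ Y=y] used below, the standard prediction rule is:

\hat{y} = \arg\max_y \Big[ \log \frac{n[Y = y]}{n} + \sum_{j=1}^{d} \log \frac{n[X = x_j \wedge Y = y] + 1}{n[Y = y] + V} \Big]

where the n[\cdot] are counts accumulated at training time (documents per class for the prior, word tokens per class in the likelihood) and V is the vocabulary size.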
naïve Bayes: testing
[Diagram: event-count messages keyed by (word, label) pairs, e.g. X=w1 ^ Y=sports, X=w1 ^ Y=news, X=w2 ^ Y=…, …, with counts such as 5245, 1054, 2120, 37, 3, …]
The counts are stored in a table n, keyed by word w:
w | Counts associated with w
aardvark | n[w^Y=sports]=2
agent | n[w^Y=sports]=1027, n[w^Y=news]=564
… | …
zynga | n[w^Y=sports]=21, n[w^Y=news]=4464
Count #times each (word,label) pair appears
naïve Bayes: testing
The general case:
We’re taking rows from a table
Applying a function to get a new value
And grouping the rows of the table by this new value
Proposed syntax:
GROUP table BY λ row : f(row)
Could define f via: a function, a field of a defined record structure, …
f(row)🡺field
How to implement this with map-reduce?
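A sketch of GROUP BY in the stream-and-sort style above: the map step emits (f(row), row), a sort stands in for the distributed shuffle-sort, and the reduce step sees each group contiguously. The grouping function here (first tab-separated field) is illustrative:

import sys
from itertools import groupby

def f(row):
    return row.split("\t", 1)[0]                   # illustrative grouping key

rows = (line.rstrip("\n") for line in sys.stdin)
keyed = sorted(((f(r), r) for r in rows), key=lambda kv: kv[0])   # stand-in for shuffle-sort
for key, group in groupby(keyed, key=lambda kv: kv[0]):
    print(key, [r for _, r in group])              # reduce: process one group at a time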
w | Counters |
aardvark | n[w^Y=sports]=2 |
agent | n[w^Y=sports]=… |
agent | n[w^Y=sports]=… |
agent | n[w^Y=sports]=… |
… | n[w^Y=sports]=… |
zynga | n[…] |
zynga | n[…] |
w | DocId |
found | doc1 |
aardvark | doc1 |
… | |
zymurgy | doc2 |
… | .. |
Proposed syntax:
JOIN table1 BY λ row : f(row),
table2 BY λ row : g(row)
Doc1: “I found an aardvark”
Doc2: “We love zymurgy”
…
How to implement this with map-reduce? Similar to group-by
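A sketch of the analogous JOIN: map each table's rows to (join-key, tagged-row) messages, let the shuffle-sort group them by key, and have the reducer pair rows from the two sides. The tags and the toy data are illustrative:

from itertools import groupby

counters = [("aardvark", "n[w^Y=sports]=2"), ("zynga", "n[...]")]
word_doc = [("aardvark", "doc1"), ("found", "doc1"), ("zymurgy", "doc2")]

# map: tag each row with its table of origin, keyed by the join field w
messages = [(w, ("T1", v)) for w, v in counters] + [(w, ("T2", v)) for w, v in word_doc]
messages.sort(key=lambda kv: kv[0])                # stand-in for the shuffle-sort

# reduce: within each key, pair every T1 row with every T2 row
for w, group in groupby(messages, key=lambda kv: kv[0]):
    tagged = [t for _, t in group]
    lefts = [v for tag, v in tagged if tag == "T1"]
    rights = [v for tag, v in tagged if tag == "T2"]
    for l in lefts:
        for r in rights:
            print(w, l, r)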
Spark: A High-Level Dataflow Language
Spark
Set of concise dataflow operations (“transformations”)
Dataflow operations are embedded in an API together with “actions”
Spark
Spark examples
spark is a Spark context object (sometimes called sc)
Spark examples
errors is a transformation, and thus a data structure that explains HOW to do something
count() is an action: it will actually execute the plan for errors and return a value.
errors.filter() is a transformation
collect() is an action
everything is sharded (i.e., partitioned)
Spark examples
# modify errors to be stored in cluster memory
subsequent actions will be much faster
everything is sharded … and the shards are stored in the memory of worker machines, not on local disk (if possible, and if you ask)
You can also persist() an RDD on disk.
Spark examples
# modify errors to be stored in cluster memory
everything is sharded … and the shards are stored in the memory of worker machines, not on local disk (if possible)
You can also persist() an RDD on disk, which means it is stored somewhere for debugging purposes (or re-use).
persist-on-disk works because the RDD is read-only (immutable)
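A sketch of the log-mining example these slides describe, assuming a SparkContext named sc; the file path and the strings being searched for are illustrative:

# transformations only build a plan; nothing runs until an action is called
lines = sc.textFile("hdfs://.../logs")                      # RDD of log lines (path illustrative)
errors = lines.filter(lambda line: "ERROR" in line)         # transformation: a recipe, not data
errors.cache()                                              # keep the shards in cluster memory

n = errors.count()                                          # action: executes the plan, returns a number
mysql_errors = errors.filter(lambda l: "mysql" in l).collect()   # action: returns a Python list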
Spark examples: wordcount
the action
transformation on (key,value) pairs, which are special
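A sketch of the word-count code these callouts refer to, assuming a SparkContext sc and an illustrative input path:

counts = (sc.textFile("hdfs://.../docs.txt")                # illustrative path
            .flatMap(lambda line: line.split())             # one word per row
            .map(lambda w: (w, 1))                          # (key,value) pairs
            .reduceByKey(lambda a, b: a + b))               # transformation on (key,value) pairs
print(counts.take(10))                                      # the action: executes the whole plan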
Recap: Distributed Stream-and-Sort: Map, Shuffle-Sort, Reduce
[Diagram: Map Process 1 and Map Process 2 each run the counting logic and partition/sort their output into spill files (Spill 1, Spill 2, Spill 3, …, Spill n). A distributed shuffle-sort delivers the spill files so that Reducer 1 and Reducer 2 can each merge them and combine the counter updates.]
Classic PySpark examples: wordcount
Spark examples: wordcount
Spark examples: logistic regression
Classic Spark examples: batch logistic regression
reduce is an action – it produces a numpy vector
p.x and w are vectors, from the numpy package. Python overloads operations like * and + for vectors.
Spark examples: batch logistic regression
Important note: numpy vectors/matrices are not just “syntactic sugar”.
Spark examples: batch logistic regression
w is defined outside the lambda function, but used inside it
So: Python builds a closure – code including the current value of w – and Spark ships it off to each worker. So w is copied, and must be read-only.
Spark examples: batch logistic regression
dataset of points is cached in cluster memory to reduce i/o
Spark examples: batch logistic regression
gradient computation is done in parallel, locally on each worker, using data kept in memory.
summing the gradient vectors is done in parallel, and intermediate sums are sent across the network
updating the weights is done on the local machine
The lambda function, including the current value of w, is broadcast to the workers
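A sketch of the batch logistic regression loop the slides walk through, close to the classic Spark example and assuming a SparkContext sc; the Point record, parsing, file path, learning rate, and iteration count are illustrative:

import numpy as np
from collections import namedtuple

Point = namedtuple("Point", ["x", "y"])            # x: numpy feature vector, y: label in {-1,+1}

def parse(line):
    *feats, label = [float(v) for v in line.split()]
    return Point(np.array(feats), label)

points = sc.textFile("hdfs://.../points.txt").map(parse).cache()   # dataset cached in cluster memory
D = len(points.first().x)
w = np.random.rand(D)                              # current weights, kept on the driver

for _ in range(100):
    # the lambda captures w in a closure; Spark broadcasts a copy to every worker
    gradient = points.map(
        lambda p: (1.0 / (1.0 + np.exp(-p.y * np.dot(w, p.x))) - 1.0) * p.y * p.x
    ).reduce(lambda a, b: a + b)                   # action: partial sums combined across the network
    w -= 0.1 * gradient                            # the weight update happens locally on the driver
print(w)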
Spark logistic regression example