10-605 / 10-805 Map-Reduce and Spark: Sample Workflows
Machine Learning from Large Datasets
Recap 1/2
Recap 2/2
Recap 3/2: Spark WordCount
4
Today’s Plan
7
MAP-REDUCE: AN ORIGIN STORY
8
How hacks to handle large datasets with small memory evolved into Hadoop, Spark, Flume, …
9
c. 1994
Some examples
tr -sc '[:alpha:]' '\n'
< data/brown_nolines.txt
| sort
| uniq -c
| sort -nr
10
translate non-letters (-c complements the set) to newlines and "squeeze" (-s) runs of newlines into a single one
The
Fulton
County
Grand
Jury
said
Friday
an
investigation
of
…
sort words
dedup and count
sort in descending (-r) numeric order
62690 the
36043 of
27952 and
25725 to
22000 a
19581 in
10328 that
9969 is
9770 was
8833 for
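The same pipeline can be sketched in plain Python (a rough equivalent, not part of the lecture code; it lowercases words, so counts may differ slightly from the tr version):

```python
import re
from collections import Counter

def word_counts(text):
    """Rough equivalent of: tr -sc '[:alpha:]' <newline> | sort | uniq -c | sort -nr"""
    words = re.split(r"[^A-Za-z]+", text.lower())
    counts = Counter(w for w in words if w)
    return counts.most_common()  # descending count order, like sort -nr
```

Counter keeps every distinct word in memory, which is fine for one corpus but is exactly the assumption map-reduce relaxes.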
11
Unix poetry
12
Line-by-line processing and filtering are memory-efficient
So is sorting!
13
Merge sort
14
Merge sort
15
Unix Sort
16
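The idea behind Unix sort's bounded-memory behavior can be sketched in Python: sort fixed-size runs (which sort(1) would spill to temporary files), then do a streaming k-way merge. A sketch of the idea, not the actual sort(1) implementation:

```python
import heapq
import itertools

def external_sort(items, chunk_size=3):
    """Sort a stream using bounded memory: sort fixed-size runs,
    then lazily merge them with a k-way streaming merge."""
    it = iter(items)
    runs = []
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            break
        runs.append(sorted(chunk))  # each run would live on disk in sort(1)
    return heapq.merge(*runs)       # holds only one element per run in memory
```

heapq.merge never materializes the full output; it keeps one element per run on a heap.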
Back to Map-Reduce…
17
map takes one input x and outputs any number of (key, value) pairs
reduce takes a key (e.g., a word) and an iterator over the values associated with that key, and outputs some aggregation of those values
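These two signatures can be illustrated with a minimal single-machine sketch (a hypothetical helper, not the course's wc_nano.py):

```python
from itertools import groupby

def map_reduce(mapper, reducer, inputs):
    # map: each input x yields any number of (key, value) pairs
    pairs = [kv for x in inputs for kv in mapper(x)]
    # shuffle/sort: bring equal keys together
    pairs.sort(key=lambda kv: kv[0])
    # reduce: each key gets an iterator over its values
    return [reducer(k, (v for _, v in group))
            for k, group in groupby(pairs, key=lambda kv: kv[0])]

counts = map_reduce(
    mapper=lambda line: [(w, 1) for w in line.split()],
    reducer=lambda word, ones: (word, sum(ones)),
    inputs=["a b a", "b a"])
```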
18
A Bad Implementation
19
bash-3.2$ head -1 ../data/brown_nolines.txt
The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced "no evidence" that any irregularities took place.
bash-3.2$ py -m fire wc_nano.py WordCount map_reduce \
    --src ../data/brown_nolines.txt --dst wc.txt
bash-3.2$ head -5 wc.txt
('the', 69969)
('fulton', 17)
('county', 160)
('grand', 52)
('jury', 68)
wc_nano.py
hz_nano.py
python -m fire <script> <class> <method> --arg1 val1 …
Constant-Memory Word Count in Map-Reduce
20
Python generators 101
Each time g.__next__() is called, the generator runs until its next yield
Constant memory (just the saved state of g)
pair_generator() produces
('0', 1), … ('aaa', 1), … ('aaron', 1), …
ReduceReady(pair_generator()) produces
('aaron', <generator for 'aaron'>)
where that generator produces: 1, 1, …
22
or you could use itertools.groupby
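A hand-rolled sketch of what a ReduceReady-style wrapper could look like (the name matches the slide; the implementation here is an assumption, and itertools.groupby does the same job):

```python
def reduce_ready(sorted_pairs):
    """Turn a key-sorted iterator of (key, value) pairs into a stream of
    (key, iterator-over-values) in constant memory."""
    pending = [next(sorted_pairs, None)]        # one-pair lookahead buffer
    def values_for(key):
        while pending[0] is not None and pending[0][0] == key:
            yield pending[0][1]
            pending[0] = next(sorted_pairs, None)
    while pending[0] is not None:
        key = pending[0][0]
        vals = values_for(key)
        yield key, vals
        for _ in vals:              # drain any values the caller skipped
            pass
```

Only the lookahead pair and the generator state live in memory, no matter how many values share a key.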
Constant-Memory Word Count in Map-Reduce
23
bash-3.2$ head -1 ../data/brown_nolines.txt
The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced "no evidence" that any irregularities took place.
bash-3.2$ py -m fire wc_micro.py WordCount map_reduce \
    --src ../data/brown_nolines.txt --dst wc.txt
bash-3.2$ head -5 wc.txt
('0', 108)
('00', 33)
('000', 427)
('001', 2)
('002', 1)
hz_micro
Parallel constant-memory wordcount
24
Counting logic
Pass 1
Sort
Logic to process messages
Pass 2
Sorting
"ctr[w1] += 1"
"ctr[w2] += 1"
…
Easy to parallelize!
Easy to parallelize
if we are careful
Also route messages to the “right” machine
Parallel constant-memory wordcount
25
Counting logic
"ctr[x] += d"
Pass 1: Mapper
Sort
Logic to combine counter updates
Pass 2: Reducer
Spill 1
Spill 2
Spill 3
…
Merge Spill Files
Observation: you can “reduce” in parallel (correctly) if no sort key is split across machines
Sort key
26
Counting logic
"ctr[x] += d"
Mapper Machine
Sort
Spill 1
Spill 2
Spill 3
…
combine counter updates
Reducer Machine 1
Merge Spill Files
combine counter updates
Reducer Machine 2
Merge Spill Files
Spill 4
Sort needs to partition data appropriately
(into multiple “shards”)
Parallel constant-memory wordcount
27
Counting logic
"ctr[x] += d"
Mapper Machine
Sort
Spill 1
Spill 2
Spill 3
…
combine counter updates
Reducer Machine 1
Merge Spill Files
combine counter updates
Reducer Machine 2
Merge Spill Files
Spill 4
Sort needs to partition data appropriately, i.e., by the sort key
Parallel constant-memory wordcount
28
Counting logic
Mapper Machine 1
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Counting logic
Mapper Machine 2
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Logic to combine counter updates
Reducer Machine 1
Merge Spill Files
combine counter updates
Reducer Machine 2
Merge Spill Files
Spill n
Same holds for mapper machines: you can count in parallel if no sort key is split across multiple reducer machines
Simple way to partition:
hash the sort key
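Hash partitioning can be sketched in a few lines. Note it must use a stable hash (Python's built-in hash() is randomized per process, so every mapper machine would disagree on the routing):

```python
import zlib

def partition(key, num_reducers):
    """Route a sort key to a reducer shard via a stable hash (crc32),
    so all mapper machines agree on where each key goes."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers
```

All pairs sharing a key land on the same reducer, so no sort key is split across machines.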
30
Counting logic
Mapper Machine 1
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Counting logic
Mapper Machine 2
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Spill n
Mapper/counter machines run the “Map phase” of map-reduce
31
Logic to combine counter updates
Reducer Machine 1
Merge Spill Files
combine counter updates
Reducer Machine 2
Merge Spill Files
The shuffle/sort phase:
The reduce phase:
32
Logic to combine counter updates
Reducer Machine 1
Merge Spill Files
combine counter updates
Reducer Machine 2
Merge Spill Files
Keys for partitioning and sorting are common elements of many map-reduce frameworks.
Ideally the partitions will be similar in size, but you also need to assign keys to partitions quickly.
Random hashing is often fine, but not when some keys have lots of messages and some have few.
Map-Reduce is Distributed Map, Shuffle-Sort, Reduce
33
Counting logic
Map Process 1
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Counting logic
Map Process 2
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Logic to combine counter updates
Reducer 1
Merge Spill Files
combine counter updates
Reducer 2
Merge Spill Files
Spill n
Distributed Shuffle-Sort
Key ideas in distributed map reduce
34
Key ideas in distributed map reduce
35
Key ideas in distributed map reduce
36
Today’s Plan
37
More About Spark
38
Spark is MapReduce++
39
Many Things Are Built On Spark
40
Why is Caching in Memory Important?
41
Spark examples: logistic regression
Why is Caching in Memory Important?
43
Spark examples: batch logistic regression
44
w is defined outside the lambda function but used inside it
So Python builds a closure (code including the current value of w) and Spark ships it off to each worker. Thus w is copied, and must be read-only.
Spark examples: batch logistic regression
45
dataset of points is cached in cluster memory to reduce I/O
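The map/reduce structure of this gradient computation can be sketched in plain Python (no Spark; labels y are assumed to be in {-1, +1}, and the loss is the standard logistic loss):

```python
import math

def pointwise_gradient(w, x, y):
    """Map step: gradient of the logistic loss at one point (y in {-1, +1})."""
    s = 1.0 / (1.0 + math.exp(-y * sum(wi * xi for wi, xi in zip(w, x))))
    return [(s - 1.0) * y * xi for xi in x]

def batch_gradient(w, points):
    """Reduce step: sum the per-point gradients (the role Spark's reduce plays)."""
    grad = [0.0] * len(w)
    for x, y in points:
        for j, g in enumerate(pointwise_gradient(w, x, y)):
            grad[j] += g
    return grad
```

The map step only reads w, which is why Spark can ship it inside a closure; all mutation happens in the reduce-side sum.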
46
Gradient logic
Map Process 1
Gradient logic
Map Process 2
Partition/Sort
add pointwise gradients
Reducer 1
add pointwise gradients
Reducer 2
Partition/Sort
Spill 1
Spill 2
Spill 3
…
Spill 1
Spill 2
Spill 3
…
Merge Spill Files
Merge Spill Files
Spill n
Distributed Shuffle-Sort
gradient
w
Why is Caching in Memory Important?
47
Why Transformations and Actions?
48
Why Transformations and Actions?
49
Cost of Transformations
50
Cost of Transformations
51
Communication without RDDs
b = sc.broadcast(obj)
if b.value > 10:
…
52
Communication without RDDs
53
Communication without RDDs
54
Communication without RDDs
55
Communication without RDDs
56
A CONSTANT-MEMORY VERSION OF SPARK
57
PageRank
60
Google’s PageRank
61
web site xxx
web site yyyy
web site a b c d e f g
web
site
pdq pdq ..
web site yyyy
web site a b c d e f g
web site xxx
Inlinks are "good" (recommendations)
Inlinks from a "good" site are better than inlinks from a "bad" site
but inlinks from sites with many outlinks are not as "good"...
"Good" and "bad" are relative.
web site xxx
Google’s PageRank
62
web site xxx
web site yyyy
web site a b c d e f g
web
site
pdq pdq ..
web site yyyy
web site a b c d e f g
web site xxx
Imagine a "pagehopper" that always either follows a random outlink from the current page, or jumps to a random page
Google's PageRank (Brin & Page, http://www-db.stanford.edu/~backrub/google.html)
63
web site xxx
web site yyyy
web site a b c d e f g
web
site
pdq pdq ..
web site yyyy
web site a b c d e f g
web site xxx
Imagine a "pagehopper" that always either follows a random outlink from the current page, or jumps to a random page
PageRank ranks pages by the amount of time the pagehopper spends on a page:
PageRank in Memory
64
PageRank in Spark
65
A Slow Implementation
nodes: list of string ids
outlinks_by_node: RDD of (src, [out1, .. ])
v^0 = (1, 1, …, 1)
v^{t+1} = c·u + (1 − c)·v^t·W
where W_ij is the probability of a jump from page i to page j; c·u is the random-reset term, and (1 − c)·v^t·W is the hop term, summed over inlinks.
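A minimal power-iteration sketch of this update, assuming every node has at least one outlink (no dangling-node handling) and taking u = (1, …, 1) so the total score mass stays constant:

```python
def pagerank(outlinks, c=0.15, iters=20):
    """Power iteration for v[t+1] = c*u + (1-c) * v[t] W, with u = (1,...,1)
    and W[i][j] = 1/len(outlinks[i]) for each outlink i -> j."""
    v = {n: 1.0 for n in outlinks}           # v0 = (1, 1, ..., 1)
    for _ in range(iters):
        nxt = {n: c for n in outlinks}       # reset term c*u
        for i, outs in outlinks.items():     # hop term (1-c) * v[t] W
            for j in outs:
                nxt[j] += (1 - c) * v[i] / len(outs)
        v = nxt
    return v

ranks = pagerank({"a": ["b"], "b": ["a", "c"], "c": ["a"]})
```

Each iteration is one map (spread v[i]/|outlinks| over outlinks) plus one reduce (sum the contributions per target page).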
A Slow Implementation
outlinks
scores
hops
resets
scores
…
10 iterations on a small graph locally takes 10 min
5 iterations takes 30 sec
A Slow Implementation
Fixing A Slow Implementation
Keeping v in memory
Explicit closure
COMPARING COUNTS IN TWO CORPORA
Classic PySpark examples: wordcount
72
Naïve Bayes:
Comparing Bigrams
Problem: many of the common bigrams don’t seem to have meaning as a phrase….
Scoring bigrams as phrases
| variable | explanation |
| word_v   | # distinct words |
| phrase_v | # distinct phrases |
| word_n   | # words in corpus |
| phrase_n | # phrases in corpus |
| xy       | the phrase |
| x        | first word of the phrase |
| y        | second word of the phrase |
As a phrase “x y”
As a random co-occurrence of x and y
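One concrete scoring rule consistent with these counts (an assumption; the slides do not pin down the exact formula) is a smoothed log-odds of "x y" as a phrase versus a random co-occurrence of x and y:

```python
import math

def phrasiness(c_xy, c_x, c_y, phrase_n, phrase_v, word_n, word_v):
    """Log-odds of 'x y' as a phrase vs. a random co-occurrence,
    with add-one smoothing over the vocabularies. Higher = more phrase-like."""
    log_p_phrase = math.log((c_xy + 1) / (phrase_n + phrase_v))
    log_p_x = math.log((c_x + 1) / (word_n + word_v))
    log_p_y = math.log((c_y + 1) / (word_n + word_v))
    return log_p_phrase - (log_p_x + log_p_y)
```

This penalizes bigrams whose frequency is explained by their words' unigram frequencies alone, which is what filters out common-but-meaningless bigrams.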
Comparing Phrases
Filtered for high “phrasiness”
Comparing bigrams, with filtering by phrasiness
The pipeline – basic counts
The pipeline w/o filtering
Filtering phrases
Joins change shape of the data
This gets awkward…
Filtering phrases
An example from Ron Bekkerman: k-means in Map-Reduce (background)
83
Expectation-Maximization
84
EM is a kind of coordinate ascent:
Expectation-Maximization
85
In EM there is typically
Then do coordinate descent on w, Z to minimize
−log Pr(X | w, Z)
An example from Ron Bekkerman: k-means in Map-Reduce
86
Example: k-means clustering
87
k-means Clustering
88
centroids
Example: k-means clustering
89
Parallelizing k-means
90
Parallelizing k-means
91
Parallelizing k-means
92
k-means on MapReduce
93
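One k-means iteration in map-reduce style can be sketched as follows: map assigns each point to its nearest centroid, and reduce averages each cluster's points (a plain-Python sketch, not Bekkerman's code):

```python
def kmeans_step(points, centroids):
    """One MapReduce-style k-means iteration.
    Map: assign each point to its nearest centroid, emitting (cluster, point).
    Reduce: average the points assigned to each cluster."""
    def nearest(p):
        return min(range(len(centroids)),
                   key=lambda j: sum((pi - ci) ** 2
                                     for pi, ci in zip(p, centroids[j])))
    # map + shuffle: group points by nearest centroid
    groups = {}
    for p in points:
        groups.setdefault(nearest(p), []).append(p)
    # reduce: new centroid = mean of the cluster's points
    new_centroids = []
    for j, c in enumerate(centroids):
        pts = groups.get(j)
        if pts:
            new_centroids.append([sum(dim) / len(pts) for dim in zip(*pts)])
        else:
            new_centroids.append(c)  # empty cluster: keep old centroid
    return new_centroids
```

The cluster index is the sort key, so each reducer can average its clusters independently, just like the word-count reducers.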
Questions
94
Questions
95
Sketch: