CS 61C, Fall 2024 @ UC Berkeley
Slides credit: Dan Garcia, Justin Yokota, Peyrin Kao
Slides template credit: Josh Hug, Lisa Yan
Distributed Computing and MapReduce
Lecture 32
Scope Changes
Lecture 32, CS 61C, Fall 2024
Scope Changes
Coordination Game
Distributed Computing
Examples
Scope Changes for Fall 2024 CS 61C
A brief history:
Changes for this semester:
Exam scope for Fall 2024:
Coordination Game Demo
See recording for the live demo.
No slides in this section.
Coordination Game Debrief
Communicating didn't work well.
Lag time is a killer – when you say something, there's a delay before others hear it.
Also, not everyone hears what you say.
Multiple people trying to do the same thing.
Some people are trying to sabotage.
Not everyone is following the same plan.
Someone initially volunteered to be the boss.
Distributed Computing: Definition
Review: Types of Parallelism So Far
SIMD (Single Instruction, Multiple Data):
Use one instruction to operate on multiple pieces of data at the same time.
  1023 2133 9728 3210
+ 0008 0002 0001 0005
= 1031 2135 9729 3215
Use a single addition operation to add four pairs of numbers.
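As a rough analogy (not from the original slides), the same four-way addition can be written with NumPy, which typically lowers vectorized operations like this one to SIMD instructions:

import numpy as np

a = np.array([1023, 2133, 9728, 3210])
b = np.array([   8,    2,    1,    5])
print(a + b)   # one vectorized add -> [1031 2135 9729 3215]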
Review: Types of Parallelism So Far
Thread-Level Parallelism (TLP):
One program runs multiple threads simultaneously.
[Diagram: the program forks into parallel threads, the threads run simultaneously, then they join; the fork-join pattern repeats.]
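A minimal fork-join sketch (not from the slides), using Python's standard threading module; the function and variable names are made up for illustration:

import threading

results = [0] * 4

def work(i):
    results[i] = i * i          # each thread computes its own piece

threads = [threading.Thread(target=work, args=(i,)) for i in range(4)]
for t in threads: t.start()     # "fork": launch the threads
for t in threads: t.join()      # "join": wait for all of them to finish
print(results)                  # [0, 1, 4, 9]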
Today: Distributed Computing
Today's new type of parallelism: Distributed computing.
Analogy:
To scale up distributed programming: Just add more computers.
Distributed Computing: Challenges
Challenges of Distributed Computing
Concurrency is hard.
Handling failure is hard.
Challenges of Distributed Computing
Communication is hard.
Goal: Split problem into independent sub-tasks, and minimize communication between programs.
Example: Manager-Worker Framework
Manager-Worker Framework: Motivation
Suppose we have 20 independent tasks, and 4 programs.
Naive approach: Give 5 tasks to each program.
Problem: Might not load-balance well.
Need some way to dynamically assign work, while minimizing communication.
[Diagram: the 20 tasks split evenly, 5 apiece, across Program 0 through Program 3.]
Manager-Worker Framework: Manager
Program 0 is the lone manager, whose job is assigning work to the other processes.
Manager pseudocode:
1. Setup.
2. While there's still work to do: wait for a message from any worker, then send that worker the next task.
3. Repeat once per worker: receive the worker's final result, then send it a stop message.
4. Teardown.
Manager-Worker Framework: Worker
All other programs are workers, who receive work from the manager.
Worker pseudocode:
1. Setup.
2. done=False.
3. While done==False: receive a message from the manager; if it is a stop message, set done=True; otherwise do the assigned task and send the result back.
4. Teardown.
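As an illustration only (not from the slides), here is a single-machine sketch of both pseudocode loops above, using Python threads and queues to stand in for real network messages between separate computers; a shared task queue replaces the explicit request/reply messages, and every name is made up for this sketch:

import threading, queue

task_q = queue.Queue()      # manager -> workers: tasks (or a "STOP" signal)
result_q = queue.Queue()    # workers -> manager: finished results

def manager(tasks, num_workers):
    for t in tasks:                          # while there's still work to do:
        task_q.put(t)                        #   hand out the next task
    for _ in range(num_workers):             # repeat once per worker:
        task_q.put("STOP")                   #   tell one worker to shut down
    return [result_q.get() for _ in tasks]   # collect every result

def worker():
    done = False
    while not done:                          # while done == False:
        t = task_q.get()                     #   wait for the next message
        if t == "STOP":
            done = True
        else:
            result_q.put(t * t)              #   "do the work": square the task

workers = [threading.Thread(target=worker) for _ in range(4)]
for w in workers: w.start()
print(sorted(manager(range(20), num_workers=4)))   # squares of 0..19
for w in workers: w.join()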
Manager-Worker Framework: Design Considerations
Transmitting messages can be slow/expensive, so messages should be short.
The manager is "wasted" (it does none of the actual work), but dedicating one program to coordination is usually worth it.
Manager-Worker Framework: Design Considerations
Assumption so far: Tasks are independent, and can be done in any order.
What if tasks have dependencies?
Need to write two versions of code: manager code, and worker code.
Example: MapReduce
Not in exam scope in Fall 2024.
What is MapReduce?
Simple data-parallel programming model designed for scalability and fault-tolerance.
Pioneered by Google.
Open-source Hadoop project.
What is MapReduce Used For?
At Google:
At Yahoo!:
At Facebook:
Example: Facebook Lexicon
Tracking usage of the term "hangover" across time would require reading tons of Facebook posts and collecting word counts.
MapReduce Design Goals
Scalability to large data volumes:
Cost-efficiency:
Jeffrey Dean and Sanjay Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters," 6th USENIX Symposium on Operating Systems Design and Implementation, 2004.
Typical Hadoop Cluster
40 nodes/rack, 1000-4000 nodes in cluster.
1 Gbps bandwidth within rack, 8 Gbps out of rack.
Node specs (Yahoo terasort): 8 x 2GHz cores, 8 GB RAM, 4 disks (= 4 TB?)
MapReduce in CS 10 and CS 61A
MapReduce consists of two steps: Map, then Reduce.
The MapReduce paradigm can be implemented in several different languages:
> (reduce + (map square '(1 20 3 10)))
510
Snap! (CS 10 programming language)
Scheme (CS 61A programming language)
MapReduce in CS 10 and CS 61A
MapReduce consists of two steps: Map, then Reduce.
The MapReduce paradigm can be implemented in several different languages:
Python (CS 61A programming language)
>>> from functools import reduce
>>> def plus(x,y): return x+y
>>> def square(x): return x*x
>>> reduce(plus, map(square, (1,20,3,10)))
510
MapReduce Programming Model
Input and output: each a set of key/value pairs.
Programmer specifies two functions:
map (in_key, in_value) → list(interm_key, interm_value)
reduce (interm_key, list(interm_value)) → list(out_value)
MapReduce Example: Word Count
"Mapper" nodes are responsible for the map function.
// "I do I learn" → ("I",1), ("do",1), ("I",1), ("learn",1) �map(String input_key, String input_value): � // input_key : document name (or line of text) � // input_value: document contents � for each word w in input_value:� EmitIntermediate(w, "1");
MapReduce Example: Word Count
"Reducer" nodes are responsible for the reduce function.
// ("I",[1,1]) → ("I",2)
reduce(String output_key, Iterator intermediate_values):
  // output_key : a word
  // output_values: a list of counts
  int result = 0;
  for each v in intermediate_values:
    result += ParseInt(v);
  Emit(AsString(result));
Data is stored on a distributed file system (DFS).
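To make the dataflow concrete, here is a plain-Python sketch (not from the slides) of the same word-count map and reduce steps run locally, with no cluster or DFS; the names map_fn and reduce_fn are made up for this sketch:

from collections import defaultdict

def map_fn(doc_name, doc_contents):
    # emit ("word", 1) for every word in the document
    return [(word, 1) for word in doc_contents.split()]

def reduce_fn(word, counts):
    # sum all intermediate counts for one word
    return sum(counts)

docs = {"doc1": "I do I learn"}
intermediate = defaultdict(list)
for name, text in docs.items():
    for word, count in map_fn(name, text):   # map phase
        intermediate[word].append(count)     # shuffle: group values by key
print({w: reduce_fn(w, c) for w, c in intermediate.items()})
# {'I': 2, 'do': 1, 'learn': 1}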
MapReduce WordCount Diagram
Map:
"I do I learn" → ("I",1), ("do",1), ("I",1), ("learn",1)
Reduce:
("I",[1,1]) → ("I",2)
MapReduce Processing Time Line
Manager assigns Map + Reduce tasks to "worker" servers.
As soon as a Map task finishes, worker can be assigned a new Map or Reduce task.
Data shuffle begins as soon as a given Map finishes.
Reduce task begins as soon as all data shuffles finish.
To tolerate faults, reassign task if a worker server goes down.
Process      | Timeline (left to right)
User Program | MapReduce() ... wait ...
Manager      | Assign tasks to worker machines...
Worker 1     | Map 1, then Map 3
Worker 2     | Map 2
Worker 3     | Read 1-A, Read 3-A, Read 2-A, then Reduce A
Worker 4     | Read 1-B, Read 3-B, Read 2-B, then Reduce B
MapReduce WordCount Java code
(You don't need to understand this code.)
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WCMap.class);
conf.setCombinerClass(WCReduce.class);
conf.setReducerClass(WCReduce.class);
conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));
JobClient.runJob(conf);
}
public class WCReduce extends MapReduceBase implements Reducer {
public void reduce(WritableComparable key, Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable) values.next()).get();
}
output.collect(key, new IntWritable(sum));
}
}
public class WCMap extends MapReduceBase implements Mapper {
private static final IntWritable ONE = new IntWritable(1);
public void map(WritableComparable key, Writable value,
OutputCollector output,
Reporter reporter) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
output.collect(new Text(itr.next()), ONE);
}
}
}
Spark
Apache Spark is a fast and general engine for large-scale data processing.
Speed:
Ease of Use:
Word Count in Spark’s Python API
file.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
One-line solution!
flatMap in Spark’s Python API
>>> def neighbor(n):
... return [n-1,n,n+1]
>>> R = sc.parallelize(range(5))
>>> R.collect()
[0, 1, 2, 3, 4]
>>> R.map(neighbor).collect()
[[-1, 0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]
>>> R.flatMap(neighbor).collect()
[-1, 0, 1, 0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5]
Word Count in Spark's Python API
unix% cat file.txt
ah ah er
ah
if or
or uh
or
ah if
Word Count in Spark's Python API
>>> W = sc.textFile("file.txt")
>>> W.flatMap(lambda line: line.split()).collect()
['ah', 'ah', 'er', 'ah', 'if', 'or', 'or', 'uh', 'or', 'ah', 'if']
>>> W.flatMap(lambda line: line.split()).map(lambda word: (word,1)).collect()
[('ah', 1), ('ah', 1), ('er', 1), ('ah', 1), ('if', 1), ('or', 1), ('or', 1), ('uh', 1), ('or', 1), ('ah', 1), ('if', 1)]
>>> W.flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda a,b: a+b).collect()
[('er', 1), ('ah', 4), ('if', 2), ('or', 3), ('uh', 1)]
Parallel? Let's Check...
>>> import time
>>> def crunch(n):
... time.sleep(5) # to simulate number crunching
... return n*n
...
>>> crunch(10) # 5 seconds later
100
>>> list(map(crunch,range(4))) # 20 seconds later
[0, 1, 4, 9]
>>> R = sc.parallelize(range(4))
>>> R.map(crunch).collect() # 5 seconds later
[0, 1, 4, 9]
Summary: Distributed Computing
Distributed Computing: Many computers work together to achieve a common goal.
Manager-Worker Framework:
MapReduce: