Seminar: Parallel Architectures and Programming Models
GO for Parallel Computing
by
Harald Schilly <harald.schilly@univie.ac.at>
© 2010/2011, Vienna, Austria
Documents
Source Code: http://bitbucket.org/schilly/vo-paraarch
Presentation: http://docs.google.com/present/view?id=awhwrn3kbpd_256cjzchdcx
Support for Parallel Computing
RPC with GOB or JSON serialization
Go is a compiled, garbage-collected and concurrent programming language. Its code is written in an imperative, structured style; it is therefore similar to C, while having the “feeling” of a scripting language like Python.
Go has its roots in previous work on Plan 9 and the Inferno operating system at Bell Labs. Its design started around 2007 with Robert Griesemer, Rob Pike, and Ken Thompson (already famous for developing “B”, a predecessor of C). In fall 2009, Go was officially announced and released as an open source project (BSD licensed). Go is primarily developed by Google Inc. and, according to them, already used in some production systems.
A Go program is organized into packages. A package is a collection of *.go files, each starting with a package “name” declaration. Named identifiers are visible across a package (package scoping); outside a package, only exported names are visible. Exports are marked by an uppercase first letter and have the same meaning as “public” in languages like Java. Declarations do not need to precede their use, and inside a package everything can be defined and used more or less anywhere.
Below the package “name” declaration is an optional list of “imports”: the packages needed by that program. The compiler automatically takes care of these dependencies. Exported identifiers are accessed as packagename.Identifier, with an uppercase first letter.
An executable program resides in a package named “main”, where one of the files must declare a function “main()” at which execution starts.
A program is stopped and terminated when the main() function finishes. This might be confusing when something is still going on in a background routine, because its processing is stopped when the main routine exits.
Go has types and is based on concepts of object-oriented programming, but it differs greatly from established realizations in the style of Java, C++, C# and Python. For me, it has some similarities with the statistical programming language S (as implemented in R); hence, people coming from that corner might easily familiarize themselves, while others have to rethink the whole concept a bit more in depth.
First, there are no “classes” and there is no constructor. All structure is based on two type concepts: structs, which group data, and interfaces, which describe behavior.
Functions can be declared everywhere. If they specify a receiver, they behave like a method of a class; otherwise they are just normal functions. Example:
func (p *Person) birthday() int {
p.age++
return p.age
}
This function is attached to the Person struct from above and returns an int. Once again we can see the principle of orthogonality manifested by the relationship between types and methods.
The next ingredient is control structures. Go has an if and a for.
There are typed arrays (fixed length) and slices (length may increase); for more flexibility check out Vector[9]. There are also similar data structures, e.g. heap, list and ring. A native keyword “map” provides associative data structures.
There are two building blocks for multithreading:
Goroutines
Goroutines are the lightweight counterpart of threads for Go programs. Together with channels for synchronized communication, they complete the support for deterministic multithreading in Go. A goroutine is easily started by putting the keyword go in front of a function call.
Channels
Channels are a buffered or unbuffered mechanism for communication between two goroutines. The following example combines a channel and a goroutine, also demonstrating full closures:
func main() {
c := make(chan int)
i := 42
go func() {
c <- i
}()
println(<-c)
}
⇒ Output: “42”
It is possible to switch on communication operations with the select statement: it blocks until one of several channel operations can proceed.
MapReduce is a computational paradigm for computing some operations on a large body of data by splitting up the data and doing the calculations in parallel. More CPUs can be utilized at the same time, so the entire job finishes much faster. The “mapper” applies a computation to subsets of the data. Each result is “emitted” as a key/value pair; usually a mapping operation emits several key/value pairs. Next, the emitted key/value pairs are collected according to their key. Then, the “reducer” combines the values of one key. The following relation must be satisfied:
r(k, [v_1, ..., v_n]) = r(k, [r(k, [v_1, ..., v_j]), r(k, [v_{j+1}, ..., v_n])])
where r is the reducer function, k the key and v_1, ..., v_n the values for key k. This means that repeated invocations of the reducer on partially reduced values must yield the same result.
This overview shows how the application does the processing:
I’ve implemented a rather simple task involving reading several text files and emitting key/value pairs for certain statistics. The data is the complete Quran in English, split up into several files. At its core, a mapping function reads one of the files, uses a regular expression to match words (pattern “[A-Za-z]+”), then emits a key “wl-%d” for the word length, where “%d” is replaced with the “len” of each token, and then iterates over each character of this word and emits “ch-%s”, where “%s” is replaced with the string representation of the character. The value is always one. Additionally, it emits the keys “wc” and “cc” to sum up the total number of words and characters, respectively.
Here is an abbreviated logfile as an example for just 2 files, 1 repetition, and 2 mappers:
2010/12/28 16:07:19 start
2010/12/28 16:07:19 mapper 0 start
2010/12/28 16:07:19 mapper 1 start
2010/12/28 16:07:19 Repeat #0
2010/12/28 16:07:19 filename: quran/AP01.TXT
2010/12/28 16:07:19 filename: quran/AP02.TXT
... [reducers are started as needed while mappers operate]
2010/12/28 16:07:20 reducer ch-A start
2010/12/28 16:07:20 reducer wl-2 start
...
2010/12/28 16:07:20 mapper 1 done: 0.21554899
2010/12/28 16:07:20 reducer ch-L start
2010/12/28 16:07:20 reducer wl-7 start
... [all reducers are started and all mappers are done]
2010/12/28 16:07:20 mapper 0 done: 0.539472
2010/12/28 16:07:20 reducer ch-V done: 0.380317
2010/12/28 16:07:20 reducer wl-2 done: 0.382647
2010/12/28 16:07:20 reducer ch-C done: 0.38033798
... [all reducers are done]
summary: ch-R = 3860
summary: wl-16 = 3
summary: ch-V = 749
summary: wl-2 = 2264
...
2010/12/28 16:07:20 total: 0.577972 [s]
Comment: It is clearly visible that the two mappers are started first. Immediately after the program starts reading the file contents from disk, they start emitting key/value pairs; corresponding reducers are created as needed and start their work, too. A short while after the mappers are done, the reducers finish as well, and after the last reducer has finished, the summary is displayed.
Here are brief excerpts from the code, the full listing is available online.
Basically, the mapper takes a string argument for a filename, opens the file, reads it, performs some operations on it and emits key/value pairs (struct “Emit”). E.g. for counting the total number of characters, it emits the key “cc” with the length of the current token as value.
// abbreviated code
func mapper(input chan string, emitter chan Emit) {
for inputstring := range input {
matches := regex.FindAllString(inputstring, -1)
for _, token := range matches {
// actual work ...
wl := len(token)
emitter <- Emit{"wl-" + strconv.Itoa(wl), 1}
}
}
}
The collector sends Emit objects with the same key to the same pipe. On each pipe sits a corresponding reducer doing its work. If there is no pipe and reducer yet, they are created and started as needed. “reducer_chans” is of type map[string]chan int.
// abbreviated code
func collector(emitter chan Emit, result chan Emit) {
for e := range emitter {
rchan, exists := reducer_chans[e.Key]
if !exists {
rchan = make(chan int, BUFSIZE)
reducer_chans[e.Key] = rchan
go reducer(e.Key, rchan, result)
}
rchan <- e.Value
}
}
The reducer sits at the end of each pipe and sums up all Emit values. What exactly the “reduce” operation is, is defined in the Emit object (in this case just a plain in-place “+”).
// abbreviated code
func reducer(k string, in chan int, result chan Emit) {
res := EmitNull(k)
for val := range in {
res.ReduceValue(&val)
}
result <- res
}
The output stage is trivial: it just lists the incoming results.
I’ve run this code on a 24-core machine with shared memory (24x Intel Xeon X7460 @ 2.66GHz / 128GB RAM). During my benchmarks, the average load was 2-5.
Here is a plot showing the total running time vs. the number of mappers.
Two observations: First, due to the overall overhead there isn’t any improvement above 12 parallel goroutines. My observations with different workloads (I simply iterated over the whole data a couple of times to increase the volume) indicate that how well it scales depends on the biggest chunk of data and the number of individual tasks. Second, it doesn’t matter if you start too many mappers! There is a slight increase in the runtime (0.2 s, from 8.8 to 9.0), but that’s rather insignificant.
Here are some timing numbers:
# mappers | total ex. time [s] | net total mappers [s] |  ratio |
        1 |               82.0 |                  81.9 |  0.998 |
        2 |               42.9 |                  84.4 |  1.966 |
        6 |               16.6 |                  89.4 |  5.368 |
       12 |               10.3 |                  96.9 |  9.419 |
       24 |                8.7 |                135.12 | 15.514 |
In a more generic setting, it would be interesting to send function objects directly to each mapper and reducer to make the implementation more flexible. Here is an example of sending a generic function object to another goroutine, evaluating it, and returning the answer (see “funcobj” in the sources for the complete example).
First, a function creating a function object with closure for a local value x:
func f1(start int) func() int {
x := start
ret := func() int {
x = 2*x+1
return x
}
return ret
}
Then, a goroutine to evaluate functions in a pipe:
func eval(c chan func() int, ret chan int) {
for f := range c {
ret <- f()
}
}
Finally, for the main program one needs to get a function object
f10 := f1(10)
and create the goroutine for evaluation:
go eval(c, ret)
and do the communication:
c <- f10
println("return f10: ", <-ret)
This prints the expected value 21.
Another interesting enhancement would be to make this available on a cluster. The enhancement is simple: Each mapper, when started, is assigned to talk to a client over the network, i.e. a network IP address with a port number. Each such client listens for incoming RPC calls, executes the code and returns the emit values in a key/value map. Then, the main program iterates over that map and passes the Emit values on to the collector.
I’ve implemented a simple client/server example[11] to demonstrate RPC, but it’s not suitable for MapReduce, see the source code.
Netchan Package
I want to note that there is also a package for channel communication over a network. This netchan package[12] still seems to be in development, but making channel communication transparent is an interesting idea.
Both methods from above could be combined with sending a function object over the wire to abstract this implementation further from the actual task.
Benkner/Mehofer suggested that I look into the language StreamIt. I started reading its Cookbook[13]; here are some observations:
Go | StreamIt |
any number of input, output or combined input&output channels per goroutine | only 1 input and 1 output; special keywords like split, join, duplicate and loop/feedbackloop are needed for more than linear layouts. |
one size fits all | special “boxes” for different kinds of communication tasks |
typed channels, for any type! | typed channels, but I’ve only seen int and float. |
channels can be created at any time | fixed before execution |
goroutines can be created at any time | fixed layout before execution |
channels can be sent over channels | N/A |
conditional blocking on communication, waits for activity on any number of channels | N/A |
no automatism for clusters, but RPC support | claims to automatically scale on clusters |
more “direct” programming, compiled | metaprogramming; the resulting file (.java?) is translated to several C files and then compiled. |
no peeking, but channels can be buffered and there is a “ring” data structure | peeking into a stream is possible |
I don’t have proof, but it looks like Go could be seen as a generalization of StreamIt. All perceived benefits of StreamIt seem to come at the price of inflexibility and the extra work of learning the overall framework.
[1]a slice is a typed array with variable length
[4]Communications of the ACM 21 (8): 666–677. doi:10.1145/359576.359585