
Seminar: Parallel Architectures and Programming Models

Go for Parallel Computing

by

Harald Schilly <harald.schilly@univie.ac.at>

© 2010/2011, Vienna, Austria

Documents

Source Code: http://bitbucket.org/schilly/vo-paraarch 

Presentation: http://docs.google.com/present/view?id=awhwrn3kbpd_256cjzchdcx 


TOC


Go

History/About/Basics

Principles

Concepts and Design

Code Organization

Object Oriented Programming

Surprises

Control Structures

Data Structures/Collections

Support for Parallel Computing

Multithreading

Goroutines

Channels

Examples

MapReduce

Example

Code

Mapper

Collector

Reducer

Outputter

Scaling

Outlook

RPC with gob or JSON serialization

Comparison with StreamIt


Go

Go is a compiled, garbage-collected, concurrent programming language. Its code is imperative and structured, so it is similar to C, yet it has the “feeling” of a scripting language like Python.

History/About/Basics

Go has its roots in previous work on Plan 9 and the Inferno operating system at Bell Labs. Its design started around 2007 with Robert Griesemer, Rob Pike, and Ken Thompson (already famous for developing “B”, a predecessor of C). In fall 2009, Go was officially announced and released as an open source project (BSD licensed). Go is primarily developed by Google Inc. and, according to them, already used in some production systems.

Principles

Concepts and Design

Code Organization

A Go program is organized into packages. A package is a collection of *.go files, each starting with a package “name” declaration. Identifiers are visible throughout a package (package scoping); outside a package, only exported names are visible. Exports are marked by an uppercase first letter and have the same meaning as “public” in languages like Java. Forward declarations are not required: within a package, everything can be defined and used more or less anywhere.

Below the package “name” declaration is an optional list of “imports”: the packages that this program needs. The compiler automatically takes care of these dependencies. Exported identifiers are accessed as “packagename.Identifier”, with an uppercase first letter.

An executable program resides in a package named “main”, where one of the files must declare a function “main()” at which execution starts.

A program is stopped and terminated when the main() function finishes. This can be confusing when something is still going on in a background routine, because its processing is cut off as soon as the main routine exits.

Object Oriented Programming

Go has types and embraces concepts of object-oriented programming, but it is quite different from established realizations in the style of Java, C++, C# and Python. For me, it has some similarities with the statistical programming language S (as implemented in R); people coming from that corner might be able to familiarize themselves easily, while others have to rethink the whole concept a bit more in depth.

First, there are no “classes” and there is no constructor. All structure is based on two kinds of types: structs, which group data fields, and interfaces, which describe behavior.

Functions can be declared everywhere. If they specify a receiver, they are like a method in a class, otherwise they are just a normal function. Example:

func (p *Person) birthday() int {
  p.age++
  return p.age
}

This function is attached to the Person struct via its receiver and returns an int type. Once again we can see the principle of orthogonality manifested in the relationship between types and methods.

Surprises

Control Structures

The next ingredient is control structures. Go keeps these minimal: there is an if, a switch, and a single looping construct, for, which also covers while-style loops.

Data Structures/Collections

There are typed arrays (fixed length) and slices (whose length may grow); for more flexibility check out Vector[9]. Similar container data structures exist as well, e.g. heap, list and ring. There is a native keyword “map” for associative data structures.


Support for Parallel Computing

Multithreading

Go’s support for multithreading rests on two building blocks, described in the following sections: goroutines and channels.

Goroutines

Goroutines are the lightweight counterpart of threads in Go programs. Together with channels for synchronized communication, they complete Go’s support for deterministic multithreading. A goroutine is started simply by putting the keyword go in front of a function call.

Channels

Channels are a buffered or unbuffered mechanism for communication between two goroutines. The following example combines a channel and a goroutine, and also demonstrates a closure:

func main() {
  c := make(chan int)
  i := 42
  go func() {
    c <- i
  }()
  println(<-c)
}

⇒ Output: “42”

It is possible to switch on activity across several channels at once using the select statement, which blocks until one of its communication cases can proceed.

Examples

MapReduce

MapReduce is a computational paradigm for performing operations on a large body of data by splitting up the data and doing the calculations in parallel. More CPUs can be used at the same time and the entire job therefore finishes much faster. The “mapper” applies a computation to subsets of the data. Each result is “emitted” as a key/value pair; there are usually several emitted key/value pairs per mapping operation. Next, the emitted key/value pairs are collected according to their key. Then, the “reducer” combines the values of one key. The following relation must be satisfied:

  r(k, [v_1, ..., v_n]) = r(k, [r(k, [v_1, ..., v_i]), v_{i+1}, ..., v_n])

where r is the reducer function, k the key and v_1, ..., v_n the values for key k. This means that repeated invocations of the reducer on partially reduced values must give the same result as a single reduction over all values.

[Figure: overview of how the application does the processing]

Example

I’ve implemented a rather simple task: reading several text files and emitting key/value pairs for certain statistics. The data is the complete Quran in English, split up into several files. At its core, a mapping function reads one of the files, uses a regular expression to match words (pattern “[A-Za-z]”), then emits a key “wl-%d” for the word length, where “%d” is replaced with the “len” of each token, and then iterates over each character of the word and emits a key “ch-%s”, where “%s” is replaced with the string representation of the character. The value is always one. Additionally, it emits the keys “wc” and “cc” to sum up the total number of words and characters, respectively.

Here is an abbreviated logfile as an example for just 2 files, 1 repetition, and 2 mappers:

2010/12/28 16:07:19 start                                                  

2010/12/28 16:07:19 mapper 0 start                                          

2010/12/28 16:07:19 mapper 1 start                                          

2010/12/28 16:07:19 Repeat #0                                              

2010/12/28 16:07:19 filename:  quran/AP01.TXT

2010/12/28 16:07:19 filename:  quran/AP02.TXT

... [reducers are started as needed while mappers operate]

2010/12/28 16:07:20 reducer ch-A start

2010/12/28 16:07:20 reducer wl-2 start

...

2010/12/28 16:07:20 mapper 1 done: 0.21554899

2010/12/28 16:07:20 reducer ch-L start

2010/12/28 16:07:20 reducer wl-7 start

... [all reducers are started and all mappers are done]

2010/12/28 16:07:20 mapper 0 done: 0.539472

2010/12/28 16:07:20 reducer ch-V done: 0.380317                            

2010/12/28 16:07:20 reducer wl-2 done: 0.382647                            

2010/12/28 16:07:20 reducer ch-C done: 0.38033798    

... [all reducers are done]

summary: ch-R = 3860

summary: wl-16 = 3

summary: ch-V = 749

summary: wl-2 = 2264

...

2010/12/28 16:07:20 total:  0.577972 [s]

Comment: It is clearly visible that the two mappers are started first. As soon as the program starts reading the file contents from disk, they begin emitting key/value pairs; corresponding reducers are created as needed and start their work, too. A short while after the mappers are done, the reducers finish as well, and after the last reducer has finished, the summary is displayed.

Code

Here are brief excerpts from the code, the full listing is available online.

Mapper

Basically, the mapper takes a string argument for a filename, opens the file, reads it, performs some operations on it and emits key/value pairs (struct “Emit”). E.g. for counting the total number of characters, it emits the key “cc” with the length of the current token as its value.

// abbreviated code
func mapper(input chan string, emitter chan Emit) {
  for inputstring := range input {
    matches := regex.FindAllString(inputstring, -1)
    for _, token := range matches {
      // actual work ...
      wl := len(token)
      emitter <- Emit{"wl-" + strconv.Itoa(wl), 1}
    }
  }
}

Collector

The collector sends Emit objects with the same key to the same pipe. On each pipe sits a corresponding reducer doing its work. If pipe and reducer do not exist yet, they are created and started on demand. “reducer_chans” is of type map[string]chan int.

// abbreviated code
func collector(emitter chan Emit, result chan Emit) {
  for e := range emitter {
    rchan, exists := reducer_chans[e.Key]
    if !exists {
      rchan = make(chan int, BUFSIZE)
      reducer_chans[e.Key] = rchan
      go reducer(e.Key, rchan, result)
    }
    rchan <- e.Value
  }
}

Reducer

The reducer sits at the end of each pipe and sums up all the Emit values. What exactly the “reduce” operation is, is defined in the Emit object (in this case just a plain in-place “+”).

// abbreviated code
func reducer(k string, in chan int, result chan Emit) {
  res := EmitNull(k)
  for val := range in {
    res.ReduceValue(&val)
  }
  result <- res
}

Outputter

The output is trivial: it just lists the incoming results.

Scaling

I’ve run this code on a 24-core machine with shared memory (24x Intel Xeon X7460 @ 2.66 GHz, 128 GB RAM). During my benchmarks, the average load was 2-5.

[Plot: total running time vs. the number of mappers]

Two observations: First, due to the overall overhead there isn’t any improvement beyond 12 parallel goroutines. My experiments with different workloads (I simply iterated over the whole data several times to increase the volume) indicate that how well it scales depends on the biggest chunk of data and on the number of individual tasks. My second observation is that it doesn’t matter if you start too many mappers: there is a slight increase in runtime (0.2 s, from 8.8 to 9.0), but that is rather insignificant.

Here are some timing numbers:

  # mappers | total ex. time [s] | net total mappers [s] |  ratio
  ----------+--------------------+-----------------------+-------
          1 |               82.0 |                  81.9 |  0.998
          2 |               42.9 |                  84.4 |  1.966
          6 |               16.6 |                  89.4 |  5.368
         12 |               10.3 |                  96.9 |  9.419
         24 |                8.7 |                135.12 | 15.514

Outlook

In a more generic setting, it would be interesting to send function objects directly to each mapper and reducer, making the implementation more flexible. Here is an example of sending a generic function object to another goroutine, evaluating it, and returning the answer (see “funcobj” in the sources for the complete example).

First, a function creating a function object with closure for a local value x:

func f1(start int) func() int {
  x := start
  ret := func() int {
    x = 2*x + 1
    return x
  }
  return ret
}

Then, a goroutine to evaluate functions in a pipe:

func eval(c chan func() int, ret chan int) {
  for f := range c {
    ret <- f()
  }
}


Finally, the main program needs to get a function object

  f10 := f1(10)

create the goroutine for evaluation:

  go eval(c, ret)

and do the communication:

  c <- f10
  println("return f10: ", <-ret)

This prints the expected value 21.

RPC with gob or JSON serialization

Another interesting enhancement would be to make this available on a cluster. The change is simple: each mapper, when started, is assigned a client to talk to over the network, i.e. an IP address with a port number. Each such client listens for incoming RPC calls, executes the code and returns the emitted values in a key/value map. The main program then iterates over that map and passes the Emit values on to the collector.

I’ve implemented a simple client/server example[11] to demonstrate RPC, but it is not suitable for MapReduce; see the source code.

Netchan Package

I want to note that there is also a package for channel communication over a network. This netchan package[12] still seems to be in development, but making channel communication transparent is an interesting idea.

Both approaches from above could be combined with sending a function object over the wire, abstracting the implementation further from the actual task.

Comparison with StreamIt

Benkner/Mehofer suggested that I look into the language StreamIt. I started reading its Cookbook[13]; here are some observations:

Go: any number of input, output or combined input&output channels per goroutine.
StreamIt: only 1 input and 1 output; special keywords like split, join, duplicate and loop/feedbackloop are needed for more than linear layouts.

Go: one size fits all.
StreamIt: special “boxes” for different kinds of communication tasks.

Go: typed channels, for any types!
StreamIt: typed channels, but I’ve only seen int and float.

Go: channels can be created at any time.
StreamIt: fixed before execution.

Go: goroutines can be created at any time.
StreamIt: fixed layout before execution.

Go: channels can be sent over channels.
StreamIt: N/A.

Go: conditional blocking on communication, waiting for activity on any number of channels.
StreamIt: N/A.

Go: no automatism for clusters, but RPC support.
StreamIt: claims to automatically scale on clusters.

Go: more “direct” programming, compiled.
StreamIt: metaprogramming; the resulting file (.java?) is translated to several C files and then compiled.

Go: no peeking, but channels can be buffered and there is a “ring” data structure.
StreamIt: peeking into a stream is possible.

I don’t have proof, but it looks like Go could be seen as a generalization of StreamIt. All perceived benefits of StreamIt seem to come at the cost of inflexibility and the extra work of learning the overall framework.


[1]a slice is a typed array with variable length

[2]http://groups.google.com/d/topic/golang-nuts/kWXYU95XN04/discussion

[3]http://en.wikipedia.org/wiki/Newsqueak

[4]Communications of the ACM 21 (8): 666–677. doi:10.1145/359576.359585

[5]http://golang.org/cmd/gofmt/

[6]http://blog.golang.org/2011/01/go-slices-usage-and-internals.html

[7]http://golang.org/doc/effective_go.html#if

[8]http://golang.org/doc/effective_go.html#switch

[9]http://golang.org/pkg/container/vector/

[10]http://golang.org/pkg/sync/#RWMutex

[11]https://bitbucket.org/schilly/vo-paraarch/src/3174e002486c/go/rpc/

[12]http://golang.org/pkg/netchan/

[13]http://groups.csail.mit.edu/cag/streamit/papers/streamit-cookbook.pdf