RFC: Apache Beam Go SDK design

https://s.apache.org/beam-go-sdk-design-rfc

Henning Korsholm Rohde (herohde@google.com)

Bill Neubauer (wcn@google.com)

What’s so different about Go?

Go is a strongly typed, compiled language with a non-inheritance-based type system. It is different enough that neither the Java nor the Python approach can be readily reused, and its natural programming style would make direct reuse of some aspects awkward for Go programmers. The main relevant differences are:

No generics

The Beam model and the Java SDK make extensive use of generics. Generics allow Java pipelines to be mostly type-safe using the capabilities of the language alone, and they are used in method chaining, contexts, KV, reusable generic transforms, etc. Go offers no generics, except for a few built-in types such as slices, maps, and channels. The situation is comparable to Java 1.4. The empty interface, interface{}, plays a role similar to java.lang.Object, but using it would make the Go SDK effectively untyped, require casts everywhere, and fail to take advantage of the type capabilities that do exist.

No function or method overloading

Despite strong typing, Go does not allow functions or methods to be overloaded. This makes, for example, a single “apply” method problematic. In contrast, the signature of ParDo in Java need not be concerned with argument types, since overloads can handle all the needed cases.

No inheritance

The Go type system uses interfaces and composition instead of inheritance. Especially combined with the lack of generics, it makes some otherwise straightforward things harder to do. We’ll see this in composite transforms.

Limited reflection and serialization support

Go has no equivalent of Java’s Class.forName and cannot serialize types, functions, or closures. This means that some Java patterns, such as capturing local variables for side input tags, are not possible. These limitations impose a small tax on the user in some cases.

No annotation support

Java uses annotations for flexible DoFn signatures, default coder specification, etc. Go does not offer such functionality, except for string tags on struct fields. Again, this limitation makes some things harder to do and eliminates some design options (such as using parameter annotations to define side input tags).

But ...

Go has features such as first-class functions, full type reflection (as opposed to Java's type erasure), multiple return values, and more. The SDK should take full advantage of them.

Key design points

In light of the Go language constraints and opportunities, the following major design points guide much of the rest of the design. We use the phrase “native” types to refer to built-in Go types, as opposed to synthetic Beam types such as KV<int,string>.

Natively-typed DoFns and other user functions

User functions that execute at runtime should be natively typed and not require casts. It should also not be possible to output an element of the wrong type. In other words, deferred user functions should look and feel much like normal Go functions and allow users to pick the same types as they normally would. For example, a function to increment an integer should look like:

func (i int) int { return i+1 }

It should not require explicit or implicit casts that may fail at runtime, such as:

func (i interface{}) interface{} { return i.(int)+1 }

func (i beam.Obj) beam.Obj { return beam.IntObj(i.AsInt() + 1)}

This turned out to be non-trivial to achieve for KV and side inputs. One cost is that the Go SDK runtime uses reflection-based metaprogramming extensively.
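The sketch below roughly illustrates that reflection-based metaprogramming. It calls a natively typed user function through Go's reflect package. The helper `invoke` is hypothetical and much simpler than the SDK runtime. It only shows that the runtime can do the cast-like work while the user function keeps its native signature.

```go
package main

import (
	"fmt"
	"reflect"
)

// invoke is a hypothetical helper, not the SDK's runtime: it calls a user
// function through reflection so the function itself keeps its native
// signature and contains no casts.
func invoke(fn interface{}, args ...interface{}) ([]interface{}, error) {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return nil, fmt.Errorf("not a function: %T", fn)
	}
	t := v.Type()
	if t.IsVariadic() || t.NumIn() != len(args) {
		return nil, fmt.Errorf("%v: want %d arguments, got %d", t, t.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		if a == nil {
			return nil, fmt.Errorf("argument %d is nil", i)
		}
		av := reflect.ValueOf(a)
		// Check types up front; Value.Call would panic on a mismatch.
		if !av.Type().AssignableTo(t.In(i)) {
			return nil, fmt.Errorf("argument %d: %v is not assignable to %v", i, av.Type(), t.In(i))
		}
		in[i] = av
	}
	var out []interface{}
	for _, r := range v.Call(in) {
		out = append(out, r.Interface())
	}
	return out, nil
}

func main() {
	inc := func(i int) int { return i + 1 } // natively typed user function
	out, err := invoke(inc, 41)
	fmt.Println(out, err) // [42] <nil>

	_, err = invoke(inc, "41") // rejected before the call, not inside user code
	fmt.Println(err)
}
```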

Weakly-typed PTransforms that capture arity natively

The lack of generics and overloading rules out a natively typed apply chain. It forces PCollections to be natively untyped (the Beam type is a runtime value) and, in turn, PTransforms to be weakly typed. The choice of natively typed user functions also means that user functions do not conform to a small set of types that could inform the type system. Instead, we went with what we call the “direct” style, where PTransforms are Go functions that are invoked directly and preserve PCollection arity. (The fluent style is not commonly used in Go.) We use Go's built-in generic slice type, []PCollection, to retain some native type checking at the PCollection level. For example, transforms can have natural types that ensure that the pipeline shape is right:

Flatten: func ([]PCollection) PCollection

Sources and sinks fall out naturally as well:

textio.Read:  func (string) PCollection

textio.Write: func (string, PCollection)

This approach avoids the need for PCollectionList, PCollectionTuple, etc. For ParDo, however, we need to introduce ParDo0, ParDo2, ParDo3, etc. to indicate the output arity. Losing the output arity at compile time was not an acceptable compromise, since it would make invalid pipelines too easy to write. Users can still opt out of this safety by using ParDoN, which returns a slice of PCollections.

Static type checking at pipeline construction time

The Go SDK performs static type checking of the pipeline during construction to avoid runtime type errors. The Beam type checker ensures that the input and output types of PCollections match up with transforms. It uses a custom representation, FullType, to accurately represent Beam types such as KV<int,string>. The most complex aspect is analysing DoFn signatures and validating that each is compatible with its typing context. Three factors make this hard:

  1. KV is implicit. DoFns use multiple arguments and multiple return values to represent unfolded KVs. This design choice sidesteps KV as a runtime value in user code by making it a built-in, second-class value. It does mean that the number of PCollections consumed does not necessarily match the arity of the DoFn.
  2. Side input forms. We allow a range of native Go types as side inputs, including function and slice types, for convenience and to keep user functions natively typed. However, this choice makes type checking less straightforward, because a DoFn accepting int x string can be interpreted either as a DoFn on KV<int,string> or as a DoFn on int with a singleton side input of type string.
  3. Simulated generic types. Last, but not least, we achieve some of the effect of generics by introducing special “universal” types T, U, ..., X, Y, Z defined over interface{}. These types are in essence “typedefs to void*”, except that in the Go type system they are natively opaque (i.e., cannot be implicitly converted to anything, even each other) and runtime values are convertible to any type. For all practical purposes, the effect is generic DoFns and thus generic transforms. For example:

                swapKV: func (x X, y Y) (Y, X) { return y, x }

Here, treating the special types as type variables, the type checker infers that swapKV transforms a KV<int,string> into a KV<string,int>. The Go type system prevents type mistakes such as returning (x, y) or (42, x). The limitation is that functions must use the fixed set of universal types and that non-equality type constraints are not possible.

During pipeline construction, binding and substitution logic are used to perform type-checking of the pipeline, resolving universal types into concrete types. If type-checking fails, the pipeline isn’t valid, and the user gets a type-checking error to help them debug.

The type checker is intimately tied to the universe of user function signatures and the ability to accurately process them. Most reusable transforms, such as stats.Count or filter.Dedup, use simulated generic types. In some cases, the Go SDK type checker catches mistakes that Java cannot, such as a DoFn using a wrong or non-existent side input tag: neither the Java type system nor the Java SDK can easily catch such a mistake, because the tag is part of the closure only and is not accessible until runtime.

Error handling

For error handling, Go conventionally returns errors (as the last return value, of the special “error” type) and propagates them manually. Go has no exceptions; the closest mechanism, “panic”, is generally reserved for unrecoverable failures.

Runtime user functions should follow normal Go conventions and return an error on failure. For example,

            trySideEffectFn: func (string) (string, error)

Here the function would return (“”, <some error>) if it failed to perform the side effect. The type checker understands that the (optional) trailing error is not part of the data signature. The runtime checks the error when the function is invoked and fails the bundle if it is not nil.
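The runtime check can be sketched with a reflection-based invoker. `callAndCheck` is a hypothetical name. It removes a trailing `error` result from the data results and turns a non-nil error into a bundle failure.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

var errorType = reflect.TypeOf((*error)(nil)).Elem()

// callAndCheck invokes fn and, if its last result is an error, strips that
// error from the data results; a non-nil error fails the (simulated) bundle.
func callAndCheck(fn interface{}, args ...interface{}) ([]interface{}, error) {
	v := reflect.ValueOf(fn)
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	res := v.Call(in)
	t := v.Type()
	if n := t.NumOut(); n > 0 && t.Out(n-1) == errorType {
		if errv := res[n-1]; !errv.IsNil() {
			return nil, fmt.Errorf("bundle failed: %w", errv.Interface().(error))
		}
		res = res[:n-1] // the error is not part of the data signature
	}
	out := make([]interface{}, len(res))
	for i, r := range res {
		out[i] = r.Interface()
	}
	return out, nil
}

// trySideEffectFn follows the convention in the text: ("", err) on failure.
func trySideEffectFn(s string) (string, error) {
	if s == "" {
		return "", errors.New("empty input")
	}
	return s + "!", nil
}

func main() {
	fmt.Println(callAndCheck(trySideEffectFn, "ok"))
	fmt.Println(callAndCheck(trySideEffectFn, ""))
}
```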

During pipeline construction, however, transforms panic on failure instead of returning (PCollection, error). This style is a readability win and allows function chaining. It also works well for static pipelines, where bad code should panic. To support programmatic construction of pipelines from user input, we provide TryXXX functions that return errors. These APIs don’t allow function chaining, but that is acceptable for programmatic construction: the pipeline is built one node at a time, so chaining wouldn’t be used anyway. Users can mix the TryXXX and XXX variants freely; both are in the public API.

Other languages can offer both experiences in a single API surface through strategic use of exception handlers. Since Go has no exceptions, the bifurcated API surface is necessary to get the differing usability experiences.

Examples

The following examples illustrate most aspects of the Go SDK. They offer a concrete context in which to understand the model representation.

  • Wordcount progression

The latest version of the code can be found here:

https://github.com/apache/beam/tree/master/sdks/go

All examples are under “examples” and the main package under “pkg/beam”. All other code links in this document point into a snapshot for long-term stability and consistency with the descriptions here.

Model representation

The following provides an overview of how the Go SDK represents the objects and concepts of the Beam programming model. The code comments and examples provide further documentation.

Pipeline

Pipeline is still an object. While transforms can still be said to be “applied” to the pipeline, the mechanism is different: each primitive transform takes a pipeline as an argument and adds nodes and edges to the underlying graph (execution plan). This approach avoids the need for intermediate types such as PBegin, PDone, and PValue. The pipeline also holds the current (monitoring) scope for composite transforms, which allows composite transforms to be plain Go functions. An advantage here is that transforms have more precise control.

Runner

A pipeline runner is simply any Go function that accepts a context and a Pipeline for “execution”. Runners can be invoked directly, or they can register themselves so that users can easily select them.

We currently do not have an equivalent of PipelineResult. Once the portability job API is stable and an implementation available, we’ll likely introduce a notion of PipelineResult based on that.

PCollection

PCollection represents a homogeneous collection of data during pipeline construction. It is untyped, but holds the Beam type as a special FullType value. It also holds the coder and other metadata values, similarly to other SDKs.

FullType represents a (possibly generic) Beam type and is essential for type checking. FullType builds on Go's reflection types but can also represent composite types such as Windowed<KV<string,int>> or KV<X,string>. Each FullType has a root type, which is either a user-defined data type or one of our container types for KV encoding or windowing. Java largely handles this through generics and has little need for a separate representation, since these types all implement the same interface and the code that handles them can be generic.

Coder

Coder is a contract for transforming data to/from a byte sequence. In Go, we distinguish between system coders and simpler user-defined custom coders. This distinction is needed to handle the composite types specially, because Beam types like KV<int, string> do not actually exist as runtime values. A benefit is that users who wish to use custom coding do not have to be exposed to lengths and inner/outer contexts.[c][d][e][f]

We assume that all data types have a coder. The user can assign their coder on a per-transform-output basis. If the user doesn’t assign a coder, the SDK assigns a default coding behavior. If the data type cannot be encoded by the default coder, the pipeline construction fails. The behavior of the default coder is similar to the behavior provided by the Go JSON package. The coder package documentation contains precise details.

DoFn and other user functions

DoFns and other user functions are generally allowed to take one of three forms, balancing convenience and expressiveness. The Go SDK can offer more flexibility here because it implements its own type checker. The forms are:

  1. Function. The simplest form is a function for per-element processing. For example,

func incFn(i int) int { return i+1 }

func filterFn(i int, emit func(int)) { if i > 5 { emit(i) }}

The first function returns exactly one output whereas the second emits zero or more outputs. Functions can be defined inline. The first style conveniently allows a vast number of standard library functions to be directly usable as DoFns, such as strings.ToUpper, without wrappers of any kind.

  2. Struct with methods. The struct form accommodates user functions that perform bundle-level processing, require special setup or teardown, or are configured by construction-time values. It is similar to a Java DoFn. For example,

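import "regexp"

type fn struct {
	Filter string `json:"filter"` // exported field set at construction time
	re     *regexp.Regexp         // unexported runtime state, initialized in Setup
}

// Setup compiles the regular expression once per DoFn instance.
func (f *fn) Setup() {
	f.re = regexp.MustCompile(f.Filter)
}

// ProcessElement emits only the words that match the filter.
func (f *fn) ProcessElement(word string, emit func(string)) {
	if f.re.MatchString(word) {
		emit(word)
	}
}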

This DoFn uses the special Setup method to initialize itself from the construction-time “Filter” value. The method names must be exported and are recognized by name, because their signatures vary. For DoFn structs, the possible methods are

Setup, StartBundle, ProcessElement, FinishBundle, Teardown

They have the same interpretation as in other SDKs. Combine functions work similarly, but use different method names.

  3. Dynamic function. The last form accommodates a narrow but advanced use case where the function signature is not static and the runtime needs to recreate the function, due to Go limitations in reflection and serialization. It is currently used only by Partition.

Functions may also optionally take a context.Context and/or an EventTime, or return an error. The runtime provides and handles these parameters accordingly. The signature thus tells the runner what the user function actually uses, which the runner may exploit to optimize execution.

The following are examples of legal DoFn signatures for doFn in beam.ParDo(s, doFn, words) with a string-typed incoming PCollection:

A DoFn can be a simple function:

func (word string) string

func (word string, emit func(string))

They can return an error if necessary:

func (word string) (string, error)

func (word string, emit func(string)) error

Take in an optional EventTime or context: 

func (t EventTime, word string) string

func (ctx context.Context, word string, emit func(string))

Or any combination of the above, such as:

func (ctx context.Context, t EventTime, word string) (string, error)

More formally, this expression describes the signature of an acceptable DoFn.

(Context?, EventTime?, main input, side inputs*, outputs*) -> (EventTime?, output?, error?)

Restrictions:

  1. If the function returns an output, that output is considered ‘first’ and outputs from the arguments are after it.
  2. The main output type can be a KV type.
  3. The context.Context and beam.EventTime types cannot be used as input or output types.

Side inputs and GBK values are re-iterable streams of data.  In Java, they are captured by an object with a generic interface. The interface handles the re-iteration contract, and provides convenience methods to extract singleton values. In Go, we capture the underlying types in the type signature of the side input because we have no choice. However, this does afford some elegance and flexibility. For example,

func (word string, size int, sample []string) string

is the type of a DoFn with two side inputs: a singleton ‘size’ and a slice ‘sample’. The runtime performs the necessary conversions. There are no tags; side inputs are positional. The iteration cases (which are required for GBK values) use function arguments:

func (word string, sample func (*string) bool) string

func (word string, resample func () func (*string) bool) string

 

The latter allows the DoFn to iterate multiple times over the side input. The form selected by the user can also serve as a hint to the runner about how to manage the data.
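Small adapters can produce the two iteration forms from materialized data. `iterOf` and `reiterOf` are hypothetical helpers. The iterator contract assumed here is inferred from the signatures rather than taken from the SDK: each call stores the next element through the pointer and returns false once the data is exhausted.

```go
package main

import "fmt"

// iterOf adapts a slice to the single-pass form func(*string) bool.
func iterOf(data []string) func(*string) bool {
	i := 0
	return func(out *string) bool {
		if i >= len(data) {
			return false
		}
		*out = data[i]
		i++
		return true
	}
}

// reiterOf adapts a slice to the re-iterable form: each call starts a new pass.
func reiterOf(data []string) func() func(*string) bool {
	return func() func(*string) bool { return iterOf(data) }
}

// countMatches is a DoFn-shaped function with a single-pass side input.
func countMatches(word string, sample func(*string) bool) int {
	n := 0
	var s string
	for sample(&s) {
		if s == word {
			n++
		}
	}
	return n
}

// twoPass needs the re-iterable form because it scans the side input twice.
func twoPass(word string, resample func() func(*string) bool) string {
	total, matches := 0, 0
	var s string
	for it := resample(); it(&s); {
		total++
	}
	for it := resample(); it(&s); {
		if s == word {
			matches++
		}
	}
	return fmt.Sprintf("%s: %d/%d", word, matches, total)
}

func main() {
	sample := []string{"a", "b", "a"}
	fmt.Println(countMatches("a", iterOf(sample))) // 2
	fmt.Println(twoPass("a", reiterOf(sample)))    // a: 2/3
}
```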

Side outputs are just emitter functions, like the main output. They are also positional:

 

        func (word string, emitSide func(string)) string 

        func (word string, emitMain, emitSide func(string))

Unlike the main output, side outputs can't be return values of the function, since doing so would be ambiguous with a KV return.

The functional side inputs, the main output, and the side outputs allow an optional EventTime parameter for the timestamp to be accessed or provided. For example,

        func (t EventTime, word string) (EventTime, string)

func (t EventTime, word string, emit func (EventTime, string))

Finally, user functions can use universal types in place of any top-level pipeline data type.

Transforms

The primitive transforms are Go functions in the beam package, with access to the internals of the Pipeline graph. The transforms generally have weak signatures and most validation is left for the type checker.

Impulse

Impulse is a singleton source used to trigger other (source) transforms:

func Impulse(s Scope) PCollection

It always returns a singleton PCollection<[]byte>.

Create

Create inserts a fixed set of values into the pipeline. The output type of the PCollection is the shared concrete type of the values.

func Create(s Scope, values ...interface{}) PCollection

It works similarly to the other SDKs.

ParDo family

ParDo is a family of functions, ParDo0, ParDo, ParDo2, ParDo3, etc., for invoking DoFns on an incoming PCollection. The suffix N encodes the number of outputs; the unsuffixed ParDo is the N=1 case. Its signature is:

ParDo(Scope, interface{}, PCollection, ...Option) PCollection

The transform takes a scope, any value as the DoFn, the incoming PCollection, and any number of options for specifying side inputs. It returns a single output PCollection, whose type is inferred from the DoFn type and the side input types.

For example, the following DoFn has one int-typed singleton side input and two string-typed outputs:

        func splitFn(word string, avg int, big, small func(string)) {

                if len(word) < avg { small(word) } else { big(word) }

}

It could be used as follows:

        

        words := beam.Create(s, "foo", "foobar")

        avg := beam.Create(s, 4)

        big, small := beam.ParDo2(s, splitFn, words, beam.SideInput{avg})

The two output PCollections would each be of string type and at runtime would be singleton collections containing the values “foobar” and “foo”, respectively. The preservation of native arity and Go native type inference make the ParDo transform applications concise.
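The runtime behavior of this example can be simulated locally with plain slices. `simulateParDo2` is a hypothetical helper. It binds the singleton side input and gives each emitter its own output slice.

```go
package main

import "fmt"

func splitFn(word string, avg int, big, small func(string)) {
	if len(word) < avg {
		small(word)
	} else {
		big(word)
	}
}

// simulateParDo2 runs splitFn over words with avg bound as the singleton side
// input, collecting each emitter's output separately.
func simulateParDo2(words []string, avg int) (big, small []string) {
	for _, w := range words {
		splitFn(w, avg,
			func(s string) { big = append(big, s) },
			func(s string) { small = append(small, s) })
	}
	return big, small
}

func main() {
	big, small := simulateParDo2([]string{"foo", "foobar"}, 4)
	fmt.Println(big, small) // [foobar] [foo]
}
```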

GroupByKey

GroupByKey has a simple signature:

        func GroupByKey(s Scope, a PCollection) PCollection

Like other SDKs, it expects a KV<A,B>, but here it returns a PCollection with a special GBK<A,B> type for the GBK result. The type system and runtime understand these composite types. We use a special GBK type instead of composing, say, KV<A,Iter<B>> to make the result easier for them to handle.
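The following local sketch shows GBK semantics and how a grouped value stream is consumed through the function-iterator form. `kv`, `groupByKey`, `iterInts`, and `sumPerKey` are hypothetical. The real runtime streams grouped values rather than holding each group in a slice.

```go
package main

import (
	"fmt"
	"sort"
)

// kv is a materialized key/value pair used only to feed the sketch.
type kv struct {
	K string
	V int
}

// groupByKey gathers all values per key.
func groupByKey(in []kv) map[string][]int {
	out := map[string][]int{}
	for _, e := range in {
		out[e.K] = append(out[e.K], e.V)
	}
	return out
}

// iterInts adapts a slice to the func(*int) bool iteration form.
func iterInts(data []int) func(*int) bool {
	i := 0
	return func(out *int) bool {
		if i >= len(data) {
			return false
		}
		*out = data[i]
		i++
		return true
	}
}

// sumPerKey consumes one GBK<string,int> element: a key plus its values.
func sumPerKey(key string, values func(*int) bool) (string, int) {
	total, v := 0, 0
	for values(&v) {
		total += v
	}
	return key, total
}

func main() {
	groups := groupByKey([]kv{{"a", 1}, {"b", 2}, {"a", 3}})
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map order is random; sort for stable output
	for _, k := range keys {
		fmt.Println(sumPerKey(k, iterInts(groups[k])))
	}
}
```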

Flatten

Flatten has the expected signature:

        func Flatten(s Scope, cols ...PCollection) PCollection

        

Combine

Combine is similar to ParDo in that it takes a combine user function and possibly side inputs:

Combine(Scope, interface{}, PCollection, ...Option) PCollection

CombinePerKey(Scope, interface{}, PCollection, ...Option) PCollection

The combine is either a global or per-key combine, depending on the variant called.

The simplest combiner form is a binary function, where the accumulator and result types are identical. For example,

func sumFn(x, y int) int { return x + y }

Can be used as:

numbers := beam.Create(s, 1, 2, 3, 4)

sum := beam.Combine(s, sumFn, numbers)

The resulting PCollection is of type int and will be a singleton with the value 10 at runtime. Note that sumFn cannot use simulated generic types, because the + operator is not available for the universal types. To that end, we created a code-generation tool, specialize, to generate type-specialized transforms from a template. It is used in stats.Sum, for example.
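The following local sketch shows how a binary combiner can be applied. `combineGlobally` is hypothetical. As is usual for Beam combiners, it assumes the function is associative and commutative, so partial results from different bundles can be merged in any grouping.

```go
package main

import "fmt"

func sumFn(x, y int) int { return x + y }

// combineGlobally folds each bundle into a partial result and then merges the
// partials with the same binary function. It reports false if there is no input.
func combineGlobally(fn func(int, int) int, bundles [][]int) (int, bool) {
	var partials []int
	for _, b := range bundles {
		if len(b) == 0 {
			continue
		}
		acc := b[0]
		for _, x := range b[1:] {
			acc = fn(acc, x)
		}
		partials = append(partials, acc)
	}
	if len(partials) == 0 {
		return 0, false
	}
	result := partials[0]
	for _, p := range partials[1:] {
		result = fn(result, p)
	}
	return result, true
}

func main() {
	sum, ok := combineGlobally(sumFn, [][]int{{1, 2}, {3, 4}})
	fmt.Println(sum, ok) // 10 true
}
```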

Partition

Partition is similar to the other SDKs. It partitions a PCollection<A> into N partitions based on a partition function of type A -> int. The signature is:

        Partition(Scope, int, interface{}, PCollection) []PCollection

Partition is not a primitive; despite its significant use of metaprogramming, it could be implemented in user code.
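The following local sketch shows Partition semantics: the partition function maps each element to an index in [0, n). `partition` is a hypothetical helper. Returning an error for an out-of-range index is an assumption of this sketch.

```go
package main

import "fmt"

// partition splits in into n groups according to fn.
func partition(n int, fn func(int) int, in []int) ([][]int, error) {
	out := make([][]int, n)
	for _, x := range in {
		i := fn(x)
		if i < 0 || i >= n {
			return nil, fmt.Errorf("partition index %d out of range [0,%d)", i, n)
		}
		out[i] = append(out[i], x)
	}
	return out, nil
}

func main() {
	parts, err := partition(3, func(x int) int { return x % 3 }, []int{1, 2, 3, 4, 5, 6})
	fmt.Println(parts, err) // [[3 6] [1 4] [2 5]] <nil>
}
```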

No Source/Sink

The external Go SDK will use Splittable DoFn once it is supported by the Fn API, and it has no special support for sources or sinks. Sources currently use Impulse and ParDos. Sinks are just ParDos.

The standard IOs include textio and bigqueryio. The latter leverages Go struct tags to apply metadata to structured data types, which greatly improves the readability of sources/sinks using cloud connectors. This is evident in the tornadoes example, as compared to Java.

Currently unsupported

The Go SDK is a work in progress and there are numerous TODOs throughout the code. Some of the bigger unsupported pieces[g][h] are:

  1. Splittable DoFn. SDF is not supported at this time.
  2. State API. Same as above. From a design perspective, the surface area of the State API is approximately as complex as the union of side inputs and side outputs. We don’t foresee any risks in implementing it; we just have no reason to do so at this time.
  3. Metrics API. Our work on combiners has given us experience designing a Go API with the characteristics needed for metrics (associativity, commutativity), so we view the risk here as low. We are waiting for more support from the Fn API before we pursue our implementation.[i]
  4. Windowing[j] and triggers. We track windowing information with every element because it’s plumbed through the Fn API. The only window we’ve implemented is the global window, so we don’t have any particular support for specifying timestamps. Our user and emitter functions can already be enriched with timestamp arguments, so we see no risk here. The interaction of triggers through the Fn API is a topic that we’d like to see resolved with Python streaming before we venture into that space.
  5. Coder registry. It is not yet present. We plan to special case protocol buffer and avro support. We’ll explore more general coder registries based on community feedback.
  6. PipelineResult. We plan to address that when the portability job management APIs are present and implemented.

The Go direct runner supports batch only. We plan to rely on the upcoming Universal Local Runner (ULR) for streaming and advanced use cases, which is a benefit of targeting the portability framework.

[a]What I don't like about these examples is they use a helper package (stats) to do the counting. It's not clear how to write a custom reducer. It would be more helpful as an example if the counting were implemented in wordcount.go

[b]Good point. I don't really disagree with you, but the wordcount examples deliberately follow the structure of the other languages closely for uniformity here:

https://beam.apache.org/get-started/wordcount-example/

[c]This would also mean that users need to be able to fit the whole object in memory (potentially twice depending on the decoder) and won't be able to utilize remote references (https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50) for large objects.

How does this work for the GBK value iterable, does the entire iterable need to fit in memory or is this a specially handled type?

[d]We have not looked at support for remote references yet, but that will likely require special support.

GBK value iteration is handled by the system, and only each element needs to fit into memory.

[e]This will mean that for user defined types the length is likely to be encoded twice, once by the "system" and once by the format.

[f]Yes, but isn't that also the case for other SDKs to some extent? Java custom types will also be length-prefixed if going into GBK, for example.

That said, the custom coder form is familiar to Go programmers as well as convenient, but it's certainly not necessarily the only option possible.

[g]CoGBK support is another big piece. Short doc here: https://docs.google.com/document/d/18SjId_0OQLUX2v5EbNjWfJHLnx96R0V8poLSUKmymig/edit#

[h]Integration test docs here: https://docs.google.com/document/d/1jy6EE7D4RjgfNV0FhD3rMsT1YKhnUfcHRZMAlC6ygXw/edit?usp=sharing

[i]FnAPI User metrics have been implemented per http://s.apache.org/beam-metrics-api

See the GoDoc at

http://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/core/metrics

[j]Windowing support is done: https://issues.apache.org/jira/browse/BEAM-3303