1 of 67

So, You Want to Write a Beam SDK?

Robert Bradshaw

robertwb@google.com

Beam Summit London

October 2018

2 of 67

The Beam Vision

Provide a comprehensive portability framework for data processing pipelines, one that allows you to write your pipeline once in your language of choice and run it with minimal effort on the execution engine of choice.

3 of 67

Current state

4 of 67

Future state?

5 of 67

What is an SDK?

  • An SDK is the tool/library that lets the user author Beam pipelines.

SDK Client

6 of 67

What is an SDK?

  • An SDK is the tool/library that lets the user author Beam pipelines.
  • ...and submit it to a runner.

SDK Client

Runner

Job Service

7 of 67

What is an SDK?

  • An SDK is the tool/library that lets the user author Beam pipelines.
  • ...and submit it to a runner
  • ...with support for executing user code.

SDK Client

Runner

Job Service

Fn API Service

SDK Worker

8 of 67

What is an SDK?

An SDK is also an idiomatic interpretation of the Beam Model in a specific language.

9 of 67

An aside about DSLs

An SDK

  • Speaks directly to Runners
  • Offers the full Beam model (PCollections, Transforms, Windowing, …)
  • E.g. Current Java, Go, Python APIs

A DSL

  • Typically built on top of another SDK
  • May only offer a subset of the model.
  • E.g. SQL, Scio

10 of 67

Constructing a Pipeline

11 of 67

Constructing a Pipeline

  • Lazily build up runner-agnostic DAG of deferred operations.
  • PCollections are generally just pointers to their creators.
    • But with composites, they may actually be outputs of multiple PTransforms
  • Track nested scoping

This is generally the easy part.
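As a rough illustration of "lazily build up a runner-agnostic DAG" and "PCollections are just pointers to their creators," here is a minimal sketch of hypothetical SDK internals (not the real Beam implementation): nothing runs at apply() time, we only record nodes.

```python
# Hypothetical deferred-construction internals; names are illustrative.

class Pipeline:
    def __init__(self):
        self.transforms = []  # the deferred, runner-agnostic DAG

    def apply(self, name, fn, inputs=()):
        node = {"name": name, "fn": fn, "inputs": list(inputs)}
        self.transforms.append(node)
        return PCollection(self, node)

class PCollection:
    # A PCollection is little more than a pointer to its producer.
    def __init__(self, pipeline, producer):
        self.pipeline = pipeline
        self.producer = producer

    def apply(self, name, fn):
        return self.pipeline.apply(name, fn, inputs=[self])

p = Pipeline()
lines = p.apply("Create", lambda: ["a", "b"])
lengths = lines.apply("Map(len)", len)
# No user code has executed yet; the DAG just records two nodes.
```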

12 of 67

Required Transforms

An SDK should, at minimum, support:

  • ParallelDo (aka Map, FlatMap)
    • SideInputs, MultipleOutputs, State and Timers, Splittability, …
  • GroupBy
    • Typically GroupByKey (and window!)
  • WindowInto
    • Can leverage runner for actual windowing.
  • IO
    • Read and write something
  • User-defined composites.

13 of 67

Expected Transforms

  • Create
  • Flatten
  • Join/CoGroupByKey
  • Combine
    • Globally, PerKey

Also support for Metrics (e.g. counters), Pipeline results.

14 of 67

More on IO

  • Beam has a single "primitive" input: Impulse
  • All other IO consists of composites built on the powerful SplittableDoFn
    • See Radically Modular IO
  • Read is typically Split + ReadShard
  • Write is typically WriteShards + Finalize
  • Bare-bones is {ReadFrom,WriteTo}{Text,Avro}
  • More complex patterns can be supported
    • And one should be able to leverage PTransforms from other languages (future work)
  • This complexity is wrapped in simple transformations for the user.
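The Split + ReadShard pattern above can be sketched as two plain functions (an illustrative stand-in, not the real Beam API): Split yields shard descriptions so the runner can distribute them, and ReadShard reads each byte range.

```python
# Illustrative read composite: Split produces shard descriptions,
# ReadShard consumes one. A real transform would stat and open files;
# here a string stands in for the file contents.

def split(path, desired_shards, total_size):
    step = max(1, total_size // desired_shards)
    return [(path, start, min(start + step, total_size))
            for start in range(0, total_size, step)]

def read_shard(shard, data):
    # Stand-in for opening the file and reading the byte range.
    path, start, end = shard
    return data[start:end]

data = "abcdefgh"
shards = split("data.txt", 4, len(data))
records = [read_shard(s, data) for s in shards]
```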

15 of 67

Coders and Types

  • Static vs. Pipeline Construction vs. Runtime typing
    • Pipeline Construction-time checking can be a powerful middle ground
  • Element serialization
    • Must be deterministic for keys
    • Be wary of generic language serialization inefficiencies (in time and space)
  • Inference and registration
    • Specifying coders everywhere is generally too tedious.
  • Standard coders
    • Standardize binary representation for cross-language introspection.
    • KV, Iterable, LengthPrefix, ...
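A simplified sketch of the LengthPrefix idea (the real Beam coders use varint prefixes and nested/unnested encoding contexts, so treat this as shape only): wrapping any coder so its output can be skipped over without understanding it, which is what makes cross-language introspection possible.

```python
import struct

class BytesCoder:
    def encode(self, value):
        return value

    def decode(self, data):
        return data

class LengthPrefixCoder:
    # Wraps any coder so a runner can skip over its (opaque) output;
    # fixed 4-byte prefixes stand in for the real varint framing.
    def __init__(self, inner):
        self.inner = inner

    def encode(self, value):
        payload = self.inner.encode(value)
        return struct.pack(">I", len(payload)) + payload

    def decode(self, data):
        (n,) = struct.unpack(">I", data[:4])
        return self.inner.decode(data[4:4 + n])
```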

16 of 67

UserFns

  • Classes? Lambdas? Both?
    • Supporting methods such as {start,finish} bundle
  • Must be serializable
  • Calling conventions
    • How are "extra" arguments passed?
    • Performance concerns

Note that composite PTransforms are not executed on the Runner.
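One way to answer "classes, lambdas, or both?" is to wrap bare functions into the class interface, so the worker only has one calling convention (a hypothetical API; names are illustrative):

```python
# Sketch: support both classes (with bundle lifecycle methods) and
# lambdas, adapting the latter so the worker sees a single interface.

class DoFn:
    def start_bundle(self):
        pass

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        pass

class LambdaDoFn(DoFn):
    # Adapt a plain function into the DoFn interface.
    def __init__(self, fn):
        self.fn = fn

    def process(self, element):
        yield self.fn(element)

def run_bundle(do_fn, elements):
    do_fn.start_bundle()
    outputs = [out for e in elements for out in do_fn.process(e)]
    do_fn.finish_bundle()
    return outputs
```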

17 of 67

Direct Runner

  • Very useful for debugging, unit testing
  • Can bootstrap against a Universal Local Runner.
    • Future work: loopback worker
  • Hybrid model
    • Go can run simple pipelines, or all pipelines on a ULR.

18 of 67

The Runner API

Or how to represent a Pipeline

19 of 67

How to define a Beam Pipeline

Beam pipelines are defined according to the protos in model/pipeline.

  • A Pipeline is simply a set of Components, a designated Root Transform, and some display data.
  • Components are referenced by pipeline-unique string ids.
    • Transforms reference inputs and outputs, Coders reference other coders, ...
  • Components often have a spec defined by URNs and payloads.
    • The URN defines the interpretation of the (binary) payload.
    • Some URNs are mandatory, some are common, and some are SDK-specific.
    • Runners that don't understand a URN and its (opaque) payload can pass it through.
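To make the shape concrete, here is roughly how a pipeline with a single GroupByKey might look written out as plain Python dicts instead of protos (field and id names here are illustrative): components keyed by pipeline-unique string ids, a URN in each spec, and cross-references by id.

```python
# Dict stand-in for the pipeline proto; payloads omitted for brevity.
pipeline = {
    "components": {
        "transforms": {
            "transform_10": {
                "unique_name": "transform_10",
                "spec": {"urn": "beam:transforms:group_by_key:v1"},
                "inputs": {"pcollection_1": "in"},
                "outputs": {"pcollection_2": "out"},
            },
        },
        "pcollections": {
            "pcollection_1": {"coder_id": "coder_0"},
            "pcollection_2": {"coder_id": "coder_0"},
        },
        "coders": {
            # Coders reference other coders by id as well.
            "coder_0": {"urn": "beam:coder:kv:v1",
                        "component_coder_ids": []},
        },
    },
    "root_transform_ids": ["transform_10"],
}

# Cross-references resolve through the shared component maps:
gbk = pipeline["components"]["transforms"]["transform_10"]
in_coll = pipeline["components"]["pcollections"]["pcollection_1"]
```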

20 of 67

Learn by example

  • Most components are self-evident.

  • Payload (not present here) is always binary; it may have to be deserialized to interpret.

{
  unique_name: "transform_10"
  spec: {
    urn: "beam:transforms:group_by_key:v1"
  }
  inputs: { "pcollection_1": "in" }
  outputs: { "pcollection_2": "out" }
}

21 of 67

The ParDo operation is arguably the most complicated.

unique_name: "my_map_fn"
spec: {
  urn: "beam:transforms:pardo:v1"
  payload: {
    do_fn: {
      urn: "beam:dofn:language_serialized_fn"
      spec: "0x5fdff4006f74f3af2005e9..."
      environment: "beam:language_container"
    }
    side_inputs: {
      "side1": { access_pattern: iterable, view_fn: { urn: … }, … }
    }
    state_specs: …
  }
}
inputs: { "pcoll_1": "in", "pcoll_2": "side1" }
outputs: { "pcoll_3": "out1", "pcoll_4": "out2" }

22 of 67

Runner-interpreted URNs

  • A runner is allowed to substitute semantically equivalent implementations of URNs it understands.
  • Combine[PerKey] is a standard example.
    • Parameterized by a CombineFn of 4 methods: () -> A, (A, I) -> A, (A*) -> A, (A) -> O
    • If an SDK provides beam:transform:combine_per_key:v1, its workers must accept

beam:transform:combine_per_key_precombine:v1

beam:transform:combine_per_key_merge_accumulators:v1

beam:transform:combine_per_key_extract_outputs:v1

beam:transform:combine_grouped_values:v1

(Diagram: per-shard pre-combine, then shuffle, then final aggregation.)
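The four CombineFn methods above, instantiated for a mean with accumulator A = (sum, count), show why the split pays off: only small accumulators cross the shuffle. (Method names follow the Beam Python SDK; the shard setup below is a hand-rolled illustration.)

```python
class MeanCombineFn:
    def create_accumulator(self):        # () -> A
        return (0.0, 0)

    def add_input(self, acc, value):     # (A, I) -> A
        s, n = acc
        return (s + value, n + 1)

    def merge_accumulators(self, accs):  # (A*) -> A
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc):       # (A) -> O
        s, n = acc
        return s / n if n else float("nan")

# Pre-combine per shard, shuffle only the accumulators, then
# merge and extract on the other side.
fn = MeanCombineFn()
shard_a = fn.add_input(fn.add_input(fn.create_accumulator(), 1), 2)
shard_b = fn.add_input(fn.create_accumulator(), 6)
result = fn.extract_output(fn.merge_accumulators([shard_a, shard_b]))
# result == 3.0
```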

23 of 67

The Jobs API

Or how to ask a Runner to run a Pipeline

24 of 67

Submitting a Job

The user uses an SDK to construct a pipeline.

SDK Client

25 of 67

Submitting a Job

The user uses an SDK to construct a pipeline.

SDK Client

26 of 67

Submitting a Job

Someone* starts the runner with a Job Service (defined in model/job-management).

Runner

SDK Client

Job Service

*someone can be the User, the SDK itself, a Sys Admin setting up a long-running cluster, ...

27 of 67

Submitting a Job

The SDK connects to the Job Service and submits the pipeline.

Runner

SDK Client

Job Service

Here's a pipeline.

28 of 67

Submitting a Job

The Job Service provides a pointer to an Artifact Service.

Runner

SDK Client

Job Service

Artifact Service

Put your stuff here.

29 of 67

Submitting a Job

The SDK uploads any required resources.

Runner

SDK Client

Job Service

Artifact Service

jar

resource

30 of 67

Submitting a Job

The SDK then requests that the pipeline be run.

Runner

SDK Client

Job Service

Artifact Service

Do it!

31 of 67

Submitting a Job

The runner transforms, fuses, and breaks the graph into executable stages.

Runner

SDK Client

Job Service

Artifact Service

32 of 67

Submitting a Job

Runner is now ready to execute the pipeline.

Runner

33 of 67

The Fn API

Workers talking to Workers

34 of 67

Workers have Fn

Runner is now ready to execute the pipeline.

Runner

35 of 67

Workers have Fn

Runner

Runner

Runner

Runner

Runner

Runner Worker

Runner typically launches a master and/or many workers.

Runner Master

Runner

36 of 67

Workers have Fn

Runner

Runner

Runner

Runner

Runner

Runner Worker

Let's look at just one Runner worker.

Runner Master

Runner

37 of 67

Workers have Fn

Runner Worker

Let's look at just one Runner worker.

38 of 67

Communicating over the Fn API

The Fn API, defined in model/fn-execution, consists of four services.

  • Control Plane
    • Registers, schedules, and checks progress of bundles.
  • Data Plane
    • Runner-initiated push API
  • State Plane
    • SDK Worker-initiated pull API
  • Logging
    • Push logs from worker to runner

All are long-lived streaming services hosted by runner or its workers.

A provision API and an artifact API are also required if the standard boot scripts are not used.

39 of 67

Control Plane

  • Runner pushes a heterogeneous stream of (uniquely-id'd) requests which the SDK Worker responds to asynchronously.
  • Primary request is of type ProcessBundle
    • References a ProcessBundleDescriptor: a (possibly mutated) subset of the original pipeline, with GrpcRead and Write nodes attached to the dangling edges.
  • Errors returned on this channel as well.
  • (Flink only) An unbounded number of requests must be processed asynchronously.
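The control-plane pattern above (uniquely-id'd requests answered asynchronously, an unbounded number in flight) can be sketched with queues standing in for the gRPC stream; everything here is an illustrative stand-in, not the real worker code.

```python
import threading
import queue

def worker_loop(requests, responses):
    # Each request carries a unique id; the response echoes it so the
    # runner can match them up, whatever order they complete in.
    def handle(req_id, bundle):
        # ... execute the bundle descriptor here ...
        responses.put({"id": req_id, "status": "done"})

    threads = []
    while True:
        req = requests.get()
        if req is None:  # channel closed by the runner
            break
        t = threading.Thread(target=handle, args=(req["id"], req["bundle"]))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

requests, responses = queue.Queue(), queue.Queue()
for i in range(3):
    requests.put({"id": i, "bundle": "descriptor_0"})
requests.put(None)
worker_loop(requests, responses)
```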

40 of 67

Data Plane

  • A single bidirectional, multiplexed stream of PCollection elements.
  • The elements themselves are represented as a contiguous chunk of bytes, encoded with the runner-requested coder.
    • A runner may wrap unknown coders in LengthPrefix coders if it needs to parse elements out of this stream.
  • Each chunk of elements is identified by a target which indicates the bundle, transform, and input this set of elements is destined for.
  • An empty data chunk indicates end of stream, end of all streams indicates end of bundle.
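Consuming one logical stream then looks roughly like this (a sketch, with fixed 4-byte length prefixes standing in for the real coder framing): parse elements out of each contiguous chunk, and stop at the empty chunk.

```python
import struct

def iter_elements(chunk):
    # Elements are concatenated, length-prefixed byte strings.
    i = 0
    while i < len(chunk):
        (n,) = struct.unpack(">I", chunk[i:i + 4])
        yield chunk[i + 4:i + 4 + n]
        i += 4 + n

def drain_stream(chunks):
    elements = []
    for chunk in chunks:
        if not chunk:  # empty chunk: end of this stream
            break
        elements.extend(iter_elements(chunk))
    return elements

def frame(*elements):
    return b"".join(struct.pack(">I", len(e)) + e for e in elements)

stream = [frame(b"a", b"bc"), frame(b"def"), b""]
received = drain_stream(stream)
# received == [b"a", b"bc", b"def"]
```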

41 of 67

State Plane

  • Used whenever an SDK needs to request data from the Runner
    • Side inputs and (future) GBK value-stream continuations also use this API
  • Heterogeneous stream of asynchronous requests and responses
    • Similar to Control Plane (but SDK initiated).
  • The cache token can be ignored if cross-bundle caching is not implemented.
  • Likewise, the continuation token is sketched out (but not yet supported by any runners).
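The SDK-initiated request/response pattern can be sketched like so (illustrative only; a callable stands in for the runner's state service, and the continuation-token machinery is omitted):

```python
class StateClient:
    def __init__(self, backend):
        self.backend = backend  # stand-in for the runner's state service
        self._next_id = 0

    def get(self, state_key):
        # Every request carries a unique id the response must echo.
        self._next_id += 1
        request = {"id": self._next_id, "get": state_key}
        response = self.backend(request)  # synchronous stand-in
        assert response["id"] == request["id"], "responses match by id"
        return response["data"]

store = {"side1": b"\x01\x02"}
client = StateClient(lambda req: {"id": req["id"], "data": store[req["get"]]})
value = client.get("side1")
# value == b"\x01\x02"
```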

42 of 67

Logging

  • Lets runner filter/aggregate/store logs as it sees fit.
  • Often no one is monitoring your stderr, which may live entirely in a container.
    • Important to hook up to root logger so users' logs get out.
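Hooking the root logger in Python looks roughly like this (a sketch; a list stands in for the gRPC logging stream): once attached, anything users log through the standard library is forwarded rather than lost in the container's stderr.

```python
import logging

class FnApiLogHandler(logging.Handler):
    def __init__(self, sink):
        super().__init__()
        self.sink = sink  # stand-in for the logging-service stream

    def emit(self, record):
        self.sink.append((record.levelname, record.getMessage()))

sink = []
root = logging.getLogger()      # the root logger catches users' logs
root.addHandler(FnApiLogHandler(sink))
root.setLevel(logging.INFO)

# A user's module-level logger propagates up to the root handler.
logging.getLogger("user.module").info("processed %d elements", 42)
```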

43 of 67

Workers have Fn

Runner

Runner worker launches services.

Control Service

Data Service

State Service

Logging Service

44 of 67

Workers have Fn

SDK Worker

Runner

Runner launches appropriate worker(s), determined by inspecting the environment.

Control Service

Data Service

State Service

Logging Service

45 of 67

Workers have Fn

SDK Worker

Runner

Worker connects to control service.

Control Service

Data Service

State Service

Logging Service

46 of 67

Workers have Fn

SDK Worker

Runner

Runner sends a register request.

Control Service

Data Service

State Service

Hey, I've got some work you should be able to do

Logging Service

47 of 67

Workers have Fn

SDK Worker

Runner

Runner sends a process bundle request.

Control Service

Data Service

State Service

Now process a bundle id:1 with that descriptor.

Logging Service

48 of 67

Workers have Fn

SDK Worker

Runner

SDK Worker examines Read/Write ports, opens data channel(s).

Control Service

Data Service

State Service

Data Service at localhost:5077

Logging Service

49 of 67

Workers have Fn

SDK Worker

Runner

Runner starts pushing data, worker starts bundle.

Control Service

Data Service

State Service

StartBundle()

data

Logging Service

50 of 67

Workers have Fn

SDK Worker

Runner

Worker plumbs bundle into execution graph, streams results back.

Control Service

Data Service

State Service

data

data

data

data

Logging Service

51 of 67

Workers have Fn

SDK Worker

Runner

Runner registers and requests execution of other bundles.

Control Service

Data Service

State Service

data

data

data

data

Here's another set of transforms you can do.

Do this new one too.

Do that first one again.

Logging Service

52 of 67

Workers have Fn

SDK Worker

Runner

Bundles may (and typically do) share the data stream.

Control Service

Data Service

State Service

data

data

data

data

Logging Service

53 of 67

Workers have Fn

SDK Worker

Runner

Progress may be requested.

Control Service

Data Service

State Service

data

data

data

How you doing with that first bundle?

data

'Bout done with everything you gave me.

Logging Service

54 of 67

Workers have Fn

SDK Worker

Runner

Some transforms may need access to state or side inputs.

Control Service

Data Service

State Service

data

data

data

data

???

Logging Service

55 of 67

Workers have Fn

SDK Worker

Runner

The state service's location is looked up from the bundle descriptor.

Control Service

Data Service

State Service

data

data

data

data

State Service at localhost:389

Logging Service

56 of 67

Workers have Fn

SDK Worker

Runner

As with data, the state stream is multiplexed.

Control Service

Data Service

State Service

data

data

data

data

Thanks!

You remember what that one guy said?

Yeah, here you go.

Logging Service

57 of 67

Workers have Fn

SDK Worker

Runner

All the while, the logs are plumbed via the logging service.

Control Service

Data Service

State Service

data

data

data

data

Thanks!

Logging Service

log.info

log.debug

log.warn

58 of 67

Workers have Fn

SDK Worker

Runner

End of stream(s) indicates end of bundle.

Control Service

Data Service

State Service

data

data

data

data

FinishBundle()

Logging Service

59 of 67

Workers have Fn

SDK Worker

Runner

SDK Worker sends back the response for the initial process bundle request, in parallel with the tail of the data.

Control Service

Data Service

State Service

data

data

data

data

Yo, I'm done with that first bundle.

Logging Service

60 of 67

Workers have Fn

SDK Worker

Runner

If there is no more work, the runner terminates the control channel.

Control Service

Data Service

State Service

Logging Service

61 of 67

Workers have Fn

SDK Worker

Runner

The worker then cleans up and terminates the other channels.

Control Service

Data Service

State Service

Logging Service

62 of 67

Closing the Loop

Back to the Job API

63 of 67

Closing the loop.

Runner Worker

All the while, the runner worker is communicating status, counters, etc. to its master(s).

Runner Master

Runner

64 of 67

Closing the loop.

And the runner is communicating status back to the SDK, which may in turn publish it to the user.

Runner

SDK Client

Goin' fine.

Job Service

65 of 67

Summary

66 of 67

  • The SDK client is used to produce a platform-agnostic representation of the pipeline, encoded according to the Runner API.
    • This may have SDK-specific bits.
  • The pipeline, along with any required artifacts, is passed over the Job API to a runner.
  • The runner fires up workers which use the Fn API to do SDK-specific work.
  • Status is plumbed back through the runner to the SDK Client.

Runner

SDK Client

SDK Worker

67 of 67

Questions?