So, You Want to Write a Beam SDK?
Robert Bradshaw
robertwb@google.com
Beam Summit London
October 2018
The Beam Vision
Provide a comprehensive portability framework for data processing pipelines, one that allows you to write your pipeline once in your language of choice and run it with minimal effort on the execution engine of choice.
Current state
Future state?
What is an SDK?
SDK Client
What is an SDK?
SDK Client
Runner
Job Service
What is an SDK?
SDK Client
Runner
Job Service
Fn API Service
SDK Worker
What is an SDK?
An SDK is also an idiomatic interpretation of the Beam Model in a specific language.
An aside about DSLs
An SDK
A DSL
Constructing a Pipeline
This is generally the easy part.
Required Transforms
An SDK should at minimum support.
Expected Transforms
Also support for Metrics (e.g. counters), Pipeline results.
More on IO
Coders and Types
UserFns
Note that composite PTransforms are not executed on the runner; only the primitive transforms they expand into are.
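As a rough illustration of why composites never reach the runner, the sketch below models a transform tree and collects only its leaves. The `Transform` class, the `leaf_transforms` helper, and the `CountPerKey` composite are hypothetical names invented for this example, not part of any Beam SDK.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Transform:
    """Hypothetical node in a pipeline's transform tree (not a Beam class)."""
    name: str
    urn: str = ""  # left empty here for composites
    subtransforms: List["Transform"] = field(default_factory=list)


def leaf_transforms(root: Transform) -> List[Transform]:
    """Return the primitive (leaf) transforms under root, in declaration order."""
    if not root.subtransforms:
        return [root]
    leaves: List[Transform] = []
    for child in root.subtransforms:
        leaves.extend(leaf_transforms(child))
    return leaves


# A made-up composite that expands into three primitives.
count_per_key = Transform("CountPerKey", subtransforms=[
    Transform("PairWithOne", urn="beam:transform:pardo:v1"),
    Transform("Group", urn="beam:transform:group_by_key:v1"),
    Transform("Sum", urn="beam:transform:pardo:v1"),
])

names = [t.name for t in leaf_transforms(count_per_key)]
print(names)
```

The composite itself carries no executable payload in this sketch; it only groups the primitives that do.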
Direct Runner
The Runner API
Or how to represent a Pipeline
How to define a Beam Pipeline
Beam pipelines are defined according to the protos in model/pipeline.
Learn by example
{
  unique_name: "transform_10"
  spec: {
    urn: "beam:transform:group_by_key:v1"
  }
  inputs: { "pcollection_1": "in" }
  outputs: { "pcollection_2": "out" }
}
The ParDo operation is arguably the most complicated.
{
  unique_name: "my_map_fn"
  spec: {
    urn: "beam:transform:pardo:v1"
    payload: {
      do_fn: {
        urn: "beam:dofn:language_serialized_fn"
        spec: "0x5fdff4006f74f3af2005e9..."
        environment: "beam:language_container"
      }
      side_inputs: {
        "side1": {access_pattern: iterable, view_fn: {urn: …}, …}
      }
      state_specs: …
    }
  }
  inputs: { "pcoll_1": "in", "pcoll_2": "side1" }
  outputs: { "pcoll_3": "out1", "pcoll_4": "out2" }
}
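To make the opaque `spec` bytes of the DoFn less mysterious, here is a minimal sketch of how an SDK might turn a user function into bytes and back. It uses the standard-library `marshal` module purely as a stand-in for whatever serializer a real SDK would choose, and the helper names `serialize_fn` and `deserialize_fn` are assumptions made for this example. It only handles plain functions without closures.

```python
import marshal
import types


def serialize_fn(fn):
    """Serialize a closure-free function's code object to bytes (stand-in serializer)."""
    return marshal.dumps(fn.__code__)


def deserialize_fn(payload):
    """Rebuild a callable from bytes produced by serialize_fn."""
    code = marshal.loads(payload)
    return types.FunctionType(code, globals())


def double(x):
    return x * 2


# Dict shaped loosely like the do_fn entry on the slide; the URN is the slide's placeholder.
do_fn_spec = {
    "urn": "beam:dofn:language_serialized_fn",
    "spec": serialize_fn(double),
}

# What an SDK worker would do on the other side: decode, then apply per element.
restored = deserialize_fn(do_fn_spec["spec"])
result = [restored(x) for x in [1, 2, 3]]
print(result)
```

Only the SDK that produced the bytes needs to understand them; the runner treats the payload as opaque and hands it to a worker in the named environment.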
Runner-interpreted URNs
beam:transform:combine_per_key_precombine:v1
beam:transform:combine_per_key_merge_accumulators:v1
beam:transform:combine_per_key_extract_outputs:v1
beam:transform:combine_grouped_values:v1
Per-shard pre-combine
Final aggregation
shuffle
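The split into pre-combine, shuffle, and final aggregation can be sketched in plain Python with a mean combiner whose accumulator is a `(sum, count)` pair. The function names mirror the URNs above, but the code is an illustrative assumption and does not use any Beam API.

```python
from collections import defaultdict


def precombine(shard):
    """Per-shard pre-combine: fold raw (key, value) pairs into (sum, count) accumulators."""
    accs = {}
    for key, value in shard:
        total, count = accs.get(key, (0, 0))
        accs[key] = (total + value, count + 1)
    return list(accs.items())


def shuffle(shards_of_accs):
    """Group accumulators from all shards by key."""
    grouped = defaultdict(list)
    for shard in shards_of_accs:
        for key, acc in shard:
            grouped[key].append(acc)
    return grouped


def merge_accumulators(accs):
    """Final aggregation, step 1: merge the per-shard accumulators."""
    return (sum(a[0] for a in accs), sum(a[1] for a in accs))


def extract_output(acc):
    """Final aggregation, step 2: turn the accumulator into the result."""
    total, count = acc
    return total / count


shards = [[("a", 1), ("b", 10), ("a", 3)], [("a", 5), ("b", 20)]]
partial = [precombine(s) for s in shards]
means = {k: extract_output(merge_accumulators(v)) for k, v in shuffle(partial).items()}
print(means)
```

Only one accumulator per key per shard crosses the shuffle, which is the point of letting the runner interpret these URNs.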
The Jobs API
Or how to ask a Runner to run a Pipeline
Submitting a Job
The user uses an SDK to construct a pipeline.
SDK Client
Submitting a Job
Someone* starts a runner with a Job Service (defined in model/job-management).
Runner
SDK Client
Job Service
*someone can be the User, the SDK itself, a Sys Admin setting up a long-running cluster, ...
Submitting a Job
The SDK connects to the Job Service and submits the pipeline.
Runner
SDK Client
Job Service
Here's a pipeline.
Submitting a Job
The Job Service provides a pointer to an Artifact Service.
Runner
SDK Client
Job Service
Artifact Service
Put your stuff here.
Submitting a Job
The SDK uploads any required resources.
Runner
SDK Client
Job Service
Artifact Service
jar
resource
Submitting a Job
The SDK then requests that the pipeline be run.
Runner
SDK Client
Job Service
Artifact Service
Do it!
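The submission sequence so far (submit the pipeline, receive a staging location, upload resources, request a run) can be sketched with an in-memory stand-in for the Job Service. `FakeJobService`, its methods, and the `submit` helper are hypothetical names for illustration; the real services are gRPC APIs defined in model/job-management.

```python
class FakeJobService:
    """In-memory stand-in for a runner's Job Service (all names hypothetical)."""

    def __init__(self):
        self.prepared = {}   # preparation id -> pipeline
        self.artifacts = {}  # preparation id -> {artifact name: bytes}
        self.running = []    # (job id, pipeline, staged artifacts)

    def prepare(self, pipeline):
        """'Here's a pipeline.' Returns a handle and a place to stage artifacts."""
        prep_id = f"prep-{len(self.prepared) + 1}"
        self.prepared[prep_id] = pipeline
        self.artifacts[prep_id] = {}
        return prep_id, self.artifacts[prep_id]

    def run(self, prep_id):
        """'Do it!' Starts the previously prepared pipeline."""
        job_id = f"job-{len(self.running) + 1}"
        staged = dict(self.artifacts[prep_id])
        self.running.append((job_id, self.prepared[prep_id], staged))
        return job_id


def submit(job_service, pipeline, resources):
    """SDK-side flow: prepare, stage resources, then request execution."""
    prep_id, staging = job_service.prepare(pipeline)
    for name, content in resources.items():
        staging[name] = content  # 'Put your stuff here.'
    return job_service.run(prep_id)


service = FakeJobService()
job_id = submit(service, {"transforms": ["read", "map", "write"]}, {"deps.jar": b"\x00\x01"})
print(job_id)
```

Separating preparation from running gives the SDK a window in which to upload jars and other resources before anything executes.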
Submitting a Job
The runner transforms, fuses, and breaks the graph into executable stages.
Runner
SDK Client
Job Service
Artifact Service
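A heavily simplified sketch of the fusion step: for a purely linear chain of transforms, cut a new stage at every GroupByKey. The `fuse_linear` function and the transform names are assumptions made for this example; a real runner also has to handle branching graphs, side inputs, and multiple environments.

```python
GBK_URN = "beam:transform:group_by_key:v1"
PARDO_URN = "beam:transform:pardo:v1"


def fuse_linear(transforms):
    """Split a linear chain of (name, urn) pairs into stages, cutting at each GroupByKey."""
    stages, current = [], []
    for name, urn in transforms:
        if urn == GBK_URN:
            if current:
                stages.append(current)
            stages.append([name])  # the runner performs the grouping itself
            current = []
        else:
            current.append(name)   # adjacent element-wise transforms fuse together
    if current:
        stages.append(current)
    return stages


chain = [
    ("Read", PARDO_URN),
    ("Parse", PARDO_URN),
    ("Group", GBK_URN),
    ("Sum", PARDO_URN),
    ("Write", PARDO_URN),
]
stages = fuse_linear(chain)
print(stages)
```

Each fused stage can then be handed to an SDK worker as a single unit of work, so elements pass between its transforms without going back through the runner.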
Submitting a Job
Runner is now ready to execute the pipeline.
Runner
The Fn API
Workers talking to Workers
Workers have Fn
Runner is now ready to execute the pipeline.
Runner
Workers have Fn
Runner
Runner
Runner
Runner
Runner
Runner Worker
Runner typically launches a master and/or many workers.
Runner Master
Runner
Workers have Fn
Runner
Runner
Runner
Runner
Runner
Runner Worker
Let's look at just one Runner worker.
Runner Master
Runner
Workers have Fn
Runner Worker
Let's look at just one Runner worker.
Communicating over the Fn API
The Fn API, defined in model/fn-execution, defines four services.
All are long-lived streaming services hosted by the runner or its workers.
A provision API and an artifacts API are also required if the standard boot scripts are not used.
Control Plane
Data Plane
State Plane
Logging
Workers have Fn
Runner
Runner worker launches services.
Control Service
Data Service
State Service
Logging Service
Workers have Fn
SDK Worker
Runner
Runner launches appropriate worker(s), determined by inspecting the environment.
Control Service
Data Service
State Service
Logging Service
Workers have Fn
SDK Worker
Runner
Worker connects to control service.
Control Service
Data Service
State Service
Logging Service
Workers have Fn
SDK Worker
Runner
Runner sends register bundle process request.
Control Service
Data Service
State Service
Hey, I've got some work you should be able to do
Logging Service
Workers have Fn
SDK Worker
Runner
Runner sends process bundle request.
Control Service
Data Service
State Service
Now process a bundle id:1 with that descriptor.
Logging Service
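The register-then-process exchange on the control plane can be sketched as a toy worker that stores descriptors and later runs bundles against them. The `SdkWorker` class and the dict-shaped instructions are hypothetical; in the real Fn API these are protobuf messages, and elements arrive over the data plane rather than inside the control message.

```python
class SdkWorker:
    """Toy SDK worker handling two hypothetical control instructions."""

    def __init__(self):
        self.descriptors = {}  # descriptor id -> list of functions to apply in order

    def handle(self, instruction):
        kind = instruction["kind"]
        if kind == "register":
            # "Hey, I've got some work you should be able to do."
            self.descriptors.update(instruction["descriptors"])
            return {"kind": "register_ok"}
        if kind == "process_bundle":
            # "Now process a bundle with that descriptor."
            fns = self.descriptors[instruction["descriptor_id"]]
            outputs = []
            for element in instruction["elements"]:
                for fn in fns:
                    element = fn(element)
                outputs.append(element)
            return {
                "kind": "bundle_done",
                "bundle_id": instruction["bundle_id"],
                "outputs": outputs,
            }
        raise ValueError(f"unknown instruction kind: {kind}")


worker = SdkWorker()
worker.handle({"kind": "register", "descriptors": {"d1": [str.strip, str.upper]}})
response = worker.handle({
    "kind": "process_bundle",
    "bundle_id": "1",
    "descriptor_id": "d1",
    "elements": [" a ", "b "],
})
print(response["outputs"])
```

Registering once and referring to the descriptor by id afterwards lets the runner request many bundles without resending the transform graph.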
Workers have Fn
SDK Worker
Runner
SDK Worker examines Read/Write ports, opens data channel(s).
Control Service
Data Service
State Service
Data Service at localhost:5077
Logging Service
Workers have Fn
SDK Worker
Runner
Runner starts pushing data, worker starts bundle.
Control Service
Data Service
State Service
StartBundle()
data
Logging Service
Workers have Fn
SDK Worker
Runner
Worker plumbs bundle into execution graph, streams results back.
Control Service
Data Service
State Service
data
data
data
data
Logging Service
Workers have Fn
SDK Worker
Runner
Runner registers and requests execution of other bundles.
Control Service
Data Service
State Service
data
data
data
data
Here's another set of transforms you can do.
Do this new one too.
Do that first one again.
Logging Service
Workers have Fn
SDK Worker
Runner
Bundles may (actually typically) share the data stream.
Control Service
Data Service
State Service
data
data
data
data
Logging Service
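Sharing one data stream between bundles implies that every frame carries enough information to be routed. A minimal sketch, assuming each frame is a `(bundle_id, payload)` pair and that a `None` payload marks the end of that bundle's logical stream; both conventions are assumptions for this example, not the wire format.

```python
from collections import defaultdict


def demultiplex(stream):
    """Route (bundle_id, payload) frames from one shared stream to per-bundle buffers."""
    buffers = defaultdict(list)
    finished = []
    for bundle_id, payload in stream:
        if payload is None:
            finished.append(bundle_id)  # end of this bundle's logical stream
        else:
            buffers[bundle_id].append(payload)
    return dict(buffers), finished


# Frames for two bundles interleaved on a single channel.
frames = [
    ("1", b"a"), ("2", b"x"), ("1", b"b"),
    ("1", None), ("2", b"y"), ("2", None),
]
buffers, finished = demultiplex(frames)
print(buffers, finished)
```

The per-bundle end marker is what lets a worker know a bundle's input is exhausted even though the underlying channel stays open.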
Workers have Fn
SDK Worker
Runner
Progress may be requested.
Control Service
Data Service
State Service
data
data
data
How you doing with that first bundle?
data
'bout with everything you gave me.
Logging Service
Workers have Fn
SDK Worker
Runner
Some transforms may need access to state or side inputs.
Control Service
Data Service
State Service
data
data
data
data
???
Logging Service
Workers have Fn
SDK Worker
Runner
The state service is looked up in the bundle descriptor.
Control Service
Data Service
State Service
data
data
data
data
State Service at localhost:389
Logging Service
Workers have Fn
SDK Worker
Runner
As with data, the state stream is multiplexed.
Control Service
Data Service
State Service
data
data
data
data
Thanks!
You remember what that one guy said?
Yeah, here you go.
Logging Service
Workers have Fn
SDK Worker
Runner
All the while, the logs are plumbed via the logging service.
Control Service
Data Service
State Service
data
data
data
data
Thanks!
Logging Service
log.info
log.debug
log.warn
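One way an SDK worker might plumb its logs outward is to install a handler that forwards each record to an outbound queue, from which a sender would stream entries to the logging service. The sketch below uses Python's standard `logging.Handler`; the `QueueForwardingHandler` name, the entry fields, and the queue standing in for the logging stream are assumptions for illustration.

```python
import logging
import queue


class QueueForwardingHandler(logging.Handler):
    """Forward log records to a queue that stands in for the logging stream."""

    def __init__(self, outbound):
        super().__init__()
        self.outbound = outbound

    def emit(self, record):
        self.outbound.put({
            "severity": record.levelname,
            "message": record.getMessage(),
        })


outbound = queue.Queue()
logger = logging.getLogger("sdk_worker_sketch")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep this sketch's output off the root logger
logger.addHandler(QueueForwardingHandler(outbound))

logger.info("started bundle %s", "1")
logger.warning("slow element")

# A real sender would drain this queue continuously; here we drain it once.
entries = [outbound.get_nowait() for _ in range(outbound.qsize())]
print(entries)
```

Hooking into the language's native logging facility means user code can log idiomatically while the runner still collects everything in one place.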
Workers have Fn
SDK Worker
Runner
End of stream(s) indicates end of bundle.
Control Service
Data Service
State Service
data
data
data
data
FinishBundle()
Logging Service
Workers have Fn
SDK Worker
Runner
SDK Worker sends back response for initial process bundle request, in parallel with tail of data.
Control Service
Data Service
State Service
data
data
data
data
Yo, I'm done with that first bundle.
Logging Service
Workers have Fn
SDK Worker
Runner
If there is no more work, the runner terminates the control channel.
Control Service
Data Service
State Service
Logging Service
Workers have Fn
SDK Worker
Runner
The worker then cleans up and terminates the other channels.
Control Service
Data Service
State Service
Logging Service
Closing the Loop
Back to the Job API
Closing the loop.
Runner Worker
All the while, the runner worker is communicating status, counters, etc. to its master(s).
Runner Master
Runner
Closing the loop.
And the runner is communicating status back to the SDK, which may in turn publish it to the user.
Runner
SDK Client
Goin' fine.
Job Service
Summary
Runner
SDK Client
SDK Worker
Questions?