1 of 33

Portability: Delivering on the Vision of Beam

Robert Bradshaw

robertwb@google.com

Stockholm - July 2019

2 of 33

The Beam Vision

Provide a comprehensive framework for data processing pipelines, one that allows you to write your pipeline once in your language of choice and run it with minimal effort on your execution engine of choice.

3 of 33

The Beam Vision

input | CombinePerKey(sum)

Python

input.apply(

Sum.integersPerKey())

Java

stats.Sum(s, input)

Go

SELECT key, SUM(value) FROM input GROUP BY key

SQL

Cloud Dataflow

Apache Spark

Apache Flink

Apache Apex

Gearpump

Apache Samza

Apache Nemo (incubating)

IBM Streams

input.combineByKey(_ + _)

Scala
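
Every snippet on this slide expresses the same per-key sum. As a minimal sketch of that semantics in plain Python, with no Beam dependency (the function name `sum_per_key` and the sample data are illustrative and do not come from any SDK):

```python
from collections import defaultdict


def sum_per_key(pairs):
    """Return {key: sum of values} for an iterable of (key, value) pairs."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Illustrative input, not taken from the talk.
sample = [("a", 1), ("b", 2), ("a", 3)]
result = sum_per_key(sample)
print(result)
```

Each SDK wraps this one logical operation in its own idiom; the runner only needs to know that a per-key sum was requested.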

4 of 33

Beam Reality (pre 2018)

input | CombinePerKey(sum)

Python

input.apply(

Sum.integersPerKey())

Java

stats.Sum(s, input)

Go

SELECT key, SUM(value) FROM input GROUP BY key

SQL

Cloud Dataflow

Apache Spark

Apache Flink

Apache Apex

Gearpump

Apache Samza

Apache Nemo (incubating)

IBM Streams

input.combineByKey(_ + _)

Scala

Sum Per Key

Java objects

Sum Per Key

Python objects

?

5 of 33

Not a Viable Solution

input | CombinePerKey(sum)

Python

input.apply(

Sum.integersPerKey())

Java

stats.Sum(s, input)

Go

SELECT key, SUM(value) FROM input GROUP BY key

SQL

Cloud Dataflow

Apache Spark

Apache Flink

Apache Apex

Gearpump

Apache Samza

Apache Nemo (incubating)

IBM Streams

input.combineByKey(_ + _)

Scala

Sum Per Key

Java objects

Sum Per Key

Python objects

Go objects

Sum Per Key

6 of 33

Portability Framework

The Portability Framework is a language-agnostic way of representing and executing Beam pipelines.

Runner

SDK Client

SDK Worker

7 of 33

Portability Framework

Runner

SDK Client

SDK Worker

Define Pipelines

(protobuf)

Submit and Monitor Pipelines

(grpc)

Execute User Code

(docker, grpc)

8 of 33

Portability Framework! [2019]

Apache Spark

Apache Flink

Apache Apex

Gearpump

Cloud Dataflow

Apache Samza

Apache Nemo (incubating)

IBM Streams

Sum Per Key

Java objects

Sum Per Key

Beam

Portable protos

input | CombinePerKey(sum)

Python

input.apply(

Sum.integersPerKey())

Java

stats.Sum(s, input)

Go

SELECT key, SUM(value) FROM input GROUP BY key

SQL

input.combineByKey(_ + _)

Scala

9 of 33

So, what's in it for me?

In addition to realizing the vision of a full cross-product of Runners and SDKs, the portability API also enables:

  • Configurable, Hermetic Worker Environment
  • Multi-Language Pipelines
  • Faster/exclusive delivery of new features (such as SDF)

10 of 33

Hermetic Worker Environment

  • Code deployed and executed on remote machines
    • Configuration is runner-specific, runner-constrained
  • Shipping dependencies is hard
    • Especially in languages without fat jars

11 of 33

Hermetic Worker Environment

  • Code deployed and executed on remote machines
    • Configuration is runner-specific, runner-constrained
  • Shipping dependencies is hard
    • Especially in languages without fat jars
  • Develop, test locally
    • Often very different than deployment environment

12 of 33

Beam Environments

  • Each user operation has an associated environment in which to execute.
    • Typically the SDK provides a default environment
  • This environment can be specified as an arbitrary Docker container
    • Ahead-of-time installation
    • Arbitrary dependencies
    • Arbitrary customization
    • Runner isolation
  • Existing runtime injection of artifacts still supported
    • Jars, packages, binaries
    • Environments can be shared
    • No need to re-build image on each compile

13 of 33

Beam Environments

  • These environments can be used locally as well
    • Local mode that matches production mode(s).
    • Faster, more faithful tests.
  • Lighter-weight environments are available as well.
    • Subprocess(es)
    • Loopback
    • Embedded
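
The environment choices above are typically selected through pipeline options. The sketch below only assembles the command-line flags as strings, so it runs without Beam installed. It assumes the Python SDK's `environment_type` option (used in the demo later in this talk) and an accompanying `environment_config` option; exact option names can vary between SDK versions, and the helper `environment_flags` and the image name are hypothetical.

```python
def environment_flags(environment_type, environment_config=None):
    """Build the flags that select an SDK worker environment.

    Hypothetical helper: it only formats strings and calls no Beam API.
    """
    flags = ["--environment_type=" + environment_type]
    if environment_config is not None:
        flags.append("--environment_config=" + environment_config)
    return flags


# Production-like run: user code executes inside a custom container.
# The image name is a placeholder, not a real image.
docker_flags = environment_flags(
    "DOCKER", "example.com/my-team/beam-worker:latest")

# Local debugging: user code runs in the process that submitted the job.
loopback_flags = environment_flags("LOOPBACK")

print(docker_flags)
print(loopback_flags)
```

Keeping the pipeline code identical and changing only these flags is what lets a local run match the production setup.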

17 of 33

Cross-language transforms

Portability gives us

  • Language agnostic representation of pipelines
    • Transforms, coders, ...
  • Per-operation specification of environment

18 of 33

Cross-language transforms

Portability gives us

  • Language agnostic representation of pipelines
    • Transforms, coders, ...
  • Per-operation specification of environment

We are no longer bound to a single language/SDK in a given pipeline.

19 of 33

Cross-language transforms

Transforms can be shared among SDKs

  • Rich set of IOs from Java available everywhere
  • TensorFlow TFX transforms in non-Python jobs
  • Leverage SQL work in Python and Go
  • Bootstrap SDKs in other languages

More libraries available in language of your choice.

...

...

20 of 33

Cross-language transforms

  • User constructs pipeline using SDK-native conventions

Client SDK

21 of 33

Cross-language transforms

  • User constructs pipeline using SDK-native conventions
  • An ExternalTransform is applied

Client SDK

MyTransform(param=value)

22 of 33

Cross-language transforms

  • User constructs pipeline using SDK-native conventions
  • An ExternalTransform is applied
  • The transform identifier, with its parameters, is sent to an ExpansionService

Client SDK

External SDK

Expansion

Service

MyTransform(param=value)

{param: int:value}

23 of 33

Cross-language transforms

  • The transform is expanded in the external SDK.

External SDK

Client SDK

Expansion

Service

MyTransform(param=value)

MyTransformConfig {

int getParam();

}

{param: int:value}

24 of 33

Cross-language transforms

  • The transform is expanded in the external SDK.
  • The expansion is returned to the client SDK.

External SDK

Client SDK

Expansion

Service

25 of 33

Cross-language transforms

  • The transform is expanded in the external SDK.
  • The expansion is returned to the client SDK… and plugged into the graph.

External SDK

Client SDK

Expansion

Service

26 of 33

Cross-language transforms

  • The transform is expanded in the external SDK.
  • The expansion is returned to the client SDK… and plugged into the graph.
  • Construction continues as before.

Client SDK
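
The expansion steps walked through on the preceding slides can be pictured with a toy model. This is a sketch under stated assumptions: `ToyExpansionService`, `expand_filter`, the URN `example:filter_less_than`, and the dictionary-based graph are all hypothetical and are not Beam APIs; the real exchange uses protobuf messages over gRPC.

```python
# Toy model only: nothing here is a Beam API.

class ToyExpansionService:
    """Maps transform identifiers (URNs) to functions that expand them."""

    def __init__(self):
        self._registry = {}

    def register(self, urn, expand_fn):
        self._registry[urn] = expand_fn

    def expand(self, urn, params):
        # The external SDK builds the sub-graph from the received parameters.
        if urn not in self._registry:
            raise KeyError("unknown transform: " + urn)
        return self._registry[urn](params)


def expand_filter(params):
    # Expansion result: a list of steps, each tagged with the environment
    # that must execute it.
    return [{"step": "filter_less_than(%s)" % params["param"], "env": "java"}]


service = ToyExpansionService()
service.register("example:filter_less_than", expand_filter)

# Client side: build the graph natively, then splice in the expansion.
graph = [{"step": "create", "env": "python"}]
graph.extend(service.expand("example:filter_less_than", {"param": 5}))
graph.append({"step": "map", "env": "python"})  # construction continues

steps = [node["step"] for node in graph]
print(steps)
```

The client never needs to understand the expanded steps; it only records them, along with their environment, in the pipeline it submits.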

27 of 33

Cross-language transforms

  • Execution happens by interacting with multiple environments.

Client SDK

Runner

Job Service

Worker

Worker

SDK Environment

SDK Environment

SDK Environment
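
One way to picture execution across multiple environments is to group consecutive steps that share an environment, so that each group can be handed to a worker for that environment. The sketch below is a simplification and an assumption, not how any particular runner plans its stages; the step names and the helper `stages_by_environment` are hypothetical.

```python
from itertools import groupby

# Hypothetical pipeline: each step names the environment it needs.
steps = [
    ("create", "python"),
    ("filter", "java"),
    ("count", "java"),
    ("format", "python"),
]


def stages_by_environment(steps):
    """Group consecutive steps that share an environment into one stage."""
    return [
        (env, [name for name, _ in group])
        for env, group in groupby(steps, key=lambda step: step[1])
    ]


stages = stages_by_environment(steps)
for env, names in stages:
    print(env, names)
```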

28 of 33

Demo

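# Imports were not shown on the slide; the module paths below are assumed.
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability.flink_runner import FlinkRunner

# URNs identifying the Java transforms served by the expansion service.
TEST_COUNT_URN = "pytest:beam:transforms:count"
TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than"

# Address of a locally running expansion service.
expansion_service = 'localhost:8096'

def pipeline(root):
    # Python and Java steps alternate within a single pipeline.
    (root
     | ' PYTHON CREATE ' >> beam.Create(list(u'aaabccxyyzzz'), reshuffle=False)
     | ' JAVA MAP ' >> beam.ExternalTransform(TEST_FILTER_URN, b'middle', expansion_service)
     | ' JAVA COUNT ' >> beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service)
     | ' PYTHON MAP ' >> beam.Map(lambda kv: '%s: %s' % kv)
     | beam.Map(lambda x: time.sleep(5)))

options = PipelineOptions(
    experiments=['beam_fn_api'],
    environment_type='LOOPBACK',
    flink_master_url="localhost:8081")

FlinkRunner().run(pipeline, options=options)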

29 of 33

New Features

Some new features exclusively available on Portability

30 of 33

New Features

Some new features exclusively available via Portability

  • SplittableDoFn
    • Radically Modular IO Connectors
  • Beam Metrics
    • System metrics, richer user counter types
  • New runners and SDKs
    • E.g. GoLang, Samza runner
  • Interactive, Visualization tools

Portability is the future.

31 of 33

Status

  • Portability fully implemented in Python SDK, mostly in Java; Go supports nothing but the FnAPI
  • Full runner implementation in Python UniversalLocalRunner.
  • Flink pioneered implementation, essentially complete
  • Dataflow implementation ongoing, guarded by an experiment flag
  • Batch Spark execution.

32 of 33

Summary

  • The Portability Framework allows us to finally, fully realize the Vision of Beam
  • Several other exciting features are unlocked as well
    • Hermetic workers, Multi-language, SplittableDoFns, …
  • New SDKs and Runners are targeting the Portability API

The future is being built on top of Portability

33 of 33

Questions?