1 of 75

Dataflow under the covers

Pablo E (pabloem@apache.org)

2 of 75

What’s this talk?

  • A compilation of a few stories…
  • A bunch of implementation details about Beam
  • A bunch of implementation details about Dataflow
  • A list of interesting resources to learn about it

3 of 75

Quick Agenda

  1. MapReduce - Batch, Reading in parallel, Group by key
  2. FlumeJava - Batch, Optimizations
  3. MillWheel - Streaming, Watermarks, Parallelism
  4. Dataflow - Putting it all together
  5. List of high-level lessons
  6. List of kickass resources

4 of 75

First there was MapReduce…

MapReduce Paper published in 2004

  • Fault tolerance
  • Reading large log files from GFS
  • Applying map - shuffle - reduce transformations
  • Glossed over in the paper
    • Reading data from different sources?
    • Technical details of the distributed shuffle

5 of 75

First there was MapReduce…

Lesson 1: The parallelism of your pipeline is defined by your source.

6 of 75

Splittable sources…

  • BigTable / Cassandra / Dynamo
  • Kafka / Pubsub
  • Avro Files / Parquet Files / JSON-lines

Hard-to-split sources…

  • Traditional databases (JDBC)
  • HTTP endpoints
  • Compressed files, some CSV
  • others…
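A minimal Beam Python sketch of Lesson 1, with hypothetical paths: a glob over a block-based format can be split into many parallel reads, while a single compressed file cannot, so one worker ends up reading it end to end.

import apache_beam as beam

with beam.Pipeline() as p:
    # Splittable: many Avro files (and blocks within them) are read in parallel.
    avro_rows = p | 'ReadAvro' >> beam.io.ReadFromAvro('gs://my-bucket/events-*.avro')

    # Hard to split: a single gzipped file is effectively read by one worker.
    gzip_lines = p | 'ReadGzip' >> beam.io.ReadFromText('gs://my-bucket/export.csv.gz')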

7 of 75

Fault tolerance!

Lesson 2: “Exactly once” means consistent state despite retries.

  • Your code will often run more than once.
  • Dataflow guarantees that all elements will be successfully processed once, and that intermediate state will not be duplicated.

8 of 75

Fault tolerance!

Lesson 2: “Exactly once” means consistent state despite retries.

  • Your code will often run more than once.
  • Dataflow guarantees that all elements will be successfully processed once, and that intermediate state will not be duplicated.

In short: WATCH OUT FOR SIDE EFFECTS

  • Code with side effects should be idempotent.

Beam IO connectors are designed and tested with care to be idempotent.
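A minimal sketch of what an idempotent side effect can look like in a DoFn: derive a deterministic ID from the element so a retried bundle overwrites the same record instead of inserting a duplicate. The in-memory store and the key scheme are hypothetical stand-ins.

import hashlib

import apache_beam as beam

# Hypothetical sink: an upsert keyed by row ID (retries overwrite, never duplicate).
_FAKE_STORE = {}

def upsert_row(row_id, value):
    _FAKE_STORE[row_id] = value  # stand-in for an idempotent external write

class IdempotentWriteFn(beam.DoFn):
    def process(self, element):
        # Deterministic ID: the same element yields the same ID on every retry,
        # so re-running this bundle leaves the sink in a consistent state.
        row_id = hashlib.sha256(repr(element).encode('utf-8')).hexdigest()
        upsert_row(row_id, element)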

9 of 75

A shuffle arms race…

Blog: History of massive-scale sorting experiments at Google (Patent application!)

  • The team worked on performance of massive shuffles.

Lesson 3: Shuffling data is often a bottleneck for many pipelines.

  • If you can do something without a GroupByKey, then avoid the GBK.
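A minimal sketch of the difference with toy data: the GroupByKey version pushes every element through the shuffle before summing, while the Combine version lets the runner pre-aggregate on the workers, so far less data crosses the shuffle.

import apache_beam as beam

with beam.Pipeline() as p:
    events = p | beam.Create([('user1', 1), ('user2', 1), ('user1', 1)])

    # Shuffle-heavy: every (key, value) pair crosses the shuffle, then is summed.
    via_gbk = (events
               | 'GBK' >> beam.GroupByKey()
               | 'SumValues' >> beam.MapTuple(lambda k, values: (k, sum(values))))

    # Cheaper: a Combine the runner can apply partially before the shuffle.
    via_combine = events | 'SumPerKey' >> beam.CombinePerKey(sum)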

10 of 75

DAGs of MapReduce jobs…

11 of 75

DAGs of MapReduce jobs…

Diagram: each MapReduce job in the DAG writes its intermediate output to disk (or GFS) before the next job reads it.

12 of 75

DAGs of MapReduce jobs…

Diagram: intermediate writes to disk (or GFS), with hand-optimized code gluing the jobs together.

13 of 75

DAGs of MapReduce jobs…

Diagram: intermediate writes to disk (or GFS), hand-optimized glue code, and shuffle steps inside the MR jobs.

14 of 75

DAGs of MapReduce jobs…

PCollection<String> urls = FlumeJava.read(
    "/gfs/data/crawler/urls.txt");

PTable<URL, DocInfo> backlinks = urls.parallelDo(
    new DoFn<...>() { ... },
    tableOf(recordsOf(URL.class),
            recordsOf(DocInfo.class)));

PTable<...> referringDocInfos = backlinks.groupByKey();

referringDocInfos.parallelDo(...);

15 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:

16 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators

PCollection<KV<byte[], byte[]>> input =
    pipeline
        .apply("Read input", readFromSource(sourceOptions))
        .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
        .apply("Transform individual element", ParDo.of(elementTransf))
        .apply("Total bytes monitor", ParDo.of(new ByteMonitor()))
        .apply("Window by minutes", Window.into(FixedWindows.of(...)))
        .apply("Group by key", GroupByKey.create())
        .apply("Ungroup", ParDo.of(new Ungroup(getIterations())))
        .apply("Collect end metrics", ParDo.of(runtimeMonitor));

Diagram: Read → ParDo(...) → ParDo(...) → ParDo(...)

18 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking

Diagram: Read(BQ) → ParDo(...) → ParDo(...) and Read(S3) → ParDo(...) feed into a Flatten → ParDo(...) → Write(...).

19 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking

Before: Read(BQ) → ParDo(...) → ParDo(...) and Read(S3) → ParDo(...) feed into a Flatten → ParDo(...) → Write(...).

After flatten sinking: Read(BQ) → ParDo(...) → ParDo(...) → ParDo(...) → Write(...), and Read(S3) → ParDo(...) → ParDo(...) → Write(...); the Flatten is pushed down into each branch.

20 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking
    • Combiner lifting (see the sketch below)
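A minimal sketch of what combiner lifting exploits, using a mean CombineFn: when the runner lifts the combine, the accumulation steps run on each worker before the shuffle and only small (sum, count) accumulators cross it, with the merge happening afterwards. Whether lifting actually happens is the runner's decision.

import apache_beam as beam

class MeanFn(beam.CombineFn):
    # These two steps can run on each worker, before the shuffle, when lifted.
    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    # These two steps run after the shuffle, merging the per-worker accumulators.
    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float('nan')

# usage: keyed_pcoll | beam.CombinePerKey(MeanFn())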

23 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking
    • Combiner lifting

So what?

24 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking
    • Combiner lifting

Example: Write to BigQuery (diagram): assign a random element ID → COMMIT!! → write elements.

25 of 75

FlumeJava: Not just an MR orchestrator

  • More information about existing operators.
  • Some optimizations:
    • Fusion of operators
    • Flatten sinking
    • Combiner lifting
  • Possible optimizations:
    • Predicate pushdown / filter sinking
    • Select pushdown / sinking
    • Runner-side mapping

Note: These are not currently part of Beam nor Dataflow

Lesson 4: Dataflow/Beam will break up your job into ‘stages’. Sometimes it’s good to know where stages start and end.
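One practical way to see (and control) a stage boundary is to force one. A minimal sketch: beam.Reshuffle() inserts a shuffle, so the step after it lands in a new stage instead of being fused with the read; the file names and the toy processing step are illustrative only.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (p
         | 'ListFiles' >> beam.Create(['a.csv', 'b.csv', 'c.csv'])
         | 'BreakFusion' >> beam.Reshuffle()  # stage boundary: elements get redistributed
         | 'HeavyWork' >> beam.Map(lambda name: name.upper()))  # stand-in for expensive per-file work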

26 of 75

A parallel workstream

  • Many streaming efforts at Google…
  • A small team in Seattle worked on MillWheel…

27 of 75

A parallel workstream…?!

Lesson 5: Dataflow uses different backends in batch vs streaming

WHAT?!

28 of 75

A parallel workstream…?!

Dataflow uses different backends in batch vs streaming

WHAT?!

  • The job manager is the same for both (the “control path”)
  • The “data path” is different
    • Batch: optimized to perform very large shuffles efficiently
    • Streaming: optimized to perform low-latency exactly-once streaming shuffles

29 of 75

A parallel workstream…?!

Dataflow uses different backends in batch vs streaming

WHAT?!

  • The job manager is the same for both (the “control path”)
  • The “data path” is different
    • Batch: optimized to perform very large shuffles efficiently
      • Bundles are usually large (100s - 1000s of elements!)
    • Streaming: optimized to perform low-latency exactly-once streaming shuffle

30 of 75

A parallel workstream…?!

Dataflow uses different backends in batch vs streaming

WHAT?!

  • The job manager is the same for both (the “control path”)
  • The “data path” is different
    • Batch: optimized to perform very large shuffles efficiently
      • Bundles are usually large (100s - 1000s of elements!)
    • Streaming: optimized to perform low-latency exactly-once streaming shuffle
      • Bundles are usually small
        • When the pipeline is up-to-date, bundles are <10 elements!
        • When the pipeline is processing backlog, bundles are ~1000s of elements.

31 of 75

A parallel workstream…?!

Dataflow uses different backends in batch vs streaming

WHAT?!

  • The job manager is the same for both (the “control path”)
  • The “data path” is different
    • Batch: optimized to perform very large shuffles efficiently
      • Bundles are usually large (100s - 1000s of elements!)
    • Streaming: optimized to perform low-latency exactly-once streaming shuffle
      • Bundles are usually small
        • When the pipeline is up-to-date, bundles are <10 elements!
        • When the pipeline is processing backlog, bundles are ~1000s of elements.

How can I control my bundle size?!

33 of 75

A parallel workstream

  • Many streaming efforts at Google…
    • Custom systems
    • Join search queries with ad clicks
  • A small team in Seattle worked on MillWheel…

Kickass talk on history of streaming at Google

  • How did we define watermarks?
    • (How much did we iterate on the concept?!)

34 of 75

A small team in Seattle…

MillWheel: Fault-Tolerant Stream Processing at Internet Scale

  • Watermarks
  • Timers
  • State
  • Per-key processing
  • Exactly once

35 of 75

Streaming building blocks…

Lesson 6: In streaming, Dataflow associates every single element with a key.

  • What if they are not KV<...> pairs?
    • Internally, the element belongs to an ‘arbitrary’ key.
    • Usually, this key is related to a split on your source.
  • Also, every bundle contains elements for a single key.
  • Windows are calculated per key

Lesson 7: To the streaming backend, everything is timers, state and watermarks.

  • No triggers or windows*

36 of 75

The MillWheel API…

Step 1: Define a ‘topology’ (a DAG!)

37 of 75

The MillWheel API

Step 1: Define a ‘topology’ (a DAG!)

Step 2: Define the Computations

38 of 75

The MillWheel API

Step 1: Define a ‘topology’ (a DAG!)

Step 2: Define the Computations

Step 3: Off you go!

39 of 75

Streaming lessons…

  • Every element is associated with a key

Lesson 8: Processing is serial for each key

  • # of keys is the maximum parallelism in a pipeline.

40 of 75

Streaming lessons…

  • Every element is associated with a key

Lesson 8: Processing is serial for each key

  • # of keys is the maximum parallelism in a pipeline.

Lesson 9: In streaming, key-to-key order is guaranteed

  • WHAT?!

41 of 75

Key-to-key order (streaming)

Remember: Every element has a key associated with it behind the covers

(
  ReadFromKafka(...)
  | WindowInto(GlobalWindows(),
               trigger=AfterCount(1),
               accumulation_mode=AccumulationMode.DISCARDING)
  | GroupByKey()
  | Map(print)
)

# Kafka PARTITION:
[
  ('user1', 'event1'),
  ('user2', 'event1'),
  ('user1', 'event2'),
  ('user1', 'event3'),
  ('user2', 'event2'),
]

42 of 75

Key-to-key order (streaming)

Remember: Every element has a key associated with it behind the covers

# Output: IN READ ORDER (per key)
[('user1', 'event1'),
 ('user1', 'event2'),
 ('user1', 'event3')]

[('user2', 'event1'),
 ('user2', 'event2')]

(
  ReadFromKafka(...)
  | WindowInto(GlobalWindows(),
               trigger=AfterCount(1),
               accumulation_mode=AccumulationMode.DISCARDING)
  | GroupByKey()
  | Map(print)
)

# Kafka PARTITION:
[
  ('user1', 'event1'),
  ('user2', 'event1'),
  ('user1', 'event2'),
  ('user1', 'event3'),
  ('user2', 'event2'),
]

43 of 75

Streaming lessons…

  • Every element is associated with a key

Lesson 8: Processing is serial for each key

  • # of keys is the maximum parallelism in a pipeline.

Lesson 9: In streaming, key-to-key order is guaranteed

  • WHAT?!

STREAMING ONLY

NOT IN BATCH

Why? The explanation comes from the shuffling algorithms.

  • In streaming, we want fast, consistent, exactly-once, low-latency shuffles.
  • In batch, we want to shuffle VERY large amounts of data: We re-sort the data as quickly as possible to group by key. Arrival order does not matter as we sort the data again.

44 of 75

Streaming lessons…

  • Every element is associated with a key

Lesson 8: Processing is serial for each key

  • # of keys is the maximum parallelism in a pipeline.

Lesson 9: Only in streaming, key-to-key order is guaranteed

  • WHAT?!

STREAMING ONLY

NOT IN BATCH

Why? The explanation comes from the shuffling algorithms.

  • In streaming, we want fast, consistent, exactly-once, low-latency shuffles.
  • In batch, we want to shuffle VERY large amounts of data: We re-sort the data as quickly as possible to group by key. Arrival order does not matter as we sort the data again.

In fact, Windmill has these two axioms as part of its architectural principles:

Axiom 1: All processing in windmill is in the context of a key

Axiom 2: Message processing in windmill is serial for each key

45 of 75

Streaming lessons…

Lesson 10: Dataflow processes timers for each key in order

  • Therefore: Windows also fire in order*
    • *PER KEY

46 of 75

Streaming lessons…

In-depth Blog: Streaming Engine: Execution Model for Highly-Scalable, Low-Latency Data Processing

Next: Autosharding is a new streaming feature!

  • Before Autosharding:
    • Either pick a fixed number of shards to write to your sink OR
    • Lose control over your bundle size (small / huge bundles allowed)
  • Dataflow innovation: Let’s vary the number of shards according to the number of workers, and control bundle sizes.
    • How to try: GroupIntoBatches.withAutoSharding()
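A minimal Python sketch of the batching transform behind this (the withAutoSharding() call above is the Java API): buffer values per key and hand the sink a whole batch per call. The batch size and the print stand-in for a sink RPC are illustrative only.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([('user1', 'e1'), ('user1', 'e2'), ('user2', 'e1')])
         # Emit (key, [up to 100 values]) bundles sized for the sink,
         # rather than whatever bundle size the shuffle happened to deliver.
         | 'Batch' >> beam.GroupIntoBatches(100)
         | 'WriteBatch' >> beam.Map(print))  # stand-in for one RPC per batch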

47 of 75

Batch and streaming unification…

Two separate teams:

  • MillWheel team
  • MapReduce / Flume team

“How do we support Streaming in Flume?”

48 of 75

Batch and streaming unification…

Windowing was the key insight that allowed us to unify batch and streaming.

“MillWheel didn't have the notion of group-by-key, and to naively apply it would result in an iterable that never ended.”

49 of 75

Batch and streaming unification…

  • GroupByKey is the way elements are passed between stages
    • Batch: Shuffle a massive dataset.
    • Streaming: Perform low-latency exactly-once routing of data.

“First we added windows, and triggers followed soon after that”

Note how “state and timers” disappeared from the user-facing API.
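A minimal sketch of that unification in today's Beam API: the same windowed count runs over this bounded toy input in batch, and unchanged over an unbounded source in streaming. Timestamps are attached by hand here purely for illustration.

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([('user1', 10), ('user2', 65), ('user1', 70)])  # (key, event time in seconds)
         | 'Stamp' >> beam.MapTuple(
             lambda user, ts: window.TimestampedValue((user, 1), ts))
         | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
         | 'Count' >> beam.combiners.Count.PerKey()
         | beam.Map(print))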

50 of 75

The return of the state [... and timers]

“We knew the windowing model wasn't as powerful as the ‘roll your own[..] state and timers’ - even if in MillWheel the majority of state+timer use was aggregation over a window”

  • State: re-designed slightly to work in batch and streaming.
    • State is partitioned per key and per window
    • Stateful DoFn (i.e. a DoFn with StateSpecs); see the sketch below
      • Batch: State is cached in memory
      • Streaming: State is persistent in the streaming backend
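A minimal sketch of a stateful DoFn in the Python SDK: a single state cell, which Beam scopes per key and per window exactly as described above. Timers follow the same pattern via a TimerSpec; they are omitted here for brevity.

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class RunningCountFn(beam.DoFn):
    # One integer of state; the runner keeps a separate cell per key and per window.
    COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())

    def process(self, element, count=beam.DoFn.StateParam(COUNT)):
        key, _ = element
        new_total = (count.read() or 0) + 1
        count.write(new_total)
        yield key, new_total

# usage: keyed_pcoll | beam.ParDo(RunningCountFn())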

51 of 75

… enough about concepts and APIs…

52 of 75

The (initial) Dataflow service architecture

Diagram: the ‘Backend’ / Dataflow Manager drives a pool of Java Workers over the Dataflow API.

53 of 75

The (initial) Dataflow service architecture

Diagram: the ‘Backend’ / Dataflow Manager drives Java Workers and a Python Worker over the Dataflow API.

54 of 75

The (initial) Dataflow service architecture

Diagram: the ‘Backend’ / Dataflow Manager drives Java Workers and a Python Worker over the Dataflow API.

Batch: Java / Python Worker + Shuffler, connected through C bindings, speaking the Dataflow API.

Streaming: Java / Python Worker + Windmill Appliance, connected through C bindings, speaking the Dataflow API.

55 of 75

The (initial) Dataflow service architecture

Diagram: as before, now with many batch worker stacks (Java / Python Worker + Shuffler, C bindings, Dataflow API) alongside the streaming stack (Java / Python Worker + Windmill Appliance).

56 of 75

The (initial) Dataflow service architecture

Diagram: as before, now with many streaming worker stacks (Java / Python Worker + Windmill Appliance, C bindings, Dataflow API) alongside the batch stack (Java / Python Worker + Shuffler).

57 of 75

The initial Dataflow worker architecture

Diagram: Java Workers (x4) and a Python Worker, each speaking the Dataflow API.

58 of 75

The initial Dataflow worker architecture


How about Flink? Spark? Samza?

  • A whole new Python worker that speaks “Flink API”?
  • A whole new Java worker that speaks “Flink API”?
  • A whole new Python worker that speaks “Spark API”?
  • A whole new Java worker that speaks “Spark API”?
  • A whole new Python worker that speaks “Samza API”?
  • A whole new Java worker that speaks “Samza API”?

59 of 75

The initial Dataflow worker architecture


How about Flink? Spark? Samza?

  • A whole new Python worker that speaks “Flink API”?
  • A whole new Java worker that speaks “Flink API”?
  • A whole new Python worker that speaks “Spark API”?
  • A whole new Java worker that speaks “Spark API”?
  • A whole new Python worker that speaks “Samza API”?
  • A whole new Java worker that speaks “Samza API”?

How about Go? Typescript? Other langs?

60 of 75

The current Dataflow worker architecture

Diagram: Java Workers (x4) and a Python Worker, each speaking the Dataflow API.

Beam Portability Framework!

61 of 75

The current Dataflow worker architecture

“SDK Worker”

  • Python SDK worker
  • Java SDK worker
  • Go SDK worker
  • ??? SDK worker

Diagram: on each GCE VM, the SDK worker talks to the Dataflow Runner Harness (C++) over the Beam Portability API, and the harness speaks the Dataflow API to the service.

62 of 75

The current Dataflow service architecture

Diagram: on each GCE VM, an SDK Worker (Py/Java/Go/JS) sits behind the Runner Harness (C++).

  • Control path: the ‘Backend’ / Dataflow Manager, over the Dataflow API.
  • Data path: in batch, the Shuffle Service; in streaming, the Streaming Engine (Windmill); both over the Windmill / Shuffle API.

63 of 75

The current Dataflow service architecture

(Same service architecture diagram as the previous slide.)

Won’t there be more latency?

  • Yes, though there are many tradeoffs.

64 of 75

In summary

Lesson 1: The parallelism of your pipeline is defined by your source.

Lesson 2: “Exactly once” means consistent state despite retries.

Lesson 3: Shuffling data is often a bottleneck for many pipelines.

Lesson 4: Dataflow/Beam will break up your job into ‘stages’.

Lesson 5: Dataflow uses different backends in batch vs streaming.

Lesson 6: In streaming, Dataflow associates every single element with a key.

Lesson 7: To the streaming backend, everything is timers, state and watermarks.

Lesson 8: Processing is serial for each key. Number of keys is the maximum parallelism in a pipeline.

Lesson 9: ONLY in streaming, key-to-key order is guaranteed.

Lesson 10: Dataflow processes timers for each key in order. Windows also fire in order PER KEY.

65 of 75

Links and resources

66 of 75

My conclusion…

  • Dataflow is a complex system
  • Beam is a cool abstraction - but it can leak implementation details

  • Questions?
  • We monitor StackOverflow
  • Strike up conversations with some of the Google attendees

67 of 75

68 of 75

Graveyard

69 of 75

In Dataflow, the total amount of parallelism in any given stage is given by the following:

  • The number of splits of your source - downstream from your source, and before any GroupByKey, Combine, or Stateful operation
  • The total number of keys - after any GroupByKey, Combine or Stateful operation

70 of 75

Notes on Reuven chat

Early ads systems (SmartASS / Evenflow / AdSpam) - competitive advantage

Streaming MapReduce - overlapped with building MillWheel at the same time

  • MillWheel was funded by Greg Cjaikowski in Seattle a while ago - unrelated to MapReduce
    • Elements (k, v, ts) triplets; persistent state and timers; watermarks (previously cursors) for event-time; exactly-once
    • Watermarks were not good back then (statistical models and stuff)
  • Tyler and Robert worked together to do streaming flume (JNI Millwheel - FlumeJava)
    • Streaming Flume introduced windows - windows were the key insight
      • MapReduce / Flume worked with GBK
    • Triggers appeared later, but inspired by these windowing things
  • Dataflow
    • (MillWheel was a class-based model - Windmill was a service with proto APIs)
    • State and timers were introduced here again

71 of 75

Notes on Sam talk (Beam-MillWheel)

All processing is in the context of a key always. If a user key is not there, then Windmill has an implicit key (e.g. the source’s split)

  • Dataflow provides Update, Drain, Snapshot
  • Windmill does hash-sharding - Millwheel did range-sharding, so range-hotspots could emerge
    • The initial key is determined by your source’s split
  • State associated with a streaming GBK
    • Window Set (closed / active windows)
    • Trigger state (element counts,
    • Buffered elements
    • Watermark holds
    • Event and processing timers
    • Note: No cross-key communication

72 of 75

Notes on Streaming Semantics (Sam & Slava talks)

Dynamic splitting is on the same key in streaming

  • Meaning that parallelism is determined by the initial splits!
  • Axiom 1: All processing in windmill is in the context of a key
  • Axiom 2: Message processing in windmill is serial for each key
    • Note: a Key is either a user Key, or a source split.
    • Example of a failure mode:
      • SDK / Beam claims a key and loses communication with windmill (?) - thus loses a lease
    • Verify: Can windows be fired out of order?

73 of 75

Ranges are hash ranges [0, 12345); [12345, DEF01), … FFFF] (int64 hashes)

74 of 75

Notes on autoscaling talk

  • Algorithm invoked every 10 seconds
  • Stabilization window - no scaling changes during these
    • Initial window (0-4 minute of execution)
    • Input signal stabilization window (T(worker changed) -> +4 minutes)
    • Output signal stabilization window (current time minus 4 min - now) (? always open)
  • Public: Backlog and CPU utilization for scaling
  • In fact, we use 25 (int64) metrics to decide how many workers.
  • Compute stage sizes - how many workers required for each stage
  • Compute pool sizes - computes the size of each pool and applies caps (?)
  • Smooth output signal - compute final recommendation as a function of previous recommendations and the current number of workers

75 of 75

Autoscaling metrics

  • For each stage, the algorithm computes a StageModel: upscale recommendation / downscale recommendation
  • Upscale recommendation is a sum
    • Workers needed to keep up with the input
    • Workers needed to consume the existing backlog within configurable time
      • Backlogs: Input backlog, timer backlog, watermark tracking backlog
    • Workers to consume the ‘penalty’ backlog - scaling slows down workers (assumed: 150 seconds) — Q: What
    • Depends on CPU utilization and Backlog
  • Downscaling recommendation
    • Current number of workers * CPU utilization / target CPU utilization
    • Depends only on CPU utilization
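A tiny worked example of the downscaling recommendation described above, with made-up numbers; the 80% target utilization is an assumption for illustration.

def downscale_recommendation(current_workers, cpu_utilization, target_utilization=0.8):
    # Workers needed so that the same load would run at the target CPU utilization.
    return current_workers * cpu_utilization / target_utilization

# 100 workers running at 40% CPU, targeting 80% -> recommend ~50 workers.
print(downscale_recommendation(100, 0.40))  # 50.0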