Dataflow under the covers
Pablo E (pabloem@apache.org)
What’s this talk?
Quick Agenda
First there was MapReduce…
MapReduce Paper published in 2004
First there was MapReduce…
Lesson 1: The parallelism of your pipeline is defined by your source.
Splittable sources…
Hard-to-split sources…
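To make Lesson 1 concrete, here is a minimal sketch (hypothetical bucket and file names) of the two cases: a glob of plain-text shards that the runner can split and read in parallel, versus a single gzipped file that only one worker can read end to end.

import apache_beam as beam

with beam.Pipeline() as p:
    # Splittable: many plain-text shards. The runner can hand out files (and
    # ranges within files) to different workers, so downstream ParDos start
    # out with plenty of parallelism.
    splittable = p | 'ReadShards' >> beam.io.ReadFromText(
        'gs://my-bucket/logs/part-*.txt')

    # Hard to split: a single gzipped file cannot be read from the middle,
    # so one worker reads it end to end and this branch starts with a
    # parallelism of 1, no matter how many workers the job has.
    hard_to_split = p | 'ReadGzip' >> beam.io.ReadFromText(
        'gs://my-bucket/logs/all-events.txt.gz')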
Fault tolerance!
Lesson 2: “Exactly once” means consistent state despite retries.
In short: WATCH OUT FOR SIDE EFFECTS
Make sure your side effects are idempotent.
Beam IO connectors are designed and tested with care to be idempotent.
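As a sketch of what “watch out for side effects” means in practice: bundles can be retried, so a DoFn may see the same element more than once even in an “exactly once” pipeline. The kv_store below is a hypothetical stand-in for any external system, not a real client library.

import apache_beam as beam

class FakeKVStore:
    # Stand-in for an external store; a real pipeline would use a client library.
    def __init__(self):
        self.rows = {}
        self.log = []

    def append(self, table, value):
        self.log.append((table, value))

    def put(self, key, value):
        self.rows[key] = value

kv_store = FakeKVStore()

class RetryUnsafeWrite(beam.DoFn):
    def process(self, element):
        # Appending is NOT idempotent: a retried bundle appends the same
        # element twice and the external state diverges from the pipeline's.
        kv_store.append('events', element)
        yield element

class IdempotentWrite(beam.DoFn):
    def process(self, element):
        # Writing under a key derived from the element IS idempotent: a
        # retried bundle simply overwrites the same row with the same value.
        kv_store.put(element['event_id'], element)
        yield element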
A shuffle arms race…
Blog: History of massive-scale sorting experiments at Google (Patent application!)
Lesson 3: Shuffling data is often a bottleneck for many pipelines.
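One common mitigation, sketched below with illustrative step names: replace GroupByKey plus a manual sum with CombinePerKey, so the runner can pre-combine values on each worker and shuffle one partial result per key instead of every raw element.

import apache_beam as beam

def sum_with_full_shuffle(events):
    # Every (key, value) pair crosses the shuffle before being summed.
    return (events
            | 'Group' >> beam.GroupByKey()
            | 'Sum' >> beam.MapTuple(lambda k, values: (k, sum(values))))

def sum_with_combiner_lifting(events):
    # The runner can lift the combiner: workers emit per-key partial sums,
    # so far less data crosses the shuffle.
    return events | 'SumPerKey' >> beam.CombinePerKey(sum)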
DAGs of MapReduce jobs…
Write to disk (or GFS)
Write to disk (or GFS)
Hand-optimized code
Shuffle step (inside MR jobs)
DAGs of MapReduce jobs…
PCollection<String> urls = FlumeJava.read("/gfs/data/crawler/urls.txt");
PTable<URL, DocInfo> backlinks = urls.parallelDo(
    new DoFn<...>() { ... },
    tableOf(recordsOf(URL.class), recordsOf(DocInfo.class)));
PTable<...> referringDocInfos = backlinks.groupByKey();
referringDocInfos.parallelDo(...);
FlumeJava: Not just an MR orchestrator
FlumeJava: Not just an MR orchestrator
PCollection<KV<byte[], byte[]>> input =
    pipeline
        .apply("Read input", readFromSource(sourceOptions))
        .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
        .apply("Transform individual element", ParDo.of(elementTransf))
        .apply("Total bytes monitor", ParDo.of(new ByteMonitor()))
        .apply("Window by minutes", Window.into(FixedWindows.of(...)))
        .apply("Group by key", GroupByKey.create())
        .apply("Ungroup", ParDo.of(new Ungroup(getIterations())))
        .apply("Collect end metrics", ParDo.of(runtimeMonitor));
(Diagram: Read → ParDo(...) → ParDo(...) → ParDo(...))
FlumeJava: Not just an MR orchestrator
(Diagram: Read(BQ) → ParDo(...) → ParDo(...) and Read(S3) → ParDo(...), both flowing into Flatten → ParDo(...) → Write(...))
FlumeJava: Not just an MR orchestrator
(Diagram: the same graph before and after optimization; in the optimized graph the Flatten is gone and each input branch gets its own downstream ParDo(...) and Write(...))
So what?
FlumeJava: Not just an MR orchestrator
Example: Write To BigQuery
(Diagram: assign a random element ID → COMMIT!! → write elements)
Note: These are not currently part of Beam or Dataflow
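The idea on the slide, sketched with plain Beam primitives; this is not the real BigQueryIO implementation, and the commented-out bigquery_client call is hypothetical. Generate a random ID per element, checkpoint it with a shuffle so retries reuse the same ID, then let the sink deduplicate on that ID.

import uuid
import apache_beam as beam

def tag_with_insert_id(row):
    # The random ID is generated exactly once per element, before the shuffle.
    return (str(uuid.uuid4()), row)

def write_with_dedup(id_and_row):
    insert_id, row = id_and_row
    # Hypothetical sink call: a retried write re-sends the same insert_id,
    # so the service can drop the duplicate instead of double-writing.
    # bigquery_client.insert(row, insert_id=insert_id)
    return id_and_row

with beam.Pipeline() as p:
    (p
     | 'Rows' >> beam.Create([{'user': 'a'}, {'user': 'b'}])
     | 'TagWithId' >> beam.Map(tag_with_insert_id)
     | 'Checkpoint' >> beam.Reshuffle()  # the "COMMIT": IDs are now stable across retries
     | 'Write' >> beam.Map(write_with_dedup))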
Lesson 4: Dataflow/Beam will break up your job into ‘stages’. Sometimes it’s good to know where stages start and end.
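A sketch of where those stage boundaries typically fall (step names are illustrative): consecutive ParDos are fused into a single stage, while a GroupByKey or Reshuffle forces a shuffle that ends one stage and starts the next.

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'Read' >> beam.Create([('k1', 1), ('k2', 2)])
     # --- stage 1: these ParDos run fused, element by element ---
     | 'Parse' >> beam.Map(lambda kv: kv)
     | 'Enrich' >> beam.Map(lambda kv: (kv[0], kv[1] * 10))
     # --- shuffle: GroupByKey materializes the data and ends stage 1 ---
     | 'Group' >> beam.GroupByKey()
     # --- stage 2: everything after the shuffle is a new fused stage ---
     | 'Format' >> beam.MapTuple(lambda k, values: '{}: {}'.format(k, sum(values)))
     | 'Print' >> beam.Map(print))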
A parallel workstream…
A parallel workstream…?!
Lesson 5: Dataflow uses different backends in batch vs streaming
WHAT?!
How can I control my bundle size?!
A parallel workstream…
Kickass talk on history of streaming at Google
A small team in Seattle…
MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Streaming building blocks…
Lesson 6: In streaming, Dataflow associates every single element with a key.
Lesson 7: To the streaming backend, everything is timers, state and watermarks.
The MillWheel API
Step 1: Define a ‘topology’ (a DAG!)
Step 2: Define the Computations
Step 3: Off you go!
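This is not the MillWheel C++ API itself, but a sketch of the same idea using Beam's stateful DoFn: every element arrives with a key (Lesson 6), each key owns its own state cells and timers (Lesson 7), and the streaming backend processes a given key serially.

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class CountThenFlush(beam.DoFn):
    # Per-key persistent state plus a per-key event-time (watermark) timer.
    COUNTS = BagStateSpec('counts', VarIntCoder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self, element,
                ts=beam.DoFn.TimestampParam,
                counts=beam.DoFn.StateParam(COUNTS),
                flush=beam.DoFn.TimerParam(FLUSH)):
        key, value = element
        counts.add(value)   # state: buffer this key's values
        flush.set(ts + 60)  # timer: ask to be called back a minute later

    @on_timer(FLUSH)
    def flush_counts(self, counts=beam.DoFn.StateParam(COUNTS)):
        # Runs when the watermark passes the timer: emit and clear the state.
        yield sum(counts.read())
        counts.clear()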
Streaming lessons…
Lesson 8: Processing is serial for each key
Lesson 9: In streaming, key-to-key order is guaranteed
Key-to-key order (streaming)
Remember: Every element has a key associated with it behind the covers
(
    ReadFromKafka(...)
    | WindowInto(GlobalWindows(), trigger=AfterCount(1))
    | GroupByKey()
    | Map(print)
)
# Kafka PARTITION:
[
    ('user1', 'event1'),
    ('user2', 'event1'),
    ('user1', 'event2'),
    ('user1', 'event3'),
    ('user2', 'event2'),
]
Key-to-key order (streaming)
Remember: Every element has a key associated with it behind the covers
# Output: IN READ ORDER (per key)
[('user1', 'event1'),
 ('user1', 'event2'),
 ('user1', 'event3')]
[('user2', 'event1'),
 ('user2', 'event2')]
(
    ReadFromKafka(...)
    | WindowInto(GlobalWindows(), trigger=AfterCount(1))
    | GroupByKey()
    | Map(print)
)
# Kafka PARTITION:
[
    ('user1', 'event1'),
    ('user2', 'event1'),
    ('user1', 'event2'),
    ('user1', 'event3'),
    ('user2', 'event2'),
]
Streaming lessons…
Lesson 8: Processing is serial for each key
Lesson 9: Only in streaming, key-to-key order is guaranteed
STREAMING ONLY
NOT IN BATCH
Why? The explanation comes from the shuffling algorithms.
In fact, Windmill has these two axioms as part of its architectural principles:
Axiom 1: All processing in Windmill is in the context of a key
Axiom 2: Message processing in Windmill is serial for each key
Streaming lessons…
Lesson 10: Dataflow processes timers for each key in order
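A small sketch of what Lesson 10 implies: two watermark timers set on the same key fire in timestamp order, not in the order they were set.

import apache_beam as beam
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer

class TwoTimers(beam.DoFn):
    EARLY = TimerSpec('early', TimeDomain.WATERMARK)
    LATE = TimerSpec('late', TimeDomain.WATERMARK)

    def process(self, element,
                ts=beam.DoFn.TimestampParam,
                early=beam.DoFn.TimerParam(EARLY),
                late=beam.DoFn.TimerParam(LATE)):
        # element is a (key, value) pair; both timers are scoped to that key.
        late.set(ts + 120)  # set first...
        early.set(ts + 60)  # ...but fires first: timers fire in timestamp order per key

    @on_timer(EARLY)
    def on_early(self):
        yield 'early timer fired'

    @on_timer(LATE)
    def on_late(self):
        yield 'late timer fired'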
Streaming lessons…
In-depth Blog: Streaming Engine: Execution Model for Highly-Scalable, Low-Latency Data Processing
Next: Autosharding is a new streaming feature!
Batch and streaming unification…
Two separate teams:
“How do we support Streaming in Flume?”
Batch and streaming unification…
“Windowing was the key insight that allowed us to unify batch and streaming.
Millwheel didn't have the notion of group-by-key and to naively apply it would result in an iterable that never ended.”
Batch and streaming unification…
“First we added windows, and triggers followed soon after that”
Note how “state and timers” disappeared from the user-facing API.
The return of the state [... and timers]
“We knew the windowing model wasn't as powerful as the ‘roll your own[..] state and timers’ - even if in MillWheel the majority of state+timer use was aggregation over a window”
… enough about concepts and APIs…
The (initial) Dataflow service architecture
(Diagram: the Dataflow API feeds the ‘Backend’ / Dataflow Manager, which drives a pool of Java Workers)
The (initial) Dataflow service architecture
(Diagram: the same setup with a Python Worker added alongside the Java Workers)
The (initial) Dataflow service architecture
(Diagram: in batch, a Java / Python Worker talks through C bindings to a Shuffler; in streaming, a Java / Python Worker talks through C bindings to a Windmill Appliance; both sides are driven by the ‘Backend’ / Dataflow Manager over the Dataflow API)
The initial Dataflow worker architecture
(Diagram: the Dataflow API drives a pool of Java Workers plus a Python Worker)
How about Flink? Spark? Samza?
How about Go? TypeScript? Other langs?
The current Dataflow worker architecture
(Diagram: Java and Python Workers behind the Dataflow API)
Beam Portability Framework!
The current Dataflow worker architecture
(Diagram: inside a GCE VM, a C++ Dataflow Runner Harness speaks the Dataflow API on one side and the Beam Portability API to an “SDK Worker” on the other)
The current Dataflow service architecture
(Diagram: on each GCE VM, an SDK Worker (Py/Java/Go/JS) sits behind a C++ Runner Harness; the control path runs over the Dataflow API to the ‘Backend’ / Dataflow Manager, while the data path runs over the Windmill / Shuffle API, to the Shuffle Service in batch and to the Streaming Engine (Windmill) in streaming)
Won’t there be more latency?
In summary
Lesson 1: The parallelism of your pipeline is defined by your source.
Lesson 2: “Exactly once” means consistent state despite retries.
Lesson 3: Shuffling data is often a bottleneck for many pipelines.
Lesson 4: Dataflow/Beam will break up your job into ‘stages’.
Lesson 5: Dataflow uses different backends in batch vs streaming.
Lesson 6: In streaming, Dataflow associates every single element with a key.
Lesson 7: To the streaming backend, everything is timers, state and watermarks.
Lesson 8: Processing is serial for each key. The number of keys is the maximum parallelism in a pipeline.
Lesson 9: ONLY in streaming, key-to-key order is guaranteed.
Lesson 10: Dataflow processes timers for each key in order. Windows also fire in order PER KEY.
Links and resources
My conclusion…
Graveyard
In Dataflow, the total amount of parallelism in any given stage is given by the following:
Notes on Reuven chat
Early ads systems (SmartASS / Evenflow / AdSpam) - competitive advantage
Streaming MapReduce - overlapped with building MillWheel at the same time
Notes on Sam talk (Beam-MillWheel)
All processing is in the context of a key always. If a user key is not there, then Windmill has an implicit key (e.g. the source’s split)
Notes on Streaming Semantics (Sam & Slava talks)
Dynamic splitting is on the same key in streaming
Ranges are hash ranges [0, 12345); [12345, DEF01), … FFFF] (int64 hashes)
Notes on autoscaling talk
Autoscaling metrics