Splittable DoFn
ALEX VAN BOXEL
09 – 24 – 2018
Alex Van Boxel
@alexvb
Agenda
Why a new input?
Current Architecture
Diagram: Quantum Microservices; Entities (Cloud Pub/Sub); Stream - BQ; Stream - EL; Stream - SP; Stream - BT
It all began in ...
Happy not to have to mess with an Unbounded IO connector
Learning
Splittable DoFn
Splittable Support
What is an SDF?
What can’t a DoFn do?
What can an SDF do?
An SDF has superpowers
Example: File on storage
Example: Bigtable
Idea: Infinite storage stream
@DoFn.BoundedPerElement
@DoFn.GetInitialRestriction
Define the range
@GetInitialRestriction
public OffsetRange getInitialRestriction(String url) {
  long size = sizeOf(url);  // sizeOf: your own helper returning the size in bytes
  return new OffsetRange(0, size);  // half-open: covers offsets 0 .. size - 1
}
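Beam's `OffsetRange` is a half-open interval [from, to), so `new OffsetRange(0, size)` covers every offset of a `size`-byte file exactly once, and adjacent ranges such as [0, 17) and [17, 37) never overlap. The sketch below is a simplified, hypothetical stand-in (a `Range` class, not Beam's `OffsetRange`) that only makes this convention concrete; the file size is passed in directly instead of calling a `sizeOf` helper.

```java
import java.util.Objects;

/** Hypothetical stand-in for a half-open offset range [from, to); not Beam's OffsetRange. */
final class Range {
  final long from; // inclusive
  final long to;   // exclusive

  Range(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from > to: [" + from + ", " + to + ")");
    }
    this.from = from;
    this.to = to;
  }

  /** Number of offsets covered by this range. */
  long size() {
    return to - from;
  }

  /** True if the offset lies inside [from, to). */
  boolean contains(long offset) {
    return offset >= from && offset < to;
  }

  /** Initial restriction for a file of the given size in bytes: [0, size). */
  static Range initialRestriction(long fileSize) {
    return new Range(0, fileSize);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Range)) {
      return false;
    }
    Range r = (Range) o;
    return from == r.from && to == r.to;
  }

  @Override
  public int hashCode() {
    return Objects.hash(from, to);
  }

  @Override
  public String toString() {
    return "[" + from + ", " + to + ")";
  }
}
```

For a 56-byte file, `Range.initialRestriction(56)` contains offset 55 but not offset 56, which is exactly where a following range would begin.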
@DoFn.SplitRestriction
First split: you know your data best
@SplitRestriction
public void splitRestriction(String element, OffsetRange restriction,
    OutputReceiver<OffsetRange> ranges) {
  long from = restriction.getFrom();
  long to = restriction.getTo();
  long mid = from + (to - from) / 2;  // midpoint of this range (to / 2 is only right when from == 0)
  ranges.output(new OffsetRange(from, mid));
  ranges.output(new OffsetRange(mid, to));
}
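Splitting at `to / 2` only works for a range that starts at 0: for [37, 56) it would yield 28 and the invalid range [37, 28). The sketch below shows two splitting strategies on plain `long` pairs, assuming the same half-open [from, to) convention; `RangeSplits`, `splitInHalf` and `splitInto` are hypothetical helpers, not Beam API. Inside a real `@SplitRestriction` method, each pair would be emitted as a new `OffsetRange`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers that split a half-open range [from, to) into sub-ranges. */
final class RangeSplits {

  /** Splits [from, to) into two halves around the midpoint of the range itself. */
  static List<long[]> splitInHalf(long from, long to) {
    long mid = from + (to - from) / 2; // relative to 'from', so non-zero starts work too
    List<long[]> out = new ArrayList<>();
    out.add(new long[] {from, mid});
    out.add(new long[] {mid, to});
    return out;
  }

  /** Splits [from, to) into at most n contiguous, non-overlapping chunks. */
  static List<long[]> splitInto(long from, long to, int n) {
    List<long[]> out = new ArrayList<>();
    long size = to - from;
    long chunk = Math.max(1, (size + n - 1) / n); // ceil(size / n), at least 1
    for (long start = from; start < to; start += chunk) {
      out.add(new long[] {start, Math.min(start + chunk, to)}); // last chunk is clipped
    }
    return out;
  }
}
```

With n chunks you control the initial parallelism directly; with halving you would apply the split recursively. Which one fits depends on how well you know the cost of each part of the element, which is the point of "you know your data best".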
@DoFn.NewTracker
Get ready to work
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
  OffsetRange currentRestriction = tracker.currentRestriction();
  …
}
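Get ready to work
long pos = currentRestriction.getFrom();
pos = nextValidPosition(pos);
while (tracker.tryClaim(pos)) {
  // no need to claim every pos: one claim covers the whole block starting at pos
  for (Work work : readBlock(pos)) {  // readBlock, Work: hypothetical
    c.output(work);
  }
  pos = nextValidPosition(pos + 1);  // claims must strictly increase
}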
Get ready to work
long pos = currentRestriction.getFrom();
pos = nextValidPosition(pos);
while (tracker.tryClaim(pos)) {
  for (Work work : readBlock(pos)) {  // readBlock, Work: hypothetical
    // work for a claimed pos may run past currentRestriction.getTo()
    c.output(work);
  }
  pos = nextValidPosition(pos + 1);
}
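Why is it safe to read past `getTo()`? Each restriction only claims positions where a record *starts*, so a record that starts inside [from, to) belongs to that restriction even if its bytes run beyond `to`, and the next restriction skips it because its own first valid position is the next record start. The sketch below is a simplified, hypothetical model, not Beam's `OffsetRangeTracker` and not `BoundedOverlapFn.java`: the record offsets and the split points [0, 17), [17, 37), [37, 56) are invented for illustration, and `nextValidPosition` is a stand-in for the slide's helper of the same name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the claim loop over records at known start offsets. */
final class ClaimLoopSketch {

  /** Minimal tracker over [from, to): claims must strictly increase. */
  static final class Tracker {
    final long from;
    final long to;
    long lastClaimed = Long.MIN_VALUE;

    Tracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean tryClaim(long pos) {
      if (pos <= lastClaimed) {
        throw new IllegalStateException("claims must increase: " + pos);
      }
      lastClaimed = pos;
      return pos < to; // positions at or past 'to' belong to another restriction
    }
  }

  /** First record start at or after pos; recordStarts must be sorted ascending. */
  static long nextValidPosition(long[] recordStarts, long pos) {
    for (long start : recordStarts) {
      if (start >= pos) {
        return start;
      }
    }
    return Long.MAX_VALUE; // no more records: the next tryClaim fails
  }

  /** Returns the start offsets of every record processed for restriction [from, to). */
  static List<Long> process(long[] recordStarts, long from, long to) {
    Tracker tracker = new Tracker(from, to);
    List<Long> output = new ArrayList<>();
    long pos = nextValidPosition(recordStarts, from);
    while (tracker.tryClaim(pos)) {
      // One claim covers the whole record starting at pos, even if that
      // record's bytes extend past 'to'.
      output.add(pos);
      pos = nextValidPosition(recordStarts, pos + 1);
    }
    return output;
  }
}
```

With records starting at offsets 0, 12, 25, 33 and 48, the three restrictions process [0, 12], [25, 33] and [48] respectively: every record exactly once, even though the record at 33 may well extend past 37.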
Diagram (Bounded): restrictions [0,56), [17,37), [37,56)
Example code: BoundedOverlapFn.java
@DoFn.UnboundedPerElement
Define the range
@GetInitialRestriction
public OffsetRange getInitialRestriction(String url) {
  long begin = begin(url);  // begin: your own helper returning the start offset
  return new OffsetRange(begin, Long.MAX_VALUE);  // no real end: the stream is unbounded
}
DoFn.ProcessContinuation
Unbounded flow control
return DoFn.ProcessContinuation.stop();
Unbounded flow control
return DoFn.ProcessContinuation.resume();
Unbounded flow control
return DoFn.ProcessContinuation.resume()
    .withResumeDelay(Duration.millis(100));  // org.joda.time.Duration
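Returning `resume()` ends the current call but not the work: the unclaimed rest of [begin, ∞) becomes a residual restriction that is processed in a later call, optionally after the resume delay. The sketch below is a hypothetical, simplified model of that loop, not Beam's runner and not `SecondTickSplitFn.java`: the `Continuation` enum stands in for `DoFn.ProcessContinuation`, `available` models how far the source has data at each call, and `splitAt` models the rest of the restriction being taken away, in which case this model returns `STOP` instead of resuming. Time and the resume delay are not simulated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an unbounded SDF that checkpoints by returning RESUME. */
final class ResumeLoopSketch {

  enum Continuation { STOP, RESUME }

  /** Tracker over [from, infinity); claims at or past splitAt fail. */
  static final class Tracker {
    final long from;
    final long splitAt;
    long lastClaimed;

    Tracker(long from, long splitAt) {
      this.from = from;
      this.splitAt = splitAt;
      this.lastClaimed = from - 1; // nothing claimed yet
    }

    boolean tryClaim(long pos) {
      if (pos >= splitAt) {
        return false;
      }
      lastClaimed = pos;
      return true;
    }
  }

  /** One ProcessElement-like call: emit every position whose data already exists. */
  static Continuation process(Tracker tracker, long available, List<Long> out) {
    for (long pos = tracker.from; pos < available; pos++) {
      if (!tracker.tryClaim(pos)) {
        return Continuation.STOP; // claim failed: the rest is not ours, do not resume
      }
      out.add(pos);
    }
    return Continuation.RESUME; // no more data for now: ask to be called again later
  }

  /** Runner-like loop; returns the start of each restriction that was processed. */
  static List<Long> run(long begin, long[] availablePerCall, long splitAt, List<Long> out) {
    List<Long> restrictionStarts = new ArrayList<>();
    long from = begin;
    for (long available : availablePerCall) {
      restrictionStarts.add(from);
      Tracker tracker = new Tracker(from, splitAt);
      if (process(tracker, available, out) == Continuation.STOP) {
        break;
      }
      from = tracker.lastClaimed + 1; // residual restriction: [lastClaimed + 1, infinity)
    }
    return restrictionStarts;
  }
}
```

Starting at 1000 with data available up to 1013, 1027, 1028, 1029 and 1030 on successive calls, the model processes restrictions starting at 1000, 1013, 1027, 1028 and 1029, and leaves [1030, ∞) as the next residual.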
Claim/Resume gotcha
Diagram (Unbounded): restrictions [1000,∞), [1013,∞), [1027,∞), [1028,∞), [1029,∞), [1030,∞), with "resume" and "loop" steps between them
Example code: SecondTickSplitFn.java
So what happened to the Mongo Changestream use case?
Conclusion
Questions & (Answers)
Thank You