1 of 39

Splittable DoFn

ALEX VAN BOXEL

09 – 24 – 2018

2 of 39

Alex Van Boxel

@alexvb

  • Day job: Big Data Architect at Vente-Exclusive.com. Part of the Vente-Privee Group. Specialised in everything Google
  • Nightlife: Having fun reading documentation and stalking Google Product Managers as a Google Cloud Expert
  • Needed an excuse to use this slide in the deck because it’s so cool

3 of 39

Agenda

  • Who needs agendas? They’re always full of meetings...

4 of 39

Why a new input?

Current Architecture

[Diagram labels: Quantum, Microservices, Entities, Cloud Pub/Sub, and four streaming pipelines: Stream - BQ, Stream - EL, Stream - SP, Stream - BT]

5 of 39

Why a new input?

Current Architecture

[Diagram labels: Quantum, Microservices, and four streaming pipelines: Stream - BQ, Stream - EL, Stream - SP, Stream - BT]

6 of 39

It all began in ...

7 of 39

Happy

not to mess with Unbounded IO Connector

8 of 39

Learning

Splittable DoFn

9 of 39

Documentation

  • Apache Beam Blog
  • Design Doc
  • Java Doc

10 of 39

Splittable Support

11 of 39

What is an SDF?

12 of 39

What can’t a DoFn do?

  • Work inside a DoFn can’t be parallelised further
  • Can’t produce infinite output

13 of 39

What can an SDF do?

  • Split work into parts through restrictions (yes: parallelize)
  • Supports checkpointing and resuming
  • Infinite output

14 of 39

SDF has super powers

  • It takes an input element
  • Is composable

15 of 39

Example: File on storage

  • Input: storage URL
  • Restriction: [0,object.size)

16 of 39

Example: Bigtable

  • Input: table
  • Restriction: [key.start,key.end)

17 of 39

Idea: Infinite storage stream

  • SDF1u: watch a storage bucket for changes
  • SDF2b: object URL as input, read the object

18 of 39

@DoFn.BoundedPerElement

19 of 39

@DoFn.GetInitialRestriction

  • Use this to set the range for the given input element
  • OffsetRange is a good general starting point

20 of 39

Define the range

@GetInitialRestriction
public OffsetRange getInitialRestriction(String url) {
  // The restriction covers the whole object: [0, size)
  long size = sizeOf(url);
  return new OffsetRange(0, size);
}

21 of 39

@DoFn.SplitRestriction

  • Optional: Only called once
  • After that it’s up to the tracker

22 of 39

First split, you know best

@SplitRestriction
public void splitRestriction(String element, OffsetRange restriction,
    OutputReceiver<OffsetRange> ranges) {
  long from = restriction.getFrom();
  long to = restriction.getTo();
  // Split at the midpoint of the range, which also works when from != 0
  long mid = from + (to - from) / 2;
  ranges.output(new OffsetRange(from, mid));
  ranges.output(new OffsetRange(mid, to));
}
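The two-way split above generalises to any number of parts. The following is a minimal sketch in plain Java so that it runs without the Beam SDK: `Range` is a hypothetical stand-in for a half-open offset restriction (it is not Beam's `OffsetRange`), and `splitEvenly` is an illustrative name, not a Beam API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: Range is a hypothetical stand-in for a
// half-open offset restriction [from, to); it is not a Beam class.
final class RangeSplitSketch {

  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + "," + to + ")";
    }
  }

  // Splits [from, to) into at most `parts` contiguous, non-overlapping ranges.
  // Assumes (to - from) * parts fits in a long, so it is not meant for
  // "infinite" ranges that end at Long.MAX_VALUE.
  static List<Range> splitEvenly(Range r, int parts) {
    List<Range> out = new ArrayList<>();
    long size = r.to - r.from;
    long start = r.from;
    for (int i = 1; i <= parts && start < r.to; i++) {
      // End of the i-th part; for i == parts this is exactly r.to.
      long end = r.from + size * i / parts;
      if (end > start) { // skip empty parts when parts > size
        out.add(new Range(start, end));
        start = end;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(splitEvenly(new Range(0, 56), 2));
    System.out.println(splitEvenly(new Range(10, 20), 3));
  }
}
```

In a real SDF each resulting range would be handed to the output receiver, as in the two-way split; how many parts make sense depends on the source.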

23 of 39

@DoFn.NewTracker

  • Optional, ranges can have a default tracker

24 of 39

Get ready to work

@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
  OffsetRange currentRestriction = tracker.currentRestriction();
  …
}

25 of 39

Get ready to work

long pos = currentRestriction.getFrom();
pos = nextValidPosition(pos);

for (; tracker.tryClaim(pos); ) {
  // no need to claim every pos
  for ( block ) {
    c.output(the_work);
  }
}

26 of 39

Get ready to work

long pos = currentRestriction.getFrom();
pos = nextValidPosition(pos);

for (; tracker.tryClaim(pos); ) {
  for ( block ) {
    // you can do work for pos past the currentRestriction.getTo()
    c.output(the_work);
  }
}
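The claim loop above can be made concrete with a small simulation. This is a sketch under stated assumptions, not Beam code: `SimpleTracker` is a hypothetical stand-in that only mimics the "claim succeeds inside [from, to)" contract, and the fixed `blockSize` and `fileSize` parameters are invented for illustration. It shows that only the start of a block is claimed, and that a claimed block is read in full even when it ends past the end of the restriction.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: SimpleTracker is a hypothetical stand-in that
// mimics the claim contract of an offset tracker; it is not a Beam class.
final class ClaimLoopSketch {

  static final class SimpleTracker {
    private final long from;
    private final long to;

    SimpleTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    // A claim succeeds only for positions inside [from, to).
    boolean tryClaim(long pos) {
      return pos >= from && pos < to;
    }
  }

  // Reads fixed-size blocks of a file of `fileSize` bytes for the restriction
  // [from, to). Only the START of each block is claimed.
  static List<String> readBlocks(long from, long to, long blockSize, long fileSize) {
    List<String> output = new ArrayList<>();
    SimpleTracker tracker = new SimpleTracker(from, to);
    // First block boundary at or after `from` (the "next valid position").
    long pos = ((from + blockSize - 1) / blockSize) * blockSize;
    while (pos < fileSize && tracker.tryClaim(pos)) {
      // The whole block belongs to this restriction, even if it ends past `to`.
      long end = Math.min(pos + blockSize, fileSize);
      output.add("block[" + pos + "," + end + ")");
      pos += blockSize;
    }
    return output;
  }

  public static void main(String[] args) {
    // Two restrictions over a 56-byte file with 10-byte blocks.
    System.out.println(readBlocks(0, 28, 10, 56));
    System.out.println(readBlocks(28, 56, 10, 56));
  }
}
```

With these numbers the first restriction reads the block starting at 20 all the way to 30, and the second restriction starts at the next block boundary, 30, so every block is read exactly once.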

27 of 39

Bounded

[37,56)

[17,37)

[0,56)

28 of 39

@DoFn.UnboundedPerElement

29 of 39

Define the range

@GetInitialRestriction
public OffsetRange getInitialRestriction(String url) {
  // Unbounded: the range never ends, so use Long.MAX_VALUE as the upper bound
  long begin = begin(url);
  return new OffsetRange(begin, Long.MAX_VALUE);
}

30 of 39

DoFn.ProcessContinuation

  • Unbounded work needs more flow control

31 of 39

Unbounded flow control

return DoFn.ProcessContinuation.stop();

32 of 39

Unbounded flow control

return DoFn.ProcessContinuation.resume();

33 of 39

Unbounded flow control

return DoFn.ProcessContinuation.resume()
    .withResumeDelay(Duration.millis(100));

34 of 39

Claim/Resume gotcha

  • Resume after claim -> position++
  • Resume without claim -> position = position

35 of 39

Unbounded

[1030,∞)

[1029,∞)

[1028,∞)

[1027,∞)

[1013,∞)

[1000,∞)

resume

loop
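The resume loop, together with the claim/resume gotcha, can be modelled in a few lines of plain Java. This is a simplified sketch of the behaviour described on the slides, not Beam's tracker or runner code: `processOnce`, `simulate`, and the `availableUpTo` values (how far the source currently has data) are hypothetical names and numbers chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the resume loop; not Beam's tracker or runner code.
final class ResumeLoopSketch {

  // One simulated process() call on the restriction [from, infinity).
  // Claims every position below `availableUpTo` (hypothetical: how far the
  // source currently has data) and returns the start of the residual.
  static long processOnce(long from, long availableUpTo) {
    long pos = from;
    while (pos < availableUpTo) {
      // Position `pos` is claimed and processed here, then we move on.
      pos++;
    }
    // After a claim the residual starts one past the last claimed position.
    // Without any claim, pos == from and the residual starts where we began.
    return pos;
  }

  // Returns the restriction seen by each successive process() call.
  static List<String> simulate(long start, long[] availableUpTo) {
    List<String> seen = new ArrayList<>();
    long from = start;
    for (long limit : availableUpTo) {
      seen.add("[" + from + ",inf)");
      from = processOnce(from, limit); // "resume" with the residual
    }
    return seen;
  }

  public static void main(String[] args) {
    // The second poll finds no new data, so the restriction does not move.
    long[] polls = {1013, 1013, 1027, 1028};
    System.out.println(simulate(1000, polls));
  }
}
```

The second poll in `main` finds nothing new, so the third call sees the same restriction again: resuming without a claim leaves the position unchanged.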

36 of 39

So what happened to the Mongo Changestream use case?

37 of 39

Conclusion

  • Remember… composable, your imagination is the limit
  • How do I split up my work? OffsetRange is a good starter
  • Play: github.com/alexvanboxel/beam-demo/

38 of 39

Questions & (Answers)

  • I prefer blue police boxes… When is Doctor Who starting again?

39 of 39

Thank You