Apache Beam Fn API: Progress Reporting

Apache Beam Fn API[a]

Progress Reporting

https://s.apache.org/beam-fn-api-progress-reporting

Vikas Kedigehalli(vikasrk@google.com)

Lukasz Cwik (lcwik@google.com)

Overview

See the Apache Beam Fn API Overview for an introduction to its purpose, a high-level overview, and a listing of related documents. This document proposes a way for the SDK harness to report progress of a bundle to the Runner over the Fn API.

Progress Reporting

SDK Implementation Considerations

Options

No Progress Reporting

Minimal Progress Reporting

Advanced Progress Reporting

Buffering and Parallel Processing

Runner Implementation Considerations

Backlog Computation

Handling Multiple Inputs

Watermarks

Proto Definitions

Appendix

Alternative Considered

Call Flow Diagram

Latex Equations

Progress Reporting

At a high level, a Runner periodically sends a progress request to the SDK, to which the SDK sends a progress response; both the request and the response travel over the control plane. Since the response is asynchronous, a Runner may choose to block or to use previously computed results.

The SDK provides progress information for each PTransform within a currently active bundle. The per-transform progress information will contain:

  • Total time spent[b][c] :- amount of time spent so far in this PTransform processing elements (not including emitting elements downstream).
  • Input elements consumed :- number of elements fully processed by this PTransform for each input.
  • Output elements produced :- the number of elements produced so far for each of this PTransform’s outputs, from fully processed input elements.
  • Active input elements :- number of input elements that are actively being processed (started but not fully processed) by this PTransform.
  • Output elements produced for active elements :- the number of elements produced so far for each of this PTransform’s outputs, from active input elements.
  • Estimated remaining output for active elements :- the number of elements estimated to be produced per output, from the unclaimed work of active input elements.
  • Tentative watermark :- tracks the tentative output watermark of each PTransform output.

By reporting progress at this granularity, a Runner gains visibility into the behavior of each transform while still being able to compute bundle-level progress from the same information.
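
To make the exchange concrete, here is a rough sketch of a Runner periodically polling an active bundle for progress over the control plane. It assumes Java classes generated from the proto definitions later in this document (package and outer class details omitted); the FnApiControlClient interface, its method names, and the one second polling period are illustrative placeholders rather than part of the Fn API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical Runner-side control client; the real control plane is a gRPC
// bi-directional stream of instruction requests and responses.
interface FnApiControlClient {
  CompletableFuture<ProcessBundleProgressResponse> requestProgress(
      ProcessBundleProgressRequest request);
}

class ProgressPoller {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** Polls the SDK harness for progress on the bundle identified by instructionId. */
  void start(FnApiControlClient client, String instructionId) {
    scheduler.scheduleAtFixedRate(() -> {
      ProcessBundleProgressRequest request =
          ProcessBundleProgressRequest.newBuilder()
              .setInstructionReference(instructionId)
              .build();
      // The response is asynchronous; until it arrives the Runner keeps using
      // the previously reported metrics rather than blocking.
      client.requestProgress(request)
          .thenAccept(response -> updateProgressView(instructionId, response.getMetrics()));
    }, 1, 1, TimeUnit.SECONDS); // illustrative polling period
  }

  private void updateProgressView(String instructionId, Metrics metrics) {
    // Feed per-transform metrics into backlog, autoscaling, and rebalancing decisions.
  }
}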

SDK Implementation Considerations

Options

An SDK has a few options for what level of progress it wants to report:

No Progress Reporting

Although recommended, progress reporting is optional. The SDK should return a response indicating that progress reporting is not supported:

  • for all cases.
  • for a specific process bundle descriptor.
  • for a specific bundle.

The Runner needs to decide how to handle these error responses appropriately.

In any of these cases, the Runner will treat the output watermark as positive infinity for all transforms contained within the bundle.

Minimal Progress Reporting

If an SDK supports progress reporting, it is recommended to report at least the output watermark of the root SDFs and the element counts of all PTransform outputs. This is mainly useful for providing visibility into the progress being made while keeping the SDK implementation simple. However, it does not provide an accurate estimate of the backlog of work remaining, which limits the Runner’s ability to perform dynamic work rebalancing efficiently.

Consider an SDK executing a bundle, as in Figure 1. The gRPC read reports that it has output 4 elements, of which 2 have been fully processed by the SDF. The other two are actively being processed by the SDF, with an estimated 10 elements of remaining work. Note that even though the SDF has produced 50 elements, the following DoFn has fully processed only 40, with 1 still active. The other 9 have not been seen by the DoFn yet, which means they are likely sitting in an intermediate buffer, depending on how the SDK executes the graph. All of this would be reported in response to a single progress request.
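
To illustrate, the snippet below builds the per-transform metrics that the SDF in Figure 1 might report, again assuming Java classes generated from the proto definitions later in this document (package and outer class details omitted). Figure 1 only gives the SDF’s total output of 50 elements, so the 40/10 split between output attributed to fully processed inputs and output attributed to active inputs is an assumption made for illustration.

// Illustrative per-transform progress for the SDF in Figure 1.
Metrics figure1SdfMetrics() {
  Metrics.PTransform sdfProgress =
      Metrics.PTransform.newBuilder()
          // 2 of the 4 elements emitted by the gRPC read are fully processed.
          .putInputElementsConsumedCounts("input", 2)
          // Output attributed to those 2 fully processed inputs (assumed split).
          .putOutputElementsProducedCounts("output", 40)
          // The other 2 input elements are still being processed.
          .setActiveInputElementsCount(2)
          // Output produced so far from the 2 active inputs (assumed split).
          .putActiveOutputElementsProducedCounts("output", 10)
          // Unclaimed work of the active inputs, estimated at 10 more elements.
          .putActiveOutputElementsRemainingCounts("output", 10)
          .build();
  return Metrics.newBuilder().putPtransforms("sdf", sdfProgress).build();
}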

Advanced Progress Reporting

An SDK may report a more advanced form of progress that additionally includes the total time spent in each transform. This allows a Runner to estimate the amount of work remaining and perform dynamic work rebalancing efficiently.

 

This requires the SDK to keep track of element processing times for each transform, which can be expensive, so it is recommended to use a sampling approach to compute reasonable estimates.
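
One possible sampling approach is a state-sampling tracker, sketched below: the processing thread records which PTransform it is currently executing, and a sampler thread periodically charges the elapsed interval to that transform. The class and method names are illustrative and do not refer to an existing Beam API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative state-sampling tracker: instead of timing every element, a
// sampler thread wakes up periodically and attributes the elapsed interval to
// whichever transform the processing thread is currently executing.
class TransformTimeSampler {
  private static final long SAMPLE_PERIOD_MS = 10;

  private final Map<String, AtomicLong> timeSpentMillis = new ConcurrentHashMap<>();
  private volatile String currentTransformId = null;
  private final ScheduledExecutorService sampler =
      Executors.newSingleThreadScheduledExecutor();

  void start() {
    sampler.scheduleAtFixedRate(() -> {
      String transformId = currentTransformId;
      if (transformId != null) {
        timeSpentMillis
            .computeIfAbsent(transformId, id -> new AtomicLong())
            .addAndGet(SAMPLE_PERIOD_MS);
      }
    }, SAMPLE_PERIOD_MS, SAMPLE_PERIOD_MS, TimeUnit.MILLISECONDS);
  }

  /** Called by the processing thread when it enters or leaves a transform. */
  void setCurrentTransform(String transformId) {
    currentTransformId = transformId;
  }

  /** Estimated total time spent (millis) in a transform, for progress reporting. */
  long totalTimeSpentMillis(String transformId) {
    AtomicLong time = timeSpentMillis.get(transformId);
    return time == null ? 0 : time.get();
  }
}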

Consider the SDK executing the same bundle, but now reporting the total time spent in each transform, as in Figure 2. The total time spent reported for the SDF is around 19s, which includes the time spent producing the output emitted so far[d][e] from active elements.

Buffering and Parallel Processing

The reason for splitting the information into fully processed elements and active elements is that it gives the SDK flexibility in how it executes a fused sub-graph. An SDK could choose to perform optimizations such as buffering elements between transforms and/or executing those transforms in parallel, as long as the progress of the elements in flight is reported correctly under the active elements. For watermarks, the value reported for each PTransform output is the minimum of the watermarks tracked for the elements in flight.
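
For instance, a tentative output watermark under buffering could be derived as in the small sketch below, where the method name and its inputs are illustrative:

import java.util.Collection;

// Illustrative only: under buffering or parallel execution, the tentative
// watermark reported for a PTransform output is the minimum of the watermarks
// tracked for the elements still in flight (buffered or actively processed).
long tentativeOutputWatermark(Collection<Long> inFlightElementWatermarks) {
  long watermark = Long.MAX_VALUE; // no in-flight elements imposes no hold (assumption)
  for (long elementWatermark : inFlightElementWatermarks) {
    watermark = Math.min(watermark, elementWatermark);
  }
  return watermark;
}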

Runner Implementation Considerations

Backlog Computation[f]

Bundle backlog is defined as the estimated amount of time needed to process all the known unclaimed work for this bundle.

A Runner needs to implement the following functions to compute the bundle backlog (a code sketch that combines them follows the last equation):

An estimate of the fraction of active input elements consumed[g][h]:

activeElementsFractionConsumed(pt) = pt.activeElements \times \frac{\sum_{tag \in outputTags} pt.activeOutputProduced(tag)}{\sum_{tag \in outputTags} \left( pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag) \right)}[i][j][k][l][m][n][o][p][q][r]

Average time taken to process an input element:

avgElementProcessingTime(pt) = \frac{pt.totalTimeSpent}{activeElementsFractionConsumed(pt) + \sum_{tag \in inputTags} pt.inputElementsConsumed(tag)}

Average output count per input element:

avgOutputCountPerElement(pt, tag) = \frac{pt.outputProduced(tag) + pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag)}{pt.activeElements + \sum_{itag \in inputTags} pt.inputElementsConsumed(itag)}

The number of unprocessed elements that have been produced by the parent transform but not yet started processing:

intermediateUnprocessedElements(pt) = \sum_{tag \in inputTags} \left( parent(pt, tag).outputProduced(tag) - pt.inputElementsConsumed(tag) \right) - pt.activeElements

The estimated total number of fully unprocessed input elements that need to be processed by a transform for all the remaining work for a bundle:

totalInputElementsRemaining(pt) = \sum_{tag \in inputTags} \left( totalOutputElementsRemaining(parent(pt, tag), tag) + intermediateUnprocessedElements(pt, tag) \right)

The estimated total number of output elements to be produced for all the remaining work in a bundle:

totalOutputElementsRemaining(pt, tag) = totalInputElementsRemaining(pt) \times avgOutputCountPerElement(pt, tag) + pt.activeOutputRemaining(tag)

The backlog for a bundle in seconds:

backlogSeconds = \sum_{pt \in ptransforms} avgElementProcessingTime(pt) \times \left( totalInputElementsRemaining(pt) + pt.activeElements \times activeElementsFractionRemaining(pt) \right)
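
The following is a hedged sketch of how a Runner might combine these functions, simplified to a linear chain of transforms with a single input and a single output each. The class and field names simply mirror the reported metrics and do not correspond to an existing Runner API, and activeElementsFractionRemaining is interpreted as the unconsumed portion of the active elements.

import java.util.List;

// Illustrative mirror of the per-transform metrics reported over the Fn API,
// simplified to a single input and a single output per transform.
class TransformProgress {
  long inputElementsConsumed;    // fully processed input elements
  long outputElementsProduced;   // output from fully processed inputs
  long activeElements;           // inputs started but not yet finished
  long activeOutputProduced;     // output so far from active inputs
  long activeOutputRemaining;    // estimated output left for active inputs
  double totalTimeSpentSeconds;  // total processing time so far
}

class BundleBacklogEstimator {

  // Fraction of the active input elements already consumed, estimated from the
  // produced / (produced + remaining) output ratio of the active elements.
  double activeElementsFractionConsumed(TransformProgress pt) {
    long total = pt.activeOutputProduced + pt.activeOutputRemaining;
    return total == 0 ? 0 : pt.activeElements * ((double) pt.activeOutputProduced / total);
  }

  // Average time to fully process one input element.
  double avgElementProcessingTime(TransformProgress pt) {
    double elementsProcessed = activeElementsFractionConsumed(pt) + pt.inputElementsConsumed;
    return elementsProcessed == 0 ? 0 : pt.totalTimeSpentSeconds / elementsProcessed;
  }

  // Average number of output elements produced per input element.
  double avgOutputCountPerElement(TransformProgress pt) {
    long inputs = pt.activeElements + pt.inputElementsConsumed;
    long outputs = pt.outputElementsProduced + pt.activeOutputProduced + pt.activeOutputRemaining;
    return inputs == 0 ? 0 : (double) outputs / inputs;
  }

  // Elements already emitted by the parent but not yet started by this transform.
  double intermediateUnprocessedElements(TransformProgress parent, TransformProgress pt) {
    return parent.outputElementsProduced - pt.inputElementsConsumed - pt.activeElements;
  }

  // Estimated input elements this transform still has to start, including
  // output the parent has yet to produce for the remaining bundle work.
  double totalInputElementsRemaining(List<TransformProgress> chain, int index) {
    if (index == 0) {
      return 0; // assume the root (e.g. the gRPC read) has no further upstream work in this bundle
    }
    TransformProgress parent = chain.get(index - 1);
    TransformProgress pt = chain.get(index);
    return totalOutputElementsRemaining(chain, index - 1)
        + intermediateUnprocessedElements(parent, pt);
  }

  // Estimated output elements still to be produced by this transform.
  double totalOutputElementsRemaining(List<TransformProgress> chain, int index) {
    TransformProgress pt = chain.get(index);
    return totalInputElementsRemaining(chain, index) * avgOutputCountPerElement(pt)
        + pt.activeOutputRemaining;
  }

  // Bundle backlog in seconds: remaining elements weighted by per-element cost,
  // plus the unfinished portion of the currently active elements (interpreting
  // pt.activeElements * activeElementsFractionRemaining(pt) as the active
  // element-equivalents not yet consumed).
  double backlogSeconds(List<TransformProgress> chain) {
    double backlog = 0;
    for (int i = 0; i < chain.size(); i++) {
      TransformProgress pt = chain.get(i);
      double activeRemaining = pt.activeElements - activeElementsFractionConsumed(pt);
      backlog += avgElementProcessingTime(pt)
          * (totalInputElementsRemaining(chain, i) + activeRemaining);
    }
    return backlog;
  }
}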

Handling Multiple Inputs

For a transform with multiple inputs, such as Flatten or a DoFn with Timers, the total time spent reported is inclusive of all inputs. When computing the average processing time per element, we treat elements from different inputs as having similar processing times for simplicity. This spares the SDK from having to keep track of time spent per input, but we may consider reporting it in future iterations if there is a strong use case for it.

Watermarks

The watermarks reported are tentative; they give a better sense of progress while a bundle is being processed but before it is committed. At bundle commit time, a Runner also needs to take the timers that were set into account to compute the actual watermarks.

Proto Definitions[s]

message ProcessBundleProgressRequest {
  // (Required) A reference to an active process bundle request with
  // the given instruction id.
  string instruction_reference = 1;
}

message Metrics {
  // PTransform level metrics.
  message PTransform {
    // Map from local input name to number of elements fully processed for this
    // input.
    map<string, int64> input_elements_consumed_counts = 1;

    // Map from local output name to number of elements produced for this
    // output.
    map<string, int64> output_elements_produced_counts = 2;

    // Number of input elements that are partially processed.
    int64 active_input_elements_count = 3;

    // Map from local output name to number of elements produced for this
    // output from the active set of input elements.
    map<string, int64> active_output_elements_produced_counts = 4;

    // (Optional) Map from local output name to estimated number of elements
    // remaining for this output from the active set of input elements, at the
    // time this progress is reported.
    // Special cases :-
    //   1. missing indicates the transform did not report it, so a Runner could
    //      compute an average estimate.
    //   2. a negative count indicates that the known work is infinite.
    map<string, int64> active_output_elements_remaining_counts = 5;

    // (Optional) The total processing time spent (millis) in this transform so far.
    int64 total_time_spent = 6;

    // (Optional) Map from local output name to its watermark (millis).
    map<string, int64> watermarks = 7;

    // TODO: Define other transform level system metrics.
  }

  // User defined metrics.
  message User {
    // TODO: Define it.
  }

  map<string, PTransform> ptransforms = 1;
  map<string, User> user_metrics = 2;
}

message ProcessBundleProgressResponse {
  Metrics metrics = 1;
}

message ProcessBundleResponse {
  Metrics metrics = 1;
}

Appendix

Alternative Considered

One other approach we considered was to have the SDK compute the cumulative (inclusive of downstream nodes) backlog for each transform. This would simplify the Runner, as it would just need to sum up the backlogs to compute the bundle backlog, but it masks information about each transform that may be useful to a Runner. The SDK would also need to handle cases like splits, joins, etc. in the fused sub-graph to accurately compute the cumulative backlog, which is an additional burden on each SDK. Overall, we think it is better for an SDK to provide the more granular information and let the Runner make the coarser computations, hence we did not go with this approach. Thanks to Robert Bradshaw and Eugene Kirpichov for their valuable insights.

Call Flow Diagram

@startuml
box "Runner" #LightBlue
  participant Control
end box
box "SDK harness" #LightGreen
  participant SDK
end box
[->Control
Activate Control
autonumber 1 1 "<b>0"
Control->SDK : Register Fns (if any are required)
Activate SDK
autonumber stop
Control<-SDK
autonumber resume
Deactivate SDK
Control->SDK : Request SDK to process a bundle.
Activate SDK
... ...
Control->SDK : Request SDK to report progress for an active bundle
Control<-SDK : Progress response
... ...
Control->SDK : Request SDK to report progress for an active bundle
Control<-SDK : Progress response
... ...
Control<-SDK : Process bundle response
Deactivate SDK
autonumber stop
[<-Control
Deactivate Control
@enduml

Latex Equations

activeElementsFractionConsumed(pt) = pt.activeElements \times \frac{\sum_{tag \in outputTags} pt.activeOutputProduced(tag)}{\sum_{tag \in outputTags} \left( pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag) \right)}

avgElementProcessingTime(pt) = \frac{pt.totalTimeSpent}{activeElementsFractionConsumed(pt) + \sum_{tag \in inputTags} pt.inputElementsConsumed(tag)}

avgOutputCountPerElement(pt, tag) = \frac{pt.outputProduced(tag) + pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag)}{pt.activeElements + \sum_{itag \in inputTags} pt.inputElementsConsumed(itag)}

intermediateUnprocessedElements(pt) = \sum_{tag \in inputTags} \left( parent(pt, tag).outputProduced(tag) - pt.inputElementsConsumed(tag) \right) - pt.activeElements

totalInputElementsRemaining(pt) = \sum_{tag \in inputTags} \left( totalOutputElementsRemaining(parent(pt, tag), tag) + intermediateUnprocessedElements(pt, tag) \right)

totalOutputElementsRemaining(pt, tag) = totalInputElementsRemaining(pt) \times avgOutputCountPerElement(pt, tag) + pt.activeOutputRemaining(tag)

backlogSeconds = \sum_{pt \in ptransforms} avgElementProcessingTime(pt) \times \left( totalInputElementsRemaining(pt) + pt.activeElements \times activeElementsFractionRemaining(pt) \right)

[a]Hi team, is this document up to date? I see that it was written in 2017. Is there a more up to date version?

[b]Seems asymmetric that input elements consumed and output elements produced are partitioned by finished/active, but time isn't.

[c]Yes, I would also like to make it symmetric, but before going that way I want to understand how difficult it is for SDKs to measure time across the processed and active groups, and what the performance implications are. Once we have end to end progress working with this proposal, it will be easier to experiment.

[d]You mean that this includes the time to produce output but doesn't include the 1.2 sec spent by the DoFn, right?

[e]Correct

[f]Superseded by backlog reporting discussed in

https://s.apache.org/beam-bundles-backlog-splitting

[g]This is an improper fraction (due to pt.activeElements * ) ?

[h]Yes, if there is more than one active element.

[i]This assumes progress is measured by output elements (which may be zero) and furthermore that outputs are produced linearly (which will break down for expensive DoFns that have only one output).

[j]In other words, measuring fraction done as (seen output elements)/(total expected output elements) only works well if there's a large number of output elements that are evenly produced.

[k]Agreed. This goes back to not having time measured for processed and active groups separately. Once we have that, we can switch this to be based on time. This also brings up the point that SDF may need to report backlog in both #output and time remaining. Time used to compute fraction, #output used to compute downstream work.

[l]This is especially true since we are reporting "infinity" for amount of active output elements remaining in unbounded sources.

Would it help if we weren't reporting infinity for remaining time/# active output elements?

[m]All these estimate equations aren't supposed to work when SDF reports infinite backlog (like an unbounded counting source). In such cases the reported watermark will be the indicator of progress.

I didn't follow  "..is especially true since we are reporting infinity". What would you report instead for such sources?

[n]Unbounded sources should not report infinity, they report "known" backlog, i.e. work that's currently available. This allows the autoscaling system to make a judgement about how it could scale to keep this backlog to a reasonable level.

[o]I was treating the known backlog as something that could be infinite, for something like an unbounded counting source. What do you propose it reports here? If it doesn't report any backlog, the system will not scale. It could report a finite backlog to allow for scaling, with the residual still being an infinite source?

[p]That's an interesting case. I really don't know what the scaling characteristics should be for a streaming system that's consuming an infinite counting source (that can be consumed as fast as possible). It's valid to throw infinite machines at it because there's an infinite amount of work to do *right now*.

The backlog idea generally makes a distinction between available work being a subset of future work which breaks down here.

[q]"is especially true" was referring to the fact that most SDFs will have a single input element representing an unbounded amount of output. If we say there is an infinite amount of output remaining then that is like saying we have one really large element to process and not giving any insight into it.

I imagine that the amount of output I can produce should be a number representing how much I could do now without needing to wait, if I had infinite compute but zero time. For an unbounded counting source that would be infinity, but for many unbounded sources like Kafka/Pubsub it would be how much I could produce if I read all the messages without needing to wait. What do you think?

[r]Yes, I think this is the general idea of "available."

[s]This is out of date; the way in which we report metrics will be done via https://s.apache.org/beam-fn-api-metrics