Apache Beam Fn API
Progress Reporting
https://s.apache.org/beam-fn-api-progress-reporting
Vikas Kedigehalli(vikasrk@google.com)
Lukasz Cwik (lcwik@google.com)
Overview
See the Apache Beam Fn API Overview for an introduction to the purpose, a high-level overview, and a listing of related documents. This document proposes a way for the SDK harness to report progress of a bundle to the Runner over the Fn API.
SDK Implementation Considerations
Buffering and Parallel Processing
Runner Implementation Considerations
At a high level, a Runner periodically sends a progress request to the SDK, to which the SDK sends a progress response, with both request and response going over the control plane. Since the response is asynchronous, a Runner may choose to block or use previously computed results.
The SDK provides progress information for each PTransform within a currently active bundle. The per-transform progress information will contain (see the Metrics proto below for the exact fields):
- the number of elements fully consumed, per input
- the number of elements produced, per output
- the number of active (partially processed) input elements, together with the elements produced so far and the estimated elements remaining, per output, for that active set
- optionally, the total processing time spent in the transform so far
- optionally, a watermark per output
By reporting such granular progress, a Runner gets visibility into the properties of each transform, while being able to use this information to compute the bundle level progress.
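The request/response flow described above can be sketched from the Runner's side as follows. This is an illustrative sketch only, not the actual Runner implementation: the ProgressPoller class and the send_progress_request callback are hypothetical, and the control-plane transport is stubbed out.

```python
import threading


class ProgressPoller:
    """Periodically asks the SDK harness for progress over the control plane.

    Because responses are asynchronous, callers can read the last completed
    snapshot instead of blocking on an in-flight request.
    """

    def __init__(self, send_progress_request, interval_secs=1.0):
        # send_progress_request(instruction_id) -> metrics dict; a hypothetical
        # stub standing in for the Fn API control-plane round trip.
        self._send = send_progress_request
        self._interval = interval_secs
        self._latest = {}          # instruction_id -> last metrics snapshot
        self._lock = threading.Lock()

    def poll_once(self, instruction_id):
        """One progress request/response cycle for an active bundle."""
        metrics = self._send(instruction_id)
        with self._lock:
            self._latest[instruction_id] = metrics
        return metrics

    def latest(self, instruction_id):
        """Non-blocking read of the most recently computed result."""
        with self._lock:
            return self._latest.get(instruction_id)
```

In practice a timer would invoke poll_once at a fixed interval for each active bundle, and scheduling or rebalancing decisions would read latest without blocking.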
An SDK has a few options for the level of progress it reports: none at all, basic element counts and watermarks, or, additionally, the time spent in each transform. These options are described below.
Although recommended, progress reporting is optional. An SDK that does not support it should return an error response indicating that progress reporting is not supported.
The Runner needs to decide how to handle these error responses appropriately.
The Runner will treat the output watermark as positive infinity for all transforms contained within this bundle.
If an SDK supports progress reporting, it is recommended to report the output watermarks of the root SDFs and the element counts of all PTransform outputs. This is mainly useful for providing visibility into the progress being made, while keeping the SDK implementation simple. However, it does not provide an accurate estimate of the backlog of work remaining, limiting the Runner’s ability to perform dynamic work rebalancing efficiently.
Consider an SDK executing a bundle, as in Figure 1. The gRPC read reports that it has output 4 elements, of which 2 have been fully processed by the SDF. The other two are being actively processed by the SDF, with an estimated remaining work of 10 elements. Note that even though the SDF has produced 50 elements, the following DoFn has only processed 40, with 1 still active. The other 9 haven’t been seen by the DoFn yet, which means they are likely sitting in an intermediate buffer, depending on how the SDK executes the graph. All of this would be reported in response to a single progress request.
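For illustration, the numbers in this example can be written down as a snapshot mirroring, in spirit, the per-transform counters in the Metrics proto later in this document (the field names here are shortened, hypothetical variants of the proto fields, not the wire format):

```python
# Snapshot of the Figure 1 example, keyed by transform name.
# Counts mirror Metrics.PTransform: elements fully consumed per input,
# elements produced per output, active elements, and the estimated output
# remaining for the active set.
progress = {
    "grpc_read": {
        "output_produced": {"out": 4},   # 4 elements read so far
    },
    "sdf": {
        "input_consumed": {"in": 2},     # 2 inputs fully processed
        "active_elements": 2,            # 2 inputs still being processed
        "output_produced": {"out": 50},  # emitted so far (fully + active inputs)
        "active_output_remaining": {"out": 10},  # estimated work left
    },
    "dofn": {
        "input_consumed": {"in": 40},    # 40 of the SDF's outputs processed
        "active_elements": 1,            # 1 being processed right now
    },
}

# Elements the SDF has emitted that the DoFn has not started yet are
# sitting in an intermediate buffer:
buffered = (progress["sdf"]["output_produced"]["out"]
            - progress["dofn"]["input_consumed"]["in"]
            - progress["dofn"]["active_elements"])
```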
An SDK may report a more advanced form of progress, in which the total time spent in each transform is also reported. This allows a Runner to estimate the amount of work remaining and perform dynamic work rebalancing efficiently.
This requires the SDK to keep track of element processing times for each transform, which can be expensive, so it is recommended to use sampling to compute reasonable estimates at low cost.
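One common low-overhead technique is statistical sampling: record which transform is executing at fixed intervals, and estimate the time spent in a transform as (number of samples) × (sampling period). The sketch below is a hypothetical illustration of this idea, not a real SDK harness API; the sampler is driven manually here, whereas a real SDK would drive it from a background timer thread.

```python
from collections import Counter


class TransformTimeSampler:
    """Estimates per-transform processing time by sampling the currently
    executing transform, avoiding a clock read per element."""

    def __init__(self, period_ms):
        self._period_ms = period_ms
        self._current = None          # transform executing right now
        self._samples = Counter()     # transform -> number of samples seen

    def enter(self, transform):
        """Called when element processing enters a transform."""
        self._current = transform

    def exit(self):
        """Called when element processing leaves the current transform."""
        self._current = None

    def sample(self):
        # In a real SDK a timer thread would call this every period_ms.
        if self._current is not None:
            self._samples[self._current] += 1

    def estimated_time_ms(self, transform):
        # Expected error shrinks as the number of samples grows.
        return self._samples[transform] * self._period_ms
```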
Consider the SDK executing the same bundle, but now reporting the total time spent in each transform, as in Figure 2. The total time reported for the SDF is around 19s, which includes the time spent producing the output emitted so far from the active elements.
The reason for splitting the information into fully processed elements and active elements is that it gives the SDK flexibility in how it executes a fused sub-graph. SDKs could choose to perform optimizations by buffering elements between transforms and/or executing these transforms in parallel, as long as the progress of the elements in flight is reported correctly under the active elements. For watermarks, the reported value will be the minimum of the watermarks tracked for each element in flight, for each PTransform output.
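The minimum-over-in-flight-elements rule for watermarks can be sketched with a small hypothetical helper (one instance per PTransform output); a real SDK would integrate this with its element dispatch loop:

```python
class InFlightWatermark:
    """Tracks a tentative output watermark as the minimum of the
    watermark holds of elements still in flight."""

    def __init__(self):
        self._holds = {}  # element id -> event-time watermark hold (millis)

    def start(self, element_id, hold_millis):
        """An element began processing; it holds the watermark at hold_millis."""
        self._holds[element_id] = hold_millis

    def finish(self, element_id):
        """The element finished; its hold no longer applies."""
        self._holds.pop(element_id, None)

    def watermark(self):
        # With nothing in flight, nothing holds the watermark back.
        return min(self._holds.values()) if self._holds else float("inf")
```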
Bundle backlog is defined as the amount of time estimated to process all the known unclaimed work for this bundle.
A Runner needs to implement the following functions (defined precisely in the formulas at the end of this document) to compute the bundle backlog:
- An estimate of the fraction of active input elements consumed (activeElementsFractionConsumed).
- The average time taken to process an input element (avgElementProcessingTime).
- The average output count per input element (avgOutputCountPerElement).
- The number of unprocessed elements that have been produced by the parent transform but have not yet started processing (intermediateUnprocessedElements).
- The estimated total number of input elements still to be processed by a transform for all the remaining work in the bundle (totalInputElementsRemaining).
- The estimated total number of output elements to be produced for all the remaining work in the bundle (totalOutputElementsRemaining).
- The backlog for a bundle in seconds (backlogSeconds).
The total time spent reported includes all inputs of a multiple-input transform, such as Flatten or a DoFn with Timers. When computing the average processing time per element, we treat elements from different inputs as having similar processing times, for simplicity. This lets the SDK avoid keeping track of time spent per input, though we may consider reporting it in future iterations if there is a strong use case for it.
The watermarks reported are tentative; they give a better sense of progress while a bundle is being processed but before it is committed. At bundle commit time, a Runner also needs to take into account the timers that were set in order to compute the actual watermarks.
message ProcessBundleProgressRequest {
  // (Required) A reference to an active process bundle request with
  // the given instruction id.
  string instruction_reference = 1;
}

message Metrics {
  // PTransform level metrics.
  message PTransform {
    // Map from local input name to number of elements fully processed for
    // this input.
    map<string, int64> input_elements_consumed_counts = 1;

    // Map from local output name to number of elements produced for this
    // output.
    map<string, int64> output_elements_produced_counts = 2;

    // Number of input elements that are partially processed.
    int64 active_input_elements_count = 3;

    // Map from local output name to number of elements produced for this
    // output from the active set of input elements.
    map<string, int64> active_output_elements_produced_counts = 4;

    // (Optional) Map from local output name to estimated number of elements
    // remaining for this output from the active set of input elements, at the
    // time this progress is reported.
    // Special cases:
    // 1. missing indicates the transform did not report it, so a Runner could
    //    compute an average estimate.
    // 2. a negative count indicates known work is infinite.
    map<string, int64> active_output_elements_remaining_counts = 5;

    // (Optional) The total processing time spent (millis) in this transform
    // so far.
    int64 total_time_spent = 6;

    // (Optional) Map from local output name to its watermark (millis).
    map<string, int64> watermarks = 7;

    // TODO: Define other transform level system metrics.
  }

  // User defined metrics.
  message User {
    // TODO: Define it.
  }

  map<string, PTransform> ptransforms = 1;
  map<string, User> user_metrics = 2;
}

message ProcessBundleProgressResponse {
  Metrics metrics = 1;
}

message ProcessBundleResponse {
  Metrics metrics = 1;
}
One other approach we considered was to have the SDK compute the cumulative (inclusive of downstream nodes) backlog for each transform. This would simplify the Runner, as it would just need to sum up all the backlogs to compute the bundle backlog, but it masks per-transform information that may be useful to a Runner. The SDK would also need to handle cases like splits, joins, etc. in the fused sub-graph to accurately compute the cumulative backlog, which is an additional burden on each SDK. Overall we think it is better for an SDK to provide more granular information and let the Runner make the coarser computations, hence we did not go with this approach.

Thanks to Robert Bradshaw and Eugene Kirpichov for their valuable insights.
@startuml
box "Runner" #LightBlue
participant Control
end box
box "SDK harness" #LightGreen
participant SDK
end box
[->Control
Activate Control
autonumber 1 1 "<b>0"
Control->SDK : Register Fns (if any are required)
Activate SDK
autonumber stop
Control<-SDK
autonumber resume
Deactivate SDK
Control->SDK : Request SDK to process a bundle.
Activate SDK
... ...
Control->SDK : Request SDK to report progress for an active bundle
Control<-SDK : Progress response
... ...
Control->SDK : Request SDK to report progress for an active bundle
Control<-SDK : Progress response
... ...
Control<-SDK : Process bundle response
Deactivate SDK
autonumber stop
[<-Control
Deactivate Control
@enduml
activeElementsFractionConsumed(pt) =
pt.activeElements \cdot \frac{\sum_{tag \in outputTags} pt.activeOutputProduced(tag)}{\sum_{tag \in outputTags} \big( pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag) \big)}

avgElementProcessingTime(pt) =
\frac{pt.totalTimeSpent}{activeElementsFractionConsumed(pt) + \sum_{tag \in inputTags} pt.inputElementsConsumed(tag)}

avgOutputCountPerElement(pt, tag) =
\frac{pt.outputProduced(tag) + pt.activeOutputProduced(tag) + pt.activeOutputRemaining(tag)}{pt.activeElements + \sum_{itag \in inputTags} pt.inputElementsConsumed(itag)}

intermediateUnprocessedElements(pt) =
\sum_{tag \in inputTags} \big( parent(pt, tag).outputProduced(tag) - pt.inputElementsConsumed(tag) \big) - pt.activeElements

totalInputElementsRemaining(pt) =
\sum_{tag \in inputTags} totalOutputElementsRemaining(parent(pt, tag), tag) + intermediateUnprocessedElements(pt)

totalOutputElementsRemaining(pt, tag) =
totalInputElementsRemaining(pt) \cdot avgOutputCountPerElement(pt, tag) + pt.activeOutputRemaining(tag)

activeElementsFractionRemaining(pt) =
1 - \frac{activeElementsFractionConsumed(pt)}{pt.activeElements}

backlogSeconds =
\sum_{pt \in ptransforms} avgElementProcessingTime(pt) \cdot \big( totalInputElementsRemaining(pt) + pt.activeElements \cdot activeElementsFractionRemaining(pt) \big)
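As an illustration, the backlog functions above can be transcribed into code. The sketch below assumes a linear pipeline with a single input and a single output per transform, which removes the per-tag sums; the function and dictionary key names mirror the formulas and the proto fields, not any real SDK or Runner API, and the result is in the same time units as totalTimeSpent.

```python
def active_fraction_consumed(pt):
    # activeElementsFractionConsumed: active elements weighted by the
    # fraction of their estimated output already produced.
    total = pt["active_output_produced"] + pt["active_output_remaining"]
    if total == 0:
        return 0.0
    return pt["active_elements"] * pt["active_output_produced"] / total


def avg_element_processing_time(pt):
    # avgElementProcessingTime: time spent divided by (fractional) elements done.
    denom = active_fraction_consumed(pt) + pt["input_consumed"]
    return pt["total_time_spent"] / denom if denom else 0.0


def avg_output_count_per_element(pt):
    # avgOutputCountPerElement: all known output over all known input.
    produced = (pt["output_produced"] + pt["active_output_produced"]
                + pt["active_output_remaining"])
    denom = pt["active_elements"] + pt["input_consumed"]
    return produced / denom if denom else 0.0


def backlog_time(transforms):
    """transforms: per-transform dicts ordered root first (linear pipeline)."""
    backlog = 0.0
    parent_output_remaining = 0.0  # the root has no upstream backlog
    prev = None
    for pt in transforms:
        if prev is None:
            buffered = 0
        else:
            # intermediateUnprocessedElements: emitted upstream, not started here.
            buffered = (prev["output_produced"] - pt["input_consumed"]
                        - pt["active_elements"])
        # totalInputElementsRemaining for this transform.
        input_remaining = parent_output_remaining + buffered
        # Remaining work: unseen input plus the unfinished part of active elements.
        backlog += avg_element_processing_time(pt) * (
            input_remaining
            + pt["active_elements"] - active_fraction_consumed(pt))
        # totalOutputElementsRemaining, fed to the next transform downstream.
        parent_output_remaining = (
            input_remaining * avg_output_count_per_element(pt)
            + pt["active_output_remaining"])
        prev = pt
    return backlog
```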
Comments
[a] Hi team, is this document up to date? I see that it was written in 2017. Is there a more up-to-date version?
[b] Seems asymmetric that input elements consumed and output elements produced are partitioned by finished/active, but time isn't.
[c] Yes, I would like to make it symmetric too, but before going that way I want to understand how difficult it is for SDKs to measure time across processed and active groups, and what the performance implications are. Once we have end-to-end progress working with this proposal, it will be easier to experiment.
[d] You mean that this includes the time to produce output but doesn't include the 1.2 s spent by the DoFn, right?
[e] Correct.
[f] Superseded by reporting backlog discussed in https://s.apache.org/beam-bundles-backlog-splitting
[g] This is an improper fraction (due to the pt.activeElements factor)?
[h] Yes, if there is more than one active element.
[i] This assumes progress is measured by output elements (which may be zero) and, furthermore, that outputs are produced linearly (which will break down for expensive DoFns that have only one output).
[j] In other words, measuring the fraction done as (seen output elements) / (total expected output elements) only works well if there is a large number of output elements that are evenly produced.
[k] Agreed. This goes back to not having time measured separately for the processed and active groups. Once we have that, we can switch this to be based on time. This also brings up the point that an SDF may need to report backlog as both an output count and time remaining: time used to compute the fraction, output count used to compute downstream work.
[l] This is especially true since we are reporting "infinity" for the amount of active output elements remaining in unbounded sources. Would it help if we weren't reporting infinity for the remaining time / number of active output elements?
[m] All these estimate equations aren't supposed to work when an SDF reports infinite backlog (like an unbounded counting source). In such cases the reported watermark will be the indicator of progress. I didn't follow "...is especially true since we are reporting infinity". What would you report instead for such sources?
[n] Unbounded sources should not report infinity; they report "known" backlog, i.e. work that's currently available. This allows the autoscaling system to make a judgement about how it could scale to keep this backlog at a reasonable level.
[o] I was treating known backlog as possibly infinite, for something like an unbounded counting source. What do you propose it reports here? If it doesn't report any backlog, the system will not scale. It could report a finite backlog to allow for scaling, with the residual still being an infinite source?
[p] That's an interesting case. I really don't know what the scaling characteristics should be for a streaming system consuming an infinite counting source (one that can be consumed as fast as possible). It's valid to throw infinite machines at it because there's an infinite amount of work to do *right now*. The backlog idea generally makes a distinction between available work being a subset of future work, which breaks down here.
[q] "Is especially true" was referring to the fact that most SDFs will have a single input element representing an unbounded amount of output. If we say there is an infinite amount of output remaining, that is like saying we have one really large element to process, without giving any insight into it. I imagine the amount of output I can produce should be a number representing how much I could do right now, without needing to wait, if I had infinite compute; for an unbounded counting source that would be infinity, but for many unbounded sources like Kafka/Pubsub it would be how much I could produce by reading all the currently available messages without needing to wait. What do you think?
[r] Yes, I think this is the general idea of "available".
[s] This is out of date; the way in which we report metrics is now described at https://s.apache.org/beam-fn-api-metrics