The Beam Model: Streams & Tables
takidau@apache.org
http://s.apache.org/beam-streams-tables
Stream and table basics or: a special theory of stream & table relativity
Toward a general theory of stream & table relativity
Batch processing vs Streams & Tables
A streams & tables analysis of MapReduce
What, where, when, & how in a streams/tables world
A holistic view of streams & tables in the Beam model
A general theory of stream & table relativity
The point of this document is to try to describe the relationship between the Beam Model (as described in The Dataflow Model paper and the Streaming 101 and Streaming 102 blog posts) and the theory of “streams & tables” (as popularized by Martin Kleppmann and Jay Kreps, amongst others, but essentially originating out of the database world). It turns out that stream & table theory does an illuminating job of describing the low-level concepts that underlie the Beam Model. Additionally, a clear understanding of how they relate is particularly informative when considering how robust stream processing concepts might be cleanly integrated into SQL.
Note that, for the sake of completeness, the first section of this document will be retreading ground well covered elsewhere (primarily Martin and Jay’s posts above), though in a more condensed form.
Stream and table basics or: a special theory of stream & table relativity

The basic idea of streams and tables derives from the database world. Anyone familiar with SQL is likely familiar with tables and their basic properties, roughly summarized as: tables contain rows and columns of data, and each row is uniquely identified by some sort of key, either explicit or implicit.
If you think back to your database systems class in college, you’ll probably recall the data structure underlying most databases is an append-only log. As transactions are applied to a table in the database, those transactions are recorded in a log (the write-ahead log, or WAL, in most implementations), the contents of which are then serially applied to the table to materialize those updates. In streams & tables nomenclature, that log is effectively the stream.
From that perspective, we now understand how to create a table from a stream: the table is just the result of applying the transaction log of updates found in the stream. But how do we create a stream from a table? It’s essentially the inverse: a stream is a changelog for a table. The motivating example typically used for table → stream conversion is materialized views. Materialized views in SQL let you specify a query on a table which itself is then manifested by the database system as another first-class table. This materialized view is essentially a cached version of that query which the database system ensures is always up to date as the contents of the source table evolve over time. Perhaps unsurprisingly, materialized views are implemented via the changelog for the original table: any time the source table changes, that change is logged. The database then evaluates that change within the context of the materialized view’s query and applies any resulting change to the destination materialized view table.
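To make the duality concrete, here’s a minimal, illustrative sketch in Java (plain collections, not real database code; the Update record and toTable method are hypothetical names) showing both directions of the conversion:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamTableSketch {
  // One entry in the changelog: an upsert of value for key.
  record Update(String key, long value) {}

  // Stream -> table: aggregating a stream of updates over time yields a table.
  static Map<String, Long> toTable(List<Update> changelog) {
    Map<String, Long> table = new LinkedHashMap<>();
    for (Update u : changelog) {
      table.put(u.key(), u.value()); // serially apply each logged transaction
    }
    return table;
  }

  public static void main(String[] args) {
    List<Update> changelog =
        List.of(new Update("a", 1), new Update("b", 2), new Update("a", 3));
    System.out.println(toTable(changelog)); // {a=3, b=2}
    // Table -> stream: conversely, observing the changes made to the table
    // over time reproduces exactly the three Update records above.
  }
}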
Combining these two points together and employing a questionable physics analogy, we arrive at what one might call the Special Theory of Stream & Table Relativity:

Streams → tables: The aggregation of a stream of updates over time yields a table.
Tables → streams: The observation of changes to a table over time yields a stream.
This is a very powerful pair of concepts, and their careful application to the world of stream processing is a big reason for the massive success of Apache Kafka, the ecosystem for which is built around these underlying principles. However, those statements themselves are not quite general enough to allow us to tie streams and tables to all of the concepts in the Beam Model. For that, we must go a little bit deeper.
Toward a general theory of stream & table relativity

If we want to reconcile stream/table theory with everything we know of the Beam Model, we’ll need to tie up some loose ends, specifically:

How batch processing fits into a streams/tables view of the world.
The relationship of streams to bounded and unbounded datasets.
How the four what/where/when/how questions map onto a streams/tables world.
As we attempt to do so, it will be helpful to have the right mindset about streams and tables. In addition to understanding them in relation to one another, as captured by the definitions above, it can be illuminating to define them independently from one another. A simple way of looking at it that will underscore some of our future analyses is:

Tables are data at rest, acting as a container for data to accumulate and a place where they can be observed over time[1].
Streams are data in motion, encoding a discretized view of the evolution of a table over time.
Though tables and streams are intimately related, it’s important to keep in mind that they are very much not the same thing, even if there are many cases where one may be fully derived from the other. The differences are subtle, but important, as we’ll see below.
Batch processing vs Streams & Tables

With our proverbial knuckles now cracked, let’s start to tie up some loose ends. To begin with, we’ll tackle the first one, regarding batch processing. At the end, we’ll discover that the resolution to the second issue, regarding the relationship of streams to bounded and unbounded data, will fall out naturally from the answer for the first. Score one for serendipity.
A streams & tables analysis of MapReduce

To keep our analysis relatively simple, but solidly concrete, as it were, let’s look at how a traditional MapReduce job fits into the streams/tables world. As alluded to by its name, a MapReduce job superficially consists of two phases: Map and Reduce. For our purposes, though, it’s useful to look a little deeper and treat it more like six:

MapRead: consumes the input data, preprocessing them a bit into a standard key/value form for mapping[2].
Map: repeatedly invokes the user’s map code on that preprocessed input, one key/value pair at a time.
MapWrite: groups the Map-phase outputs by key and writes those groups to (semi-)persistent storage.
ReduceRead: consumes the saved shuffle data, converting them into a standard key/value-list form for reduction.
Reduce: repeatedly invokes the user’s reduce code on each key and its associated list of values; outputs may optionally remain associated with their key.
ReduceWrite: writes the Reduce-phase outputs to the output datastore.
Note that the MapWrite and ReduceRead phases sometimes get referred to in aggregate as the Shuffle phase, but for our purposes, it’s better to consider them independently. It’s perhaps also worth noting that the functions served by the MapRead and ReduceWrite phases are more commonly referred to these days as Sources and Sinks. Digressions aside, however, let’s now see how this all relates to streams & tables.
Since we start and end with static[3] datasets, it should be clear we begin with a table and end with a table. But what do we have in between? Naively, one might assume it’s tables all the way down; after all, batch processing is (conceptually) known to consume and produce tables. And if you think of a batch processing job as a rough analog of executing a classic SQL query, that feels relatively natural. But let’s look a little more closely at what’s really happening, step by step.
First up, MapRead consumes a table and produces something. That something is consumed next by the Map phase, so if we want to understand its nature, a good place to start would be with the Map phase API, which looks something like this in Java:
void map(KI key, VI value, Emit<KO, VO> emit);
The map call will be repeatedly invoked for each key/value pair in the input table. If you think this sounds suspiciously like the input table is being consumed as a stream of records, you’d be right. We’ll look more closely at how the table is being converted into a stream later, but for now, suffice it to say that the MapRead phase is iterating over the data at rest in the input table and putting them into motion in the form of a stream that is then consumed by the Map phase.
Next up, the Map phase consumes that stream, and then does what? Since the map operation is an element-wise transformation, it’s not doing anything that will halt the moving elements and put them to rest. It may change the effective cardinality of the stream, by either filtering some elements out or exploding some elements into multiple elements, but those elements all remain independent from one another after the Map phase concludes. So it seems safe to say that the Map phase both consumes and produces a stream.
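As an illustration, here’s a hypothetical mapper matching the map(key, value, emit) shape above. It is purely element-wise: it may filter a record out or explode one record into several, but it never groups anything, so a stream in yields a stream out:

import java.util.List;
import java.util.function.BiConsumer;

public class ExplodingMapper {
  static void map(String key, String line, BiConsumer<String, Integer> emit) {
    if (line.isBlank()) {
      return; // filtering: this element simply disappears from the stream
    }
    for (String token : line.split(",")) {
      emit.accept(key, Integer.parseInt(token.trim())); // exploding: 1 -> N
    }
  }

  public static void main(String[] args) {
    for (String line : List.of("3, 4", "", "7")) {
      map("team-x", line, (k, v) -> System.out.println(k + " -> " + v));
    }
    // Three independent records emerge; the blank line was filtered out.
  }
}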
Once the Map phase is done, we enter the MapWrite. As I noted above, the MapWrite groups records by key and then writes them in that format to persistent storage. The persistent part of the write actually isn’t strictly necessary at this point as long as there’s persistence somewhere (i.e., if the upstream inputs are persisted and one can recompute the intermediate results from them in cases of failure, similar to the approach Spark takes with RDDs). What is important is that the records are grouped together into some kind of datastore, be it in memory, on disk, or what have you. This is important because, as a result of this grouping operation, records which were previously flying past one-by-one in the stream are now brought to rest in a location dictated by their key, thus allowing per-key groups to accumulate as their like-keyed brethren and sistren arrive. Note how similar this is to the definition of stream → table conversion provided above: the aggregation of a stream of updates over time yields a table. The MapWrite, by virtue of grouping the stream of records by their keys, has put those data to rest and thus converted the stream back into a table[4]. Cool!
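Here’s a minimal in-memory stand-in (hypothetical names, not MapReduce APIs) for what MapWrite does conceptually: grouping a stream of key/value records by key is precisely what brings them to rest as a table of per-key groups.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeySketch {
  record KV(String key, int value) {}

  static Map<String, List<Integer>> groupByKey(List<KV> stream) {
    Map<String, List<Integer>> table = new LinkedHashMap<>();
    for (KV kv : stream) {
      // Each arriving record comes to rest in the group dictated by its key.
      table.computeIfAbsent(kv.key(), k -> new ArrayList<>()).add(kv.value());
    }
    return table;
  }

  public static void main(String[] args) {
    List<KV> stream = List.of(new KV("a", 3), new KV("b", 4), new KV("a", 7));
    System.out.println(groupByKey(stream)); // {a=[3, 7], b=[4]}
  }
}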
We’re now halfway through the MapReduce, so let’s recap what we’ve seen so far (embodied here in Figure 1):
Figure 1: Map phases in a MapReduce. Data in a table are converted to a stream and back again.
We’ve gone from table to stream and back again across three operations. MapRead converted the table into a stream, which was then transformed into a new stream by Map (via the user’s code), and which was then converted back into a table by MapWrite. We’re going to find that the next three operations in the MapReduce look very similar, so I’ll go through them more quickly, but I still want to point out one important detail along the way.
Picking up where we left off after the MapWrite, ReduceRead itself is relatively uninteresting. It’s basically identical to MapRead, except that the values being read are singleton lists of values instead of singleton values, because the data stored by MapWrite were key/value-list pairs. But it’s still just iterating over a snapshot of a table to convert it into a stream. Nothing new here.
And even though it sounds like it might be interesting, Reduce in this context is really just a glorified Map phase that happens to receive a list of values for each key instead of a single value. So it’s still just mapping single (composite) records into zero or more new records. Nothing particularly new here, either.
ReduceWrite is the one that’s a bit noteworthy. We know already that this phase must convert a stream to a table, since Reduce produces a stream and the final output is a table. But how does that happen? If I told you it was a direct result of key-grouping the outputs from the previous phase into persistent storage, just like we saw with MapWrite, you might believe me, until you remembered that I noted above that key-association was an optional feature of the Reduce phase. With that feature enabled, ReduceWrite is essentially identical to MapWrite[5]. But if that feature is disabled, and the outputs from Reduce have no associated keys, what exactly is happening to bring those data to rest?
To understand what’s going on, it’s useful to think again of the semantics of a SQL table. Though often recommended, it’s not strictly required for a SQL table to have a primary key uniquely identifying each row. In the case of keyless tables, each row which is inserted is considered to be a new, independent row (even if the data therein are identical to one or more extant rows in the table), much as though there were an implicit AUTO_INCREMENT field being used as the key (which incidentally, is what’s effectively happening under the covers in most implementations, even though the “key” in this case may just be some physical block location which is never exposed or expected to be used as a logical identifier). This implicit unique key assignment is precisely what’s happening in ReduceWrite with unkeyed data. Conceptually, there’s still a group-by-key operation happening; that’s what brings the data to rest. But lacking a user-supplied key, the ReduceWrite is treating each record as though it has a new, never-before-seen key, and effectively grouping each record with itself, resulting again in data at rest[6].
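A tiny sketch of the unkeyed case, with the auto-increment “key” made explicit (all names hypothetical):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ImplicitKeySketch {
  public static void main(String[] args) {
    List<String> unkeyedStream = List.of("row", "row", "other");
    Map<Long, String> table = new LinkedHashMap<>();
    long nextKey = 0; // the hidden auto-increment "key"
    for (String record : unkeyedStream) {
      // Lacking a user key, each record is grouped with itself under a
      // fresh, never-before-seen key.
      table.put(nextKey++, record);
    }
    System.out.println(table); // {0=row, 1=row, 2=other}: duplicates preserved
  }
}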
Looking now at the whole pipeline from the perspective of streams/tables, we can see that it’s a sequence of TABLE → STREAM → STREAM → TABLE → STREAM → STREAM → TABLE. Even though we’re processing bounded data, and even though we’re doing what we traditionally think of as batch processing, it’s really just streams and tables under the covers.
Figure 2: Map and Reduce phases in a MapReduce, viewed from the perspective of streams & tables.
So where does this leave us with respect to our first two questions?

Q: How does batch processing fit into stream/table theory?
A: Quite nicely. The basic pattern is: tables are read in their entirety to become streams; those streams are transformed into new streams until a grouping operation is reached; grouping brings the stream to rest as a table; and the sequence repeats until the pipeline runs out of stages.

Q: How do streams relate to bounded and unbounded data?
A: As the MapReduce example shows, streams are simply the in-motion form of data, regardless of whether those data are bounded or unbounded.
Taken from this perspective, it’s easy to see that stream/table theory isn’t remotely at odds with batch processing of bounded data. In fact, it only further supports the idea I’ve been harping on that batch and streaming really aren’t that different: at the end of the day, it’s streams and tables all the way down.
With that, we’re well on our way towards a general theory of streams and tables. But to wrap things up cleanly, we still need to revisit the four what/where/when/how questions within the streams/tables context, to see how they all relate.
What, where, when, & how in a streams/tables world

In this section, we’ll look at each of the four questions and see how they relate to streams and tables. We’ll also answer any questions that may be lingering from the previous section, one big one being: if grouping is the thing that brings data to rest, what precisely is the “ungrouping” inverse that puts them in motion? More on that below. But for now, on to transformations.
In Streaming 102, we learned that transformations tell us what the pipeline is computing, e.g., whether it’s building models, counting sums, filtering spam, etc. We saw in the MapReduce example above that four of the six stages answered what questions:

Map and Reduce, which applied the pipeline author’s element-wise transformations to each record in their respective input streams.
MapWrite and ReduceWrite, which grouped the outputs of the preceding stage according to the keys assigned by that stage (implicit ones, in the optional unkeyed Reduce case).
Viewed in that light, you can see that there are essentially two types of what transforms from the perspective of stream/table theory:

Non-grouping transforms: operations which, like Map and Reduce, consume a stream and produce a new, possibly transformed stream on the other side; the elements stay in motion.
Grouping transforms: operations which, like MapWrite and ReduceWrite, consume a stream and group the data within it in some way, bringing those data to rest and thereby converting the stream into a table.
To get a better sense for how all of this ties together, let’s look at an updated version of Figure 2 from Streaming 102, where we first started to look at transformations. To save you jumping back there to see what we were talking about, here’s the code snippet we were using:
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input =
  raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores =
  input.apply(Sum.integersPerKey());
Listing 1: Summation pipeline. Key/value data are read from an I/O source, with a String (e.g., team name) as the key and an Integer (e.g., individual team member scores) as the value. The values for each key are then summed together to generate per-key sums (e.g., total team score) in the output collection.
This pipeline is simply reading in input data, parsing out individual team member scores, and then summing those scores per team. The event time/processing time visualization of it looked like this:
Figure 3: Event time/processing time view of classic batch processing.
Let’s now look at a more topological view of this pipeline over time, rendered from a streams and tables perspective:
Figure 4: Streams & tables view of classic batch processing.
In the streams & tables version of this visualization, the passage of time is manifested by scrolling the graph area downwards in the processing-time dimension (Y-axis) as time advances. The nice thing about rendering things this way is that it very clearly calls out the difference between non-grouping and grouping operations. Unlike our previous diagrams, where I elided all initial transformations in the pipeline other than the Sum.integersPerKey, I’ve included the initial parsing operation here as well, because the non-grouping aspect of the parsing operation provides a nice contrast to the grouping aspect of the summation. Viewed in this light, it’s very easy to see the difference between the two. The non-grouping operation does nothing to halt the motion of the elements in the stream, and as a result yields another stream on the other side. In contrast, the grouping operation brings all the elements in the stream to rest as it adds them together into the final sum. Since this example was running on a batch processing engine over bounded data, the final results are emitted only once the end of the input is reached. As we noted in Streaming 102, this example is sufficient for bounded data, but is too limiting in the context of unbounded data, since the input will theoretically never end. But is it really insufficient?
Looking at the new streams/tables portion of the diagram, if all we’re doing is calculating sums as our final results (and not actually transforming those sums in any additional way further downstream within the pipeline), then the table we created with our grouping operation has our answer sitting right there, evolving over time as new data arrive. Why don’t we just read our results from there?
This is exactly the point being made by the folks championing stream processors as a database[7] (primarily the Kafka and Flink crews): anywhere you have a grouping operation in your pipeline, you’re creating a table that contains what are effectively the output values of that stage of the pipeline. If those output values happen to be the final thing your pipeline is calculating, you don’t need to re-materialize them somewhere else if you can read them directly out of that table. Besides providing quick and easy access to results as they evolve over time, this approach saves on compute resources by not requiring an additional sink stage in the pipeline to materialize the outputs, yields disk savings by eliminating redundant data storage, and obviates the need for any engineering work building the aforementioned sink stages[8]. The only major caveat is that care needs to be taken to ensure that only the data processing pipeline has the ability to make modifications to the table. If the values in the table can change out from under the pipeline due to external modification, all bets are off regarding consistency guarantees.
A number of folks in the industry have been recommending this approach for a while now, and it’s being put to great use in a variety of scenarios. We’ve seen MillWheel customers within Google do the same thing by serving data directly out of their Bigtable-based state tables, and we’re in the process of adding first-class support for accessing state from outside of your pipeline in the C++-based Apache Beam equivalent we use internally at Google (Google Flume); hopefully those concepts will make their way to Apache Beam proper someday soon, as well.
Now, reading from the state tables is great if the values therein are your final results. But if you have more processing to perform downstream in the pipeline (e.g., imagine our pipeline was actually computing the top scoring team), we still need some better way to cope with unbounded data, allowing us to transform the table back into a stream in a more incremental fashion. For that, we’ll want to journey back through the remaining three questions, beginning with windowing, expanding into triggering, and finally tying it all together with accumulation.
As we know from Streaming 102, windowing tells us where in event time grouping occurs. Combined with our experiences above, we can thus also infer it must play a role in stream → table conversion, since grouping is what drives table creation. There are really two aspects of windowing that interact with stream/table theory:

Window assignment: effectively just placing a record into one or more windows.
Window merging: the logic that makes dynamic, data-driven types of windows, such as sessions, possible.
The effect of window assignment is quite straightforward. When a record is conceptually placed into a window, the definition of the window is essentially combined with the user-assigned key for that record to create an implicit composite key used at grouping time[9]. Simple.
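Here’s a small sketch of that idea, assuming fixed two-minute windows and using minutes as illustrative timestamps; the KeyAndWindow composite is a hypothetical stand-in for the implicit key the system creates:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedKeySketch {
  record Event(String key, long eventTimeMin, int value) {}
  record KeyAndWindow(String key, long windowStartMin) {}

  public static void main(String[] args) {
    final long windowSizeMin = 2;
    List<Event> stream = List.of(
        new Event("teamA", 0, 5), new Event("teamA", 1, 7), new Event("teamA", 3, 4));

    Map<KeyAndWindow, List<Integer>> table = new LinkedHashMap<>();
    for (Event e : stream) {
      long windowStart = (e.eventTimeMin() / windowSizeMin) * windowSizeMin;
      // The window definition is combined with the user-assigned key to form
      // the implicit composite key actually used at grouping time.
      KeyAndWindow composite = new KeyAndWindow(e.key(), windowStart);
      table.computeIfAbsent(composite, k -> new ArrayList<>()).add(e.value());
    }
    // Two groups result: (teamA, window 0) -> [5, 7] and (teamA, window 2) -> [4].
    System.out.println(table);
  }
}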
For completeness, let’s take another look at the original windowing example from Streaming 102, but from a streams/tables perspective. If you recall, the code snippet looked something like this (with parsing not elided this time):
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input =
  raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());
Listing 2: Windowed summation code.
And the original visualization looked like this:
Figure 5: Event time/processing-time view of windowed summation on a batch engine.
And now the streams & tables version:
Figure 6: Streams & tables view of windowed summation on a batch engine.
As you might expect, this looks remarkably similar to Figure 4, but with four groupings in the table (corresponding to the four windows occupied by the data) instead of just one. But as before, we have to wait until the end of our bounded input is reached before emitting results. We’ll look at how to address this for unbounded data in the next section, but first let’s touch briefly on merging windows.
Moving on to merging, we’ll find that the effect of window merging is more complicated than window assignment, but still straightforward once you think about the logical operations that would need to happen. When grouping a stream into windows that can merge, that grouping operation has to take into account all of the windows that could possibly merge together. Typically, this is limited to windows whose data all have the same key (since we’ve already established that windowing modifies grouping to not be just by key, but also key and window). For this reason, the system doesn’t really treat the key/window pair as a flat composite key, but rather as a hierarchical key, with the user-assigned key as the root, and the window a child component of that root. When it comes time to actually group data together, the system first groups by the root of the hierarchy (the key assigned by the user). After the data have been grouped by key, the system can then proceed with grouping by window within that key (using the child components of the hierarchical composite keys). This act of grouping by window is where window merging happens.
What’s interesting from a streams/tables perspective is how this window merging changes the mutations which are ultimately applied to a table, i.e., how it modifies the changelog that dictates the contents of the table over time. With non-merging windows, each new element being grouped results in a single mutation to the table (to add that element to the group for the element’s key+window). With merging windows, the act of grouping a new element may result in one or more existing windows being merged with the new window. So the merging operation has to inspect all of the existing windows for the current key, figure out which windows can merge with this new window, and then atomically commit deletes for the old, pre-merge windows in conjunction with an insert for the new merged window into the table[10]. When you start to look at it at this level of detail, you can see why it’s so nice to have the system taking care of the nasty business of dealing with window merges. For an even closer view of window merging semantics, I refer you to section 2.2.2 of the Dataflow Model paper.
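To make the merge logic concrete, here’s a simplified sketch of session-style merging for a single user key (the root of the hierarchical key). Each new element arrives as a proto-window [t, t + gap); any existing windows it overlaps are removed and replaced by one merged window carrying their combined sum. A real implementation must also commit that delete-plus-insert atomically; this sketch only models the logic, and all names are hypothetical:

import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {
  static class Window {
    long start, end; // half-open [start, end), in minutes for illustration
    int sum;
    Window(long start, long end, int sum) {
      this.start = start; this.end = end; this.sum = sum;
    }
    @Override public String toString() {
      return "[" + start + "," + end + ")=" + sum;
    }
  }

  static void add(List<Window> windowsForKey, long eventTime, int value, long gap) {
    Window merged = new Window(eventTime, eventTime + gap, value);
    // Inspect all existing windows for this key, folding any that overlap
    // the new window into it (the "delete" half of the table mutation).
    windowsForKey.removeIf(w -> {
      if (w.end < merged.start || merged.end < w.start) return false;
      merged.start = Math.min(merged.start, w.start);
      merged.end = Math.max(merged.end, w.end);
      merged.sum += w.sum;
      return true;
    });
    windowsForKey.add(merged); // the "insert" half of the mutation
  }

  public static void main(String[] args) {
    List<Window> windows = new ArrayList<>();
    add(windows, 0, 5, 2); // [[0,2)=5]
    add(windows, 5, 3, 2); // [[0,2)=5, [5,7)=3]  (no overlap, new window)
    add(windows, 1, 4, 2); // [[5,7)=3, [0,3)=9]  (merged with [0,2))
    System.out.println(windows);
  }
}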
At the end of the day, windowing is really just a minor alteration to the semantics of grouping, which means it’s a minor alteration to the semantics of stream to table conversion. For window assignment, it’s as simple as incorporating the window into an implicit composite key used at grouping time. When window merging gets involved, that composite key is treated more like a hierarchical key, allowing the system to handle the nasty business of grouping by key, figuring out window merges within that key, and then atomically applying all the necessary mutations to the corresponding table for us. Hooray for layers of abstraction!
All that said, we still haven’t actually addressed the problem of converting a table to a stream in a more incremental fashion in the case of unbounded data. For that, we need to revisit triggers.
We learned in Streaming 102 that triggers are used to dictate when the contents of a window will be materialized (with watermarks providing a useful signal of input completeness for certain types of triggers). Once data have been grouped together into a window, we use triggers to dictate when those data should be sent downstream. In streams/tables terminology, we understand that grouping means stream → table conversion. From there, it’s a relatively small leap to see that triggers are the complement to grouping, i.e., that “ungrouping” operation we were grasping for above. Triggers are what drive table → stream conversion.
In streams/tables terminology, triggers are special procedures applied to a table which allow for data within that table to be materialized in response to relevant events. Stated that way, they actually sound suspiciously similar to classic database triggers. And indeed, the choice of name here was no coincidence; they are essentially the same thing. When you specify a trigger, you are in effect writing code which then gets evaluated for every row in the state table as time progresses. When that trigger fires, it takes the corresponding data that are currently at rest in the table and puts them into motion, yielding a new stream.
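As a sketch of that table → stream conversion, consider a state table of per-window sums and a watermark-style trigger that, as the watermark advances, pulls every row whose window it has passed out of the table and into a stream. Discarding-mode behavior (removing the row after it fires) is assumed here, and all names and numbers are illustrative:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class WatermarkTriggerSketch {
  public static void main(String[] args) {
    // State table: window end time (minutes) -> current sum for that window.
    Map<Long, Integer> table = new LinkedHashMap<>();
    table.put(2L, 14);
    table.put(4L, 22);
    table.put(6L, 3);

    long watermark = 5; // input is believed complete up through minute 5

    Iterator<Map.Entry<Long, Integer>> rows = table.entrySet().iterator();
    while (rows.hasNext()) {
      Map.Entry<Long, Integer> row = rows.next();
      if (row.getKey() <= watermark) { // the trigger fires for this row
        System.out.println("emit window ending " + row.getKey() + ": " + row.getValue());
        rows.remove(); // the data leave the table and enter the stream
      }
    }
    System.out.println("still at rest: " + table); // {6=3}
  }
}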
Let’s return to our examples. We’ll start off with the basic streaming example from Streaming 102, which simply emitted results when complete (due to the watermark passing the end of the window). The code and event time/processing time visualization for that example were these (note that I’m only showing the heuristic watermark version here, for brevity and ease of comparison):
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input =
  raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark()))
  .apply(Sum.integersPerKey());
Listing 3: Explicit default trigger.
Figure 7: Event time/processing time view of windowed summation on a streaming engine with heuristic watermarks.
Thanks to the trigger specified in Listing 3, which declares windows should be materialized when the watermark passes them, the system is able to emit results in a progressive fashion as the otherwise unbounded input to the pipeline becomes more and more complete. Looking at the streams and tables version, it looks as you might expect:
Figure 8: Streams & tables view of windowed summation on a streaming engine with heuristic watermarks.
In this version, you can see very clearly the ungrouping effect triggers have on the state table. As the watermark passes the end of each window, it pulls the result for that window out of the table and sets it in motion downstream, separate from all the other values in the table. We of course still have the late data issue from before, which we can solve again with the more comprehensive trigger from Listing 4:
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input =
  raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());
Listing 4: Early and late firings via the early/late API.
The event time/processing time diagram looked like this:
Figure 9: Event time/processing time view of windowed summation on a streaming engine with early and late firings.
While the streams/tables version looks like this:
Figure 10: Streams & tables view of windowed summation on a streaming engine with early and late firings.
This version makes even more clear the ungrouping effect triggers have, rendering an evolving view of the various independent pieces of the table into a stream, as dictated by the triggers specified in Listing 4.
The semantics of all the concrete triggers we’ve talked about so far (event-time, processing time, count, composites like conjunctions and disjunctions, etc.) are just as you would expect when viewed from the streams/tables perspective, so aren’t worth further discussion. However, we haven’t yet spent much time talking about what triggers look like in a classic batch processing scenario. Now that we understand what the underlying streams/tables topology of a batch pipeline looks like, this is worth touching upon briefly.
At the end of the day, there’s really only one type of trigger used in classic batch scenarios: one which fires when the input is complete. For the initial MapRead stage of the MapReduce job we looked at above, that trigger would conceptually fire for all the data in the input table as soon as the pipeline launched, since the input for a batch job is assumed to be complete from the get go[11]. That input source table would thus be converted into a stream of individual elements, after which the Map stage could begin processing them.
For table to stream conversions in the middle of the pipeline, such as the ReduceRead stage in our example, the same type of trigger is used. In this case, however, the trigger must actually wait for all of the data in the table to be complete (i.e., what is more commonly referred to as all of the data being written to the shuffle), much as our example batch pipelines in Figures 4 and 6 waited for the end of the input before emitting their final results.
Given that classic batch processing effectively always makes use of the input-data-complete trigger, one might ask what any custom triggers specified by the author of the pipeline might mean in a batch scenario. The answer here really is: it depends. There are two aspects worth discussing:

Trigger guarantees (or lack thereof): most classic batch systems are architected around the lock-step read-process-group-write cadence we saw above, which leaves little room for firing triggers at any finer granularity; in such systems, the only firing an author can count on is the final one upon input completion, and any other triggers specified are, at best, honored as lower bounds on when results may be materialized.
The blending of batch and streaming: as batch and streaming systems continue to converge, with batch effectively becoming a streaming engine optimized for the bounded-data case, custom triggers begin to make just as much sense in a batch scenario as they do in streaming[12].
At this point, we can stick a fork in the trigger section. It’s done. We have only one more brief stop on our way to having a holistic view of the relationship between the Beam Model and streams & tables theory: accumulation.
In Streaming 102, we learned that the three output modes (discarding, accumulating, accumulating & retracting[13]) tell us how refinements of results relate when a window is triggered multiple times over the course of its life. Fortunately, the relationship to streams/tables here is pretty straightforward:

Discarding mode requires the system either to discard the value in the table when the trigger fires, or to keep the previously emitted value around and compute the delta on the next firing; either way, each firing sets in motion only what has changed since the last one.
Accumulating mode requires no additional work: the current value for the group in the table at triggering time is simply what gets emitted, and the table is left untouched.
Accumulating & retracting mode requires keeping around copies of all previously triggered (but not yet retracted) values for the group, so that when the trigger next fires, a retraction of the previous value can be emitted along with the new value[14].
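A small worked example may help, using illustrative numbers: a single window triggers twice, first when its sum is 3 and again after more data raise the sum to 12. The three modes would emit the following panes:

import java.util.List;

public class AccumulationModesSketch {
  public static void main(String[] args) {
    int firstFiring = 3;   // table value at the first trigger firing
    int secondFiring = 12; // table value at the second trigger firing

    // Discarding: the table is cleared after each firing, so the second
    // pane carries only the delta since the previous firing.
    System.out.println("discarding:              " + List.of(firstFiring, secondFiring - firstFiring)); // [3, 9]

    // Accumulating: the table value is left intact, so each pane carries
    // the full value to date.
    System.out.println("accumulating:            " + List.of(firstFiring, secondFiring)); // [3, 12]

    // Accumulating & retracting: like accumulating, but a retraction of the
    // previously emitted value accompanies the new one.
    System.out.println("accumulating&retracting: " + List.of(firstFiring, -firstFiring, secondFiring)); // [3, -3, 12]
  }
}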
The streams & tables visualizations of accumulation modes add little additional insight into their semantics, so we won’t investigate them here.
A holistic view of streams & tables in the Beam model

Having addressed the four questions, we can now take a holistic view of streams and tables in a Beam Model pipeline. Let’s take our running example (the team scores calculation pipeline) and see what its structure looks like at the streams & tables level. The full code for the pipeline might look something like this (expanding upon Listing 4):
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input =
  raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());
scores.apply(Write.to(...));
Listing 5: The complete summation pipeline from Listing 4, with read and write transforms included.
Breaking that apart into stages separated by the intermediate PCollection types (where I’ve used more semantic “type” names like Team and User Score rather than real types, for clarity about what is happening at each stage), you would arrive at something like Figure 11 below:
Figure 11: Logical phases of a team score summation pipeline, with intermediate PCollection types.
When you actually run this pipeline, it first goes through an optimizer, whose job is to convert this logical execution plan into an optimized, physical execution plan. Each execution engine is different, so actual physical execution plans will vary between runners. But a believable strawperson plan might look something like Figure 12:
Figure 12: Theoretical physical phases of a team score summation pipeline, with intermediate PCollection types.
There’s a lot going on here, so let’s walk through all of it. There are three main differences between Figure 11 and Figure 12 that we’ll be discussing:

Logical vs. physical operations: as part of building a physical execution plan, the engine must translate the author’s logical operations into the sequence of primitive operations it actually supports.
Physical stages and fusion: it’s often inefficient to execute each logical operation as an independent physical stage (with attendant serialization, network, and deserialization overhead), so the optimizer will typically fuse as many physical operations as possible into a single physical stage.
Keys, values, windows, and partitioning: the physical operations are annotated to show the keys, values, windows, and data partitioning in effect at each point.
Let’s now walk through each logical operation in detail and see what it translated to in the physical plan, and also see how they all relate to streams & tables:
The big takeaway here is not so much the precise details of everything that’s going on in the physical plan, but more the overall relationship of the Beam model to the world of streams and tables. We saw three types of operations: non-grouping (e.g., Parse), grouping (e.g., GroupMergeAndCombine), and ungrouping (e.g., Trigger). The non-grouping operations always consumed streams and produced streams on the other side. The grouping operations always consumed streams and yielded tables. And the ungrouping operations consumed tables and yielded streams. These insights, along with everything else we’ve learned along the way, are enough for us to formulate a more general theory about the relationship of the Beam Model to streams & tables.
Having surveyed how stream processing, batch processing, the four what/where/when/how questions, and the Beam Model as a whole relate to stream and table theory, let’s now attempt to articulate a more general definition of stream & table relativity.
A general theory of stream & table relativity
Data processing pipelines (both batch and streaming) consist of tables, streams, and operations upon those tables and streams.
Tables are data at rest, and act as a container for data to accumulate and be observed over time.
Streams are data in motion, and encode a discretized view of the evolution of a table over time.
Operations act on a stream or table and yield a new stream or table. They are categorized as follows:
  stream → stream: non-grouping operations, which alter the data in a stream while leaving them in motion, yielding a new stream with possibly different cardinality.
  stream → table: grouping operations, which bring the data in a stream to rest, yielding a table that evolves over time as new data arrive.
  table → stream: ungrouping (triggering) operations, which put the data in a table into motion, yielding a stream that captures a view of the table’s evolution over time.
  table → table: (none); there is no way for data to go from rest back to rest without first being put into motion.

With this set of rules, we not only better understand the underlying nature of data processing systems, both batch and streaming, we’ve also learned some handy things along the way.
But I think the most important point of all is this: when you look at things from the streams and tables point of view, it becomes much clearer how batch and streaming really are the same thing conceptually. Bounded or unbounded, it doesn’t matter. It’s streams and tables from top to bottom.
[1] And note that in some cases, the tables themselves may accept time as a query parameter, allowing you to peer backwards in time to snapshots of the table as it existed in the past.
[2] Note that no guarantees are made about the keys of two successive records observed by a single mapper, since no key-grouping has occurred yet. The existence of the key here is really just to allow keyed datasets to be consumed in a natural way, and if there are no obvious keys for the input data, they’ll all just share what is effectively a global null key.
[3] Calling the inputs to a batch job “static” may be a bit strong. In reality, the dataset being consumed may be constantly changing as it’s processed, e.g., if you’re reading directly from a Bigtable/HBase table within a timestamp range where the data aren’t guaranteed to be immutable. But in most cases, the recommended approach is to ensure that you’re somehow processing a static snapshot of the input data, and any deviation from that assumption is at your own peril.
[4] Note that grouping a stream by key is importantly distinct from simply partitioning that stream by key, which ensures all records with the same key end up being processed by the same machine, but doesn’t do anything to put the records to rest. They instead remain in motion, and thus continue on as a stream. A grouping operation is more like a partition-by-key followed by a write to the appropriate group for that partition, which is what puts them to rest and turns the stream into a table.
[5] One giant difference, from an implementation perspective at least, being that ReduceWrite, knowing that keys have already been grouped together by MapWrite, and further knowing that Reduce is unable to alter keys in the case where its outputs remain keyed, can simply accumulate the outputs generated by reducing the values for a single key in order to group them together, which is much simpler than the full-blown shuffle implementation required for a MapWrite phase.
[6] Another way of looking at it is that there are two types of tables: updateable and appendable; this is the way the Flink folks have framed it for their Table API. But while that’s a great intuitive way of capturing the observed semantics of the two situations, I think it obscures the underlying nature of what’s actually happening that causes a stream to come to rest as a table, i.e., grouping.
[7] Though as we can clearly see from this example, it’s not just a streaming thing; you can get the same effect with a batch system if its state tables are world readable.
[8] This is particularly painful if a sink for your storage system of choice doesn’t exist yet; building proper sinks that can uphold consistency guarantees is a surprisingly subtle and difficult task.
[9] This also means that if you place a value into multiple windows, e.g., sliding windows, the value must conceptually be duplicated into multiple, independent records, one per window. Even so, it’s possible in some cases for the underlying system to be smart about how it treats certain types of overlapping windows, thus optimizing away the need for actually duplicating the value. Spark, for example, does this for sliding windows.
[10] This is why systems which support merging windows typically define the unit of atomicity/parallelization as key, rather than key+window. Otherwise, it would be impossible (or at least much more expensive) to provide the strong consistency needed for correctness guarantees.
[11] Note that this high-level conceptual view of how things work in batch pipelines belies the complexity of efficiently triggering an entire table of data at once, particularly when that table is sizeable enough to require a plurality of machines to process. The SplittableDoFn API recently added to Beam provides some insight into the mechanics involved.
[12] And yes, if you blend batch and streaming together you get Beam, which is where that name came from originally. For reals.
[13] This is why you should always use an Oxford comma.
[14] Note that in the case of merging windows, in addition to merging the current values for the two windows to yield a merged current value, the previous values for those two window would need to be merged as well to allow for the later calculation of a merged delta come triggering time.