Open Problems in Stream Processing
A Call to Action
Tyler Akidau
Software Engineer, Google (then) → Snowflake (now)
@takidau
DEBS 2019 Keynote
http://s.apache.org/open-problems-stream-processing
Who am I? Why do I care about streaming?
Ten years of industry experience with massive-scale stream processing:
Various streaming publications:
What is the purpose of this talk?
Share my opinions on where we are and where we need to go
Encourage future investment in streaming research
Learn some new things from all of you
1. Graceful evolution
2. Operational ease of use
3. SQL
4. Formal semantics
5. Latency ↔ Cost ↔ Correctness
6. Batch + Streaming Interoperability
7. Database-style optimizations
Agenda
Per topic:
Why does it matter?
Where are we?
Things to explore
1. Graceful evolution
Because change is the only constant
Graceful evolution: why does it matter?
Graceful evolution: where are we?
Graceful evolution: things to explore
But graceful evolution is just one piece of the larger story of...
2. Operational ease of use
Because building pipelines is the easy part
Operational ease of use: why does it matter?
Operational ease of use: where are we?
Operational ease of use: things to explore
Ops are super important, but ease of use goes well beyond operations...
3. SQL
Because SQL makes data processing accessible
SQL: why does it matter?
SQL: where are we?
Deep dive on streaming SQL
Streams & tables (KSQL)
https://dl.acm.org/ft_gateway.cfm?id=3242155
One SQL to Rule Them All
https://arxiv.org/pdf/1905.12133.pdf
Time-Varying Relations
Regular and Streaming SQL
Time-Varying Relations (TVRs)
Key Insight
Streams and Tables are different representations of the same semantic object: a TVR.
“Streams are Tables” instead of “Streams and Tables”
TVR Representations
TVR of auction bids
--------------------------
| bidtime | price | item |
--------------------------
Stream representation of TVR
8:08 INSERT (8:07, $2, A)
8:12 INSERT (8:11, $3, B)
8:13 INSERT (8:05, $4, C)
8:15 INSERT (8:09, $5, D)
8:17 INSERT (8:13, $1, E)
8:18 INSERT (8:17, $6, F)
(Columns: time of change, type of change, changed data)
Table representation of TVR
8:14> SELECT * FROM bids;
--------------------------
| bidtime | price | item |
--------------------------
| 8:07 | $2 | A |
| 8:11 | $3 | B |
| 8:05 | $4 | C |
--------------------------
(The prompt prefix, 8:14, is the time of query.)
8:20> SELECT * FROM bids;
--------------------------
| bidtime | price | item |
--------------------------
| 8:07 | $2 | A |
| 8:11 | $3 | B |
| 8:05 | $4 | C |
| 8:09 | $5 | D |
| 8:13 | $1 | E |
| 8:17 | $6 | F |
--------------------------
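To make the stream/table duality concrete, here is a minimal Python sketch that is not taken from any real system. The TVR is stored as its stream representation, a changelog of (time of change, type of change, changed data). The table representation at a given query time comes from replaying every change made at or before that time. Times are "H:MM" strings and prices are whole dollars. The `snapshot` helper and its DELETE branch are hypothetical illustrations.

```python
# Minimal sketch of a time-varying relation (TVR) kept as a changelog.
# Times are "H:MM" strings; prices are whole dollars.

def minutes(t: str) -> int:
    """Convert "H:MM" to minutes since midnight for comparison."""
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


# Stream representation: (time of change, type of change, changed data).
BIDS_STREAM = [
    ("8:08", "INSERT", ("8:07", 2, "A")),
    ("8:12", "INSERT", ("8:11", 3, "B")),
    ("8:13", "INSERT", ("8:05", 4, "C")),
    ("8:15", "INSERT", ("8:09", 5, "D")),
    ("8:17", "INSERT", ("8:13", 1, "E")),
    ("8:18", "INSERT", ("8:17", 6, "F")),
]


def snapshot(stream, query_time: str):
    """Table representation: replay all changes made at or before query_time."""
    table = []
    for change_time, op, row in stream:
        if minutes(change_time) > minutes(query_time):
            break  # the changelog is ordered by time of change
        if op == "INSERT":
            table.append(row)
        elif op == "DELETE":  # hypothetical; this example only has INSERTs
            table.remove(row)
    return table


print(snapshot(BIDS_STREAM, "8:14"))       # [('8:07', 2, 'A'), ('8:11', 3, 'B'), ('8:05', 4, 'C')]
print(len(snapshot(BIDS_STREAM, "8:20")))  # 6
```

Querying at 8:14 yields rows A, B and C. Querying at 8:20 yields all six rows. This matches the two table representations above, and the same changelog backs both views.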
No SQL extension required
Event Time Semantics
Event Time vs. Processing Time
Implementing Event Time Semantics
*MillWheel (VLDB ’13) proposed watermarks, which were later adopted by Cloud Dataflow, Beam, and Flink.
SQL Extension 1: Event Time Attributes
8:07 WM → 8:05
8:08 INSERT (8:07, $2, A)
8:12 INSERT (8:11, $3, B)
8:13 INSERT (8:05, $4, C)
8:14 WM → 8:08
8:15 INSERT (8:09, $5, D)
8:16 WM → 8:12
8:17 INSERT (8:13, $1, E)
8:18 INSERT (8:17, $6, F)
8:21 WM → 8:20
(Legend: "WM →" lines are watermark updates; the first field of each INSERT is the event timestamp.)
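Here is a rough Python sketch of how an engine might consume these watermarks. It assumes the common definition that a watermark of T asserts no further data with an event timestamp earlier than T. The hypothetical helpers below report the current watermark at a given processing time and flag rows that arrive behind it. Real systems also handle monotonicity and per-source watermarks, which this ignores.

```python
# Sketch: tracking a watermark over an interleaved stream of watermark
# updates ("WM") and inserts. Times are "H:MM" strings.

def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


# (processing time, kind, payload); INSERT payload = (bidtime, price, item)
EVENTS = [
    ("8:07", "WM", "8:05"),
    ("8:08", "INSERT", ("8:07", 2, "A")),
    ("8:12", "INSERT", ("8:11", 3, "B")),
    ("8:13", "INSERT", ("8:05", 4, "C")),
    ("8:14", "WM", "8:08"),
    ("8:15", "INSERT", ("8:09", 5, "D")),
    ("8:16", "WM", "8:12"),
    ("8:17", "INSERT", ("8:13", 1, "E")),
    ("8:18", "INSERT", ("8:17", 6, "F")),
    ("8:21", "WM", "8:20"),
]


def watermark_at(events, ptime: str):
    """Most recent watermark observed at or before processing time ptime."""
    wm = None
    for t, kind, payload in events:
        if minutes(t) > minutes(ptime):
            break
        if kind == "WM":
            wm = payload
    return wm


def late_rows(events):
    """Rows whose event timestamp is earlier than the watermark at arrival."""
    wm = None
    late = []
    for _t, kind, payload in events:
        if kind == "WM":
            wm = payload
        elif wm is not None and minutes(payload[0]) < minutes(wm):
            late.append(payload)
    return late


print(watermark_at(EVENTS, "8:15"))  # 8:08
print(late_rows(EVENTS))             # []
```

Nothing in this example is late. Row C (event time 8:05) arrives while the watermark is exactly 8:05, which is not earlier than the watermark.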
SQL Extension 2: Event Time Windowing Functions
------------------------------------------
| wstart | wend | bidtime | price | item |
------------------------------------------
| 8:00 | 8:10 | 8:07 | $2 | A |
| 8:10 | 8:20 | 8:11 | $3 | B |
| 8:00 | 8:10 | 8:05 | $4 | C |
| 8:00 | 8:10 | 8:09 | $5 | D |
| 8:10 | 8:20 | 8:13 | $1 | E |
| 8:10 | 8:20 | 8:17 | $6 | F |
------------------------------------------
8:21> SELECT *
FROM
Tumble (
data => TABLE(Bid),
timecol => DESCRIPTOR(bidtime),
dur => INTERVAL '10' MINUTES);
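The windowing table-valued function can be approximated in a few lines of Python. The hypothetical `tumble` helper below assumes 10-minute windows aligned to midnight (no offset). It prepends wstart/wend columns to each bid, reproducing the table above.

```python
# Sketch of a TUMBLE table-valued function over the bids TVR.
# Assumes windows aligned to midnight (no offset); times are "H:MM".

def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


def fmt(m: int) -> str:
    """Format minutes since midnight back to "H:MM"."""
    return f"{m // 60}:{m % 60:02d}"


def tumble(rows, dur: int = 10):
    """Prepend [wstart, wend) of the fixed window containing each bidtime."""
    out = []
    for bidtime, price, item in rows:
        wstart = minutes(bidtime) // dur * dur
        out.append((fmt(wstart), fmt(wstart + dur), bidtime, price, item))
    return out


BIDS = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
        ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

for row in tumble(BIDS):
    print(row)
```

Because the window boundaries are a pure function of `bidtime`, every row lands in exactly one window, independent of arrival order.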
SQL Extension 2: Event Time Windowing Functions
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $11 |
| 8:10 | 8:20 | $10 |
-------------------------
8:21> SELECT MAX(wstart) AS wstart, wend, SUM(price) AS price
FROM
Tumble (
data => TABLE(Bid),
timecol => DESCRIPTOR(bidtime),
dur => INTERVAL '10' MINUTES)
GROUP BY wend;
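Here is the same aggregation as a Python sketch. Like the SQL, it groups by window end. The helper name `window_sums` is illustrative, not part of any real API, and the sketch again assumes 10-minute windows aligned to midnight.

```python
# Sketch: SUM(price) per tumbling window, grouped by window end.
from collections import defaultdict


def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


def fmt(m: int) -> str:
    return f"{m // 60}:{m % 60:02d}"


def window_sums(rows, dur: int = 10):
    """Return (wstart, wend, total) per window, ordered by window end."""
    sums = defaultdict(int)
    for bidtime, price, _item in rows:
        wend = minutes(bidtime) // dur * dur + dur
        sums[wend] += price
    # For tumbling windows, MAX(wstart) within a group is simply wend - dur.
    return [(fmt(wend - dur), fmt(wend), total) for wend, total in sorted(sums.items())]


BIDS = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
        ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

print(window_sums(BIDS))  # [('8:00', '8:10', 11), ('8:10', '8:20', 10)]
```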
Controlling the Materialization of Time-Varying Results
Control How and When to Materialize a TVR
SQL Extension 3: Stream Materialization
8:08> SELECT ... EMIT STREAM;
----------------------------------------------
| wstart | wend | price | undo | ptime | ver |
----------------------------------------------
| 8:00 | 8:10 | $2 | | 8:08 | 0 |
| 8:10 | 8:20 | $3 | | 8:12 | 0 |
| 8:00 | 8:10 | $2 | undo | 8:13 | 1 |
| 8:00 | 8:10 | $6 | | 8:13 | 2 |
| 8:00 | 8:10 | $6 | undo | 8:15 | 3 |
| 8:00 | 8:10 | $11 | | 8:15 | 4 |
| 8:10 | 8:20 | $3 | undo | 8:17 | 1 |
| 8:10 | 8:20 | $4 | | 8:17 | 2 |
| 8:10 | 8:20 | $4 | undo | 8:18 | 3 |
| 8:10 | 8:20 | $10 | | 8:18 | 4 |
...
8:20> SELECT ...;
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $11 |
| 8:10 | 8:20 | $10 |
-------------------------
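Here is one way to derive the EMIT STREAM changelog above, sketched in Python. The sketch assumes the input contains only INSERTs. It also assumes each update to a window's aggregate is emitted as an undo of the previous value followed by the new value, and each of those rows bumps the window's version counter. Function and variable names are hypothetical.

```python
# Sketch: EMIT STREAM for SUM(price) over 10-minute tumbling windows.
# Rows are (wstart, wend, price, undo, ptime, ver), as in the table above.

def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


def fmt(m: int) -> str:
    return f"{m // 60}:{m % 60:02d}"


BIDS_STREAM = [
    ("8:08", "INSERT", ("8:07", 2, "A")),
    ("8:12", "INSERT", ("8:11", 3, "B")),
    ("8:13", "INSERT", ("8:05", 4, "C")),
    ("8:15", "INSERT", ("8:09", 5, "D")),
    ("8:17", "INSERT", ("8:13", 1, "E")),
    ("8:18", "INSERT", ("8:17", 6, "F")),
]


def emit_stream(stream, dur: int = 10):
    """Changelog of per-window SUM(price): undo the old value, then emit the new one."""
    totals = {}    # wstart (minutes) -> current SUM(price)
    versions = {}  # wstart (minutes) -> last version number emitted
    out = []
    for ptime, _op, (bidtime, price, _item) in stream:  # INSERTs only
        wstart = minutes(bidtime) // dur * dur
        window = (fmt(wstart), fmt(wstart + dur))
        if wstart in totals:
            # Retract the previously emitted value before emitting the update.
            versions[wstart] += 1
            out.append((*window, totals[wstart], "undo", ptime, versions[wstart]))
            totals[wstart] += price
            versions[wstart] += 1
        else:
            totals[wstart] = price
            versions[wstart] = 0
        out.append((*window, totals[wstart], "", ptime, versions[wstart]))
    return out


for row in emit_stream(BIDS_STREAM):
    print(row)
```

Replaying this changelog (apply each row, cancel it against its undo) leaves $11 and $10. That is exactly the table representation seen by the plain SELECT at 8:20.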
SQL Extension 4: Delay for Completeness
8:15> SELECT ... EMIT AFTER WATERMARK;
-------------------------
| wstart | wend | price |
-------------------------
-------------------------
8:17> SELECT ... EMIT AFTER WATERMARK;
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $11 |
-------------------------
8:08> SELECT ... EMIT STREAM AFTER WATERMARK;
----------------------------------------------
| wstart | wend | price | undo | ptime | ver |
----------------------------------------------
| 8:00 | 8:10 | $11 | | 8:16 | 0 |
| 8:10 | 8:20 | $10 | | 8:21 | 0 |
...
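Here is a Python sketch of EMIT STREAM AFTER WATERMARK semantics. It assumes a window [wstart, wend) is complete once the watermark reaches wend, and it simply ignores late data (real systems offer richer policies). Under those assumptions, each window is emitted exactly once, at the processing time of the watermark that completes it. Names are hypothetical.

```python
# Sketch: emit each window's SUM(price) once, when the watermark passes wend.

def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


def fmt(m: int) -> str:
    return f"{m // 60}:{m % 60:02d}"


# (processing time, kind, payload); INSERT payload = (bidtime, price, item)
EVENTS = [
    ("8:07", "WM", "8:05"),
    ("8:08", "INSERT", ("8:07", 2, "A")),
    ("8:12", "INSERT", ("8:11", 3, "B")),
    ("8:13", "INSERT", ("8:05", 4, "C")),
    ("8:14", "WM", "8:08"),
    ("8:15", "INSERT", ("8:09", 5, "D")),
    ("8:16", "WM", "8:12"),
    ("8:17", "INSERT", ("8:13", 1, "E")),
    ("8:18", "INSERT", ("8:17", 6, "F")),
    ("8:21", "WM", "8:20"),
]


def emit_after_watermark(events, dur: int = 10):
    totals = {}      # wstart (minutes) -> running SUM(price)
    emitted = set()  # windows already emitted as complete
    out = []
    for ptime, kind, payload in events:
        if kind == "INSERT":
            bidtime, price, _item = payload
            wstart = minutes(bidtime) // dur * dur
            totals[wstart] = totals.get(wstart, 0) + price
        else:  # "WM": the watermark advanced; emit newly completed windows
            wm = minutes(payload)
            for wstart in sorted(totals):
                if wstart not in emitted and wstart + dur <= wm:
                    emitted.add(wstart)
                    out.append((fmt(wstart), fmt(wstart + dur), totals[wstart], "", ptime, 0))
    return out


for row in emit_after_watermark(EVENTS):
    print(row)
```

Since nothing is emitted early, no undo rows are ever needed. This is the latency-for-completeness trade that the EMIT AFTER WATERMARK clause expresses.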
SQL Extension 5: Periodic Delays
8:08> SELECT ... EMIT STREAM
AFTER DELAY INTERVAL '6' MINUTES;
----------------------------------------------
| wstart | wend | price | undo | ptime | ver |
----------------------------------------------
| 8:00 | 8:10 | $6 | | 8:14 | 0 |
| 8:10 | 8:20 | $10 | | 8:18 | 0 |
| 8:00 | 8:10 | $6 | undo | 8:21 | 1 |
| 8:00 | 8:10 | $11 | | 8:21 | 2 |
...
8:15> SELECT ...;
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $6 |
-------------------------
8:19> SELECT ...;
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $6 |
| 8:10 | 8:20 | $10 |
-------------------------
8:21> SELECT ...;
-------------------------
| wstart | wend | price |
-------------------------
| 8:00 | 8:10 | $11 |
| 8:10 | 8:20 | $10 |
-------------------------
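EMIT AFTER DELAY can be modelled with per-window processing-time timers. The Python sketch below assumes a timer is armed on the first change to a window after its last emission. The timer fires `delay` minutes later and emits the window's current value, preceded by an undo row if a value was already emitted. A change arriving in the same minute a timer fires is applied before the timer, which is what puts F into the 8:18 result. This timer policy is an assumption chosen to reproduce the slide, not a specification, and the names are hypothetical.

```python
# Sketch: EMIT STREAM AFTER DELAY using per-window processing-time timers.

def minutes(t: str) -> int:
    hours, mins = t.split(":")
    return int(hours) * 60 + int(mins)


def fmt(m: int) -> str:
    return f"{m // 60}:{m % 60:02d}"


BIDS_STREAM = [
    ("8:08", "INSERT", ("8:07", 2, "A")),
    ("8:12", "INSERT", ("8:11", 3, "B")),
    ("8:13", "INSERT", ("8:05", 4, "C")),
    ("8:15", "INSERT", ("8:09", 5, "D")),
    ("8:17", "INSERT", ("8:13", 1, "E")),
    ("8:18", "INSERT", ("8:17", 6, "F")),
]


def emit_after_delay(stream, delay: int = 6, dur: int = 10):
    totals, timers, emitted, versions = {}, {}, {}, {}
    out = []

    def fire(wstart, at):
        # Emit the window's current value; retract the old one first if any.
        window = (fmt(wstart), fmt(wstart + dur))
        if wstart in emitted:
            versions[wstart] += 1
            out.append((*window, emitted[wstart], "undo", fmt(at), versions[wstart]))
            versions[wstart] += 1
        else:
            versions[wstart] = 0
        emitted[wstart] = totals[wstart]
        out.append((*window, totals[wstart], "", fmt(at), versions[wstart]))

    def fire_due(before):
        # Fire pending timers strictly earlier than `before`, oldest first.
        for wstart, at in sorted(timers.items(), key=lambda kv: kv[1]):
            if at < before:
                del timers[wstart]
                fire(wstart, at)

    for ptime, _op, (bidtime, price, _item) in stream:  # INSERTs only
        now = minutes(ptime)
        fire_due(now)
        wstart = minutes(bidtime) // dur * dur
        totals[wstart] = totals.get(wstart, 0) + price
        if wstart not in timers:
            timers[wstart] = now + delay  # arm on first change since last emit
    fire_due(float("inf"))  # let processing time run on until all timers fire
    return out


for row in emit_after_delay(BIDS_STREAM):
    print(row)
```

Compared with emitting on every change, the delay batches C into the 8:14 result and D into the 8:21 result. The delay is a knob that trades latency against the number of emitted updates.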
Streaming SQL Recap
- DDL for watermarks, etc.
- TUMBLE, HOP, etc. via table-valued functions

- EMIT STREAM
- EMIT AFTER WATERMARK
- EMIT AFTER DELAY
SQL: things to explore
But to do all that, we need a clearer understanding of what we’re doing...
4. Formal Semantics
Because guesses and intuition are a perilous foundation
Formal Semantics: why does it matter?
Formal Semantics: where are we now?
(mostly kidding)
Formal Semantics: things to explore
On the subject of tradeoffs, let’s talk about the perpetual tensions between...
5. Latency ↔ Cost ↔ Correctness
Because you can’t have everything all at once (yet ;-)
Latency ↔ Cost ↔ Correctness: why does it matter?
Three competing goals: correctness, low cost, and low latency.
1. Fast & Correct
2. Cheap & Correct
3. Fast & Cheap
Latency ↔ Cost ↔ Correctness: where are we?
Latency ↔ Cost ↔ Correctness: things to explore
And on the topic of batch-like approaches...
6. Batch + Streaming Interoperability
Because we each have a lot to learn from one another
Batch + Streaming Interoperability: why does it matter?
Batch + Streaming Interoperability: where are we?
Batch + Streaming Interoperability: things to explore
Speaking of manual work, why are we largely ignoring decades of research into...
7. Database-style optimizations
Because, let’s face it, so much of what we currently do was invented by the database community decades ago anyway
Database-style optimizations: why does it matter?
Database-style optimizations: where are we?
Database-style optimizations: things to explore
And with that, we’re done.
In summary
1. Graceful evolution
2. Operational ease of use
3. SQL
4. Formal semantics
5. Latency ↔ Cost ↔ Correctness
6. Batch + Streaming Interoperability
7. Database-style optimizations
Edge Computing / IoT
Streaming graph processing
Streaming/online ML
Streaming & Microservices
Streaming & Serverless Functions
Online algorithms
…
And really, there’s so much more...
Thank you!
Questions?
Complaints? ;-)
http://s.apache.org/open-problems-stream-processing