NYC Meetup
September 17, 2024
New York, NY
Andrew Lamb, Staff Engineer, InfluxData
| © Copyright 2024, InfluxData
Andrew Lamb
Staff Engineer InfluxData
> 21 years 😱 in enterprise software development
Oracle: Database (2 years)
DataPower: XSLT compiler (2 years)
Vertica: DB / Query Optimizer (6 years)
Nutonian/DataRobot: ML Startups (7 years)
InfluxData: IOx, Arrow, DataFusion (4 years)
Apache DataFusion PMC (Chair)
Apache Arrow PMC (past Chair)
Agenda
Frame of Reference
DataFusion is not
A product that we* are providing
DataFusion is
A project we are all working on together
* we == Apache / the committers / myself
Plug: come join us! It is great fun and very welcoming!
My Personal / Professional Goal
1,000+ projects!
“Community over Code” - The Apache Way
Non-profit governance of open source communities
Apache: Benefits for DataFusion
DataFusion: Cutting Edge Tech
DataFusion: Cutting Edge Tech
Who’s who of DataFusion users
Agenda
InfluxDB 3.0 Architecture
Arrow DataFusion: Benefits for InfluxDB
⇒ Fast, full-featured, extensible query engine that we didn’t build
All the OLAP buzzwords: vectorized, columnar, multi-core, streaming, out-of-core, …
“Full” SQL: JOINs, date/time/timestamp functions, structured data, …
Customizable: easily add time-series-specific features (e.g. date_bin_gapfill, InfluxQL, insert time resolution, …)
How InfluxDB 3.0 uses DataFusion
Rationale
All queries and the write path (Parquet creation + compaction) use DataFusion + Arrow
Query Processing in InfluxDB 3.0
Query Input, via Client / Language Specific Frontends (each produces a DataFusion LogicalPlan):
  Storage gRPC Frontend: read_group(..)
  SQL Frontend: SELECT … FROM …
  InfluxQL Frontend: SELECT … FROM … GROUP BY *
  Reorg Frontend: compact_plan(..)
Shared Planning, Execution Phases, based on DataFusion:
  DataFusion LogicalPlan → Optimization (storage pruning, pushdown, etc.) → Physical Planning → DataFusion ExecutionPlan → Execution → Arrow Record Batches
Output RecordBatches, via Client Specific Output formats:
  gRPC output: SeriesFrame, ...
  Arrow Flight: FlightData
  ParquetWriter: Write to Parquet files
DataFusion extensions used in InfluxDB 3.0
DataFusion components, with the extension points InfluxDB 3.0 uses (✅):
Front Ends: SQL, DataFrame
  ✅ Extension Frontend
Plan Representations and Rewrites:
  LogicalPlan, Optimizations / Transformations: ✅ Extension LogicalPlan Rewrite, ✅ Extension Node
  ExecutionPlan, Optimizations / Transformations: ✅ Extension ExecutionPlan Rewrite, ✅ Extension Node
Expression Eval
Execution Engine: HashAggregate, Sort, Join, …, Streams
  ✅ Extension Stream
Catalog and Data Sources: Parquet, CSV, …
  ✅ Extension Catalog / Table
What Extension Points does InfluxDB 3.0 use?
Ingester: Persist
DataFusion use: sort, deduplicate, and write Parquet
Ingester Buffer → DataFusion Plan (Scan → Sort → Dedupe) → parquet-rs writer* → Object Store
1. Buffered data fed to plan
2. Plan output fed to parquet writer*
3. Parquet written to object store
* In the process of migrating to the DataFusion Parquet writer (to get parallelized encoding)
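The Sort → Dedupe part of the persist plan can be illustrated with a std-only Rust sketch. It is not the InfluxDB or DataFusion code: the `Row` type, the `seq` field (write order), and the single-field "newest write wins" rule for rows that share a series key and timestamp are simplifying assumptions made for the example.

```rust
/// A buffered row (hypothetical shape): series key, timestamp, write order, one field value.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    series: String, // tag set flattened into one key, e.g. "cpu,host=a"
    time: i64,
    seq: u64, // larger = written later
    value: f64,
}

fn row(series: &str, time: i64, seq: u64, value: f64) -> Row {
    Row { series: series.to_string(), time, seq, value }
}

/// Sort by (series, time, seq), then keep only the newest write for each (series, time).
fn sort_and_dedupe(mut rows: Vec<Row>) -> Vec<Row> {
    // After sorting, duplicates are adjacent and the newest write of each group comes last.
    rows.sort_by(|a, b| {
        (a.series.as_str(), a.time, a.seq).cmp(&(b.series.as_str(), b.time, b.seq))
    });
    let mut out: Vec<Row> = Vec::with_capacity(rows.len());
    for r in rows {
        let is_dup = out
            .last()
            .map_or(false, |prev| prev.series == r.series && prev.time == r.time);
        if is_dup {
            // Same series and timestamp as the previous row: the later write replaces it.
            *out.last_mut().unwrap() = r;
        } else {
            out.push(r);
        }
    }
    out
}

fn main() {
    let rows = vec![
        row("cpu,host=b", 10, 1, 1.0),
        row("cpu,host=a", 20, 2, 2.0),
        row("cpu,host=a", 10, 3, 3.0),
        row("cpu,host=a", 10, 4, 4.0), // same series + time as seq 3, written later
    ];
    for r in sort_and_dedupe(rows) {
        println!("{} t={} value={}", r.series, r.time, r.value);
    }
}
```

Sorting first is what makes deduplication a single linear pass, and the sorted output is also the order the Parquet file is written in.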
Querier
DataFusion used throughout
User SQL query:
SELECT … WHERE
time > now() - '1 minute' AND
region = 'us-east-1'
1. FlightSQL query received (FlightSQL handler, gRPC)
2. Query planned using IOx catalog + ingester (Query Planner: DataFusion Catalog API, Metadata Cache, Remote Catalog Service, Ingester)
3. Execution Plan runs (DataFusion Plan: Object Store, Data Cache)
4. Results returned to user (FlightSQL)
Querier: Partition + File Pruning
DataFusion use: PruningPredicate for partition + file pruning
User SQL query:
SELECT … WHERE
time > now() - '1 minute' AND
region = 'us-east-1'
Object Store:
Partition: 2024-06-01: 002312.parquet, a773ef.parquet, 599e92.parquet
Partition: 2024-06-02: 0356af.parquet, 54229a.parquet, b432f3.parquet
…
Partition: 2024-06-30: 432790.parquet, 4321aa.parquet
1. SQL planned, predicates pushed down through the DataFusion TableProvider API:
time > 123456789 AND region = 'us-east-1'
2. Evaluate PruningPredicate with min/max statistics (Metadata Cache, Remote Catalog Service)
3. Only relevant partitions / files used for plan
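Step 2 can be sketched in std-only Rust. This is not DataFusion's PruningPredicate API: `FileStats`, `may_match`, `prune`, and the statistics in `example_files` are made up for illustration. What it shows is the rule that makes min/max pruning safe: a file is skipped only when its statistics prove that no row can satisfy the predicate, here `time > t AND region = r`.

```rust
/// Hypothetical per-file statistics, as a metadata cache might hold them.
struct FileStats {
    name: &'static str,
    max_time: i64, // only the maximum matters for `time > t`
    min_region: &'static str,
    max_region: &'static str,
}

/// Can any row in this file satisfy `time > t AND region = r`?
/// Returns false only when the statistics prove that no row can match.
fn may_match(f: &FileStats, t: i64, r: &str) -> bool {
    let time_possible = f.max_time > t; // at least one row could be newer than t
    let region_possible = f.min_region <= r && r <= f.max_region; // r inside [min, max]
    time_possible && region_possible
}

/// Keep only the files the plan still has to read.
fn prune(files: &[FileStats], t: i64, r: &str) -> Vec<&'static str> {
    files.iter().filter(|f| may_match(f, t, r)).map(|f| f.name).collect()
}

/// Made-up statistics attached to four of the file names on the slide.
fn example_files() -> Vec<FileStats> {
    vec![
        FileStats { name: "002312.parquet", max_time: 50, min_region: "eu-west-1", max_region: "us-east-1" },
        FileStats { name: "0356af.parquet", max_time: 150, min_region: "us-east-1", max_region: "us-west-2" },
        FileStats { name: "432790.parquet", max_time: 200, min_region: "ap-south-1", max_region: "eu-west-1" },
        FileStats { name: "4321aa.parquet", max_time: 300, min_region: "us-east-1", max_region: "us-east-1" },
    ]
}

fn main() {
    // time > 100 AND region = 'us-east-1'
    println!("{:?}", prune(&example_files(), 100, "us-east-1"));
}
```

The first file is skipped because all of its rows are too old, and the third because `us-east-1` sorts after its largest region; neither file has to be fetched from object storage.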
Querier: Custom SQL Operator
cpu.user | time |
89.3 | 2000-05-05T12:00:00Z |
88.0 | 2000-05-05T12:05:01Z |
85.0 | 2000-05-05T12:15:00Z |
45.3 | 2000-05-05T12:34:00Z |
... | ... |
78.9 | 2000-05-05T12:58:00Z |
78.7 | 2000-05-05T12:59:00Z |
Note: No data points for 12:20:00-12:29:00
cpu.user | time |
88.6 | 2000-05-05T12:00:00Z |
85.0 | 2000-05-05T12:10:00Z |
??? | 2000-05-05T12:20:00Z |
45.3 | 2000-05-05T12:30:00Z |
55.0 | 2000-05-05T12:40:00Z |
78.8 | 2000-05-05T12:50:00Z |
SQL?
A plain GROUP BY date_bin produces no row for this timestamp
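The hole can be reproduced with a std-only Rust sketch of GROUP BY date_bin(10 minutes, time) with avg. Timestamps are seconds since 2000-05-05T12:00:00Z, and only the rows actually shown in the input table are used (the "..." rows are left out, so the 12:40 bucket is empty here too). `date_bin` and `bin_avg` are hypothetical helpers, not DataFusion functions.

```rust
use std::collections::BTreeMap;

/// Round `t` down to the start of its `stride`-wide bin (bins aligned to 12:00:00Z).
fn date_bin(stride: i64, t: i64) -> i64 {
    t.div_euclid(stride) * stride
}

/// GROUP BY date_bin(stride, time) with avg(value): only bins that contain data appear.
fn bin_avg(stride: i64, points: &[(i64, f64)]) -> BTreeMap<i64, f64> {
    let mut sums: BTreeMap<i64, (f64, u32)> = BTreeMap::new();
    for &(t, v) in points {
        let e = sums.entry(date_bin(stride, t)).or_insert((0.0, 0));
        e.0 += v;
        e.1 += 1;
    }
    sums.into_iter().map(|(bin, (sum, n))| (bin, sum / n as f64)).collect()
}

fn main() {
    // (seconds since 12:00:00Z, cpu.user) for the rows shown in the input table
    let points: [(i64, f64); 6] =
        [(0, 89.3), (301, 88.0), (900, 85.0), (2040, 45.3), (3480, 78.9), (3540, 78.7)];
    for (bin, avg) in bin_avg(600, &points) {
        println!("12:{:02}:00Z  {:.2}", bin / 60, avg);
    }
}
```

No input row falls between 12:20:00 and 12:29:59, so the map never gets a key for that bin; nothing in an ordinary aggregation can invent it.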
Querier: Custom SQL Operator
SELECT
date_bin_gapfill(
interval '10 minute', time
) as minute,
avg(cpu.user)
from cpu
where
time between
timestamp '2000-05-05T12:00:00Z' and
timestamp '2000-05-05T12:59:00Z'
group by minute;
Querier: Custom SQL Operator
GroupBy
  group: date_bin_gapfill(...)
  agg: avg(cpu.user)
  Filter
    (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
    Scan
      (projection + filter not shown)
A standard GroupBy can’t do gap filling
Querier: Custom SQL Operator
Before:
GroupBy
  group: date_bin_gapfill(...) as minute
  agg: avg(cpu.user)
  Filter
    (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
    Scan
      (projection + filter not shown)
After:
GapFill
  group: minute
  agg: locf(avg(..))
  GroupBy
    group: date_bin as minute
    agg: avg(...)
    Filter
      (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
      Scan
        (projection + filter not shown)
1. OptimizerRule rewrites to use GapFill operator (extension)
Finalged expressions
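The shape of the rewrite in step 1 can be sketched with a toy plan type. This is not DataFusion's `LogicalPlan` or its `OptimizerRule` trait; the `Plan` enum, `rewrite_gapfill`, and `example_plan` are hypothetical and use strings for expressions. The sketch only shows the transformation itself: an aggregate grouped by `date_bin_gapfill(args)` becomes a `GapFill` node over an ordinary `date_bin(args)` aggregate.

```rust
/// A toy logical plan (hypothetical; DataFusion's LogicalPlan is much richer).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Aggregate { group: String, agg: String, input: Box<Plan> },
    GapFill { group: String, agg: String, input: Box<Plan> },
}

/// Bottom-up rewrite: Aggregate(group = date_bin_gapfill(args)) becomes
/// GapFill(Aggregate(group = date_bin(args))); other nodes are rebuilt unchanged.
fn rewrite_gapfill(plan: Plan) -> Plan {
    match plan {
        Plan::Aggregate { group, agg, input } => {
            let input = Box::new(rewrite_gapfill(*input));
            let args = group.strip_prefix("date_bin_gapfill").map(|a| a.to_string());
            match args {
                Some(args) => {
                    // The ordinary aggregate only produces bins that have data...
                    let inner = Plan::Aggregate { group: format!("date_bin{args}"), agg: agg.clone(), input };
                    // ...and the GapFill node above it inserts the missing bins.
                    Plan::GapFill { group, agg, input: Box::new(inner) }
                }
                None => Plan::Aggregate { group, agg, input },
            }
        }
        Plan::Filter { predicate, input } => {
            Plan::Filter { predicate, input: Box::new(rewrite_gapfill(*input)) }
        }
        Plan::GapFill { group, agg, input } => {
            Plan::GapFill { group, agg, input: Box::new(rewrite_gapfill(*input)) }
        }
        scan @ Plan::Scan { .. } => scan,
    }
}

fn example_plan() -> Plan {
    Plan::Aggregate {
        group: "date_bin_gapfill(interval '10 minute', time)".to_string(),
        agg: "avg(cpu.user)".to_string(),
        input: Box::new(Plan::Filter {
            predicate: "time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z'".to_string(),
            input: Box::new(Plan::Scan { table: "cpu".to_string() }),
        }),
    }
}

fn main() {
    println!("{:#?}", rewrite_gapfill(example_plan()));
}
```

Because the rewrite leaves a standard aggregate underneath, everything below the GapFill node (including the parallel, repartitioned aggregation) stays stock DataFusion.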
Querier: Custom SQL Operator
ProjectionExec: expr=[minute, COUNT(cpu.user)]
  GapFillExec: group_expr=[date_bin_gapfill(..)], aggr_expr=[COUNT(cpu.user)], stride=10 minutes, time_range=[957528000000000000..957531540000000000]
    AggregateExec: mode=FinalPartitioned, gby=[date_bin_gapfill(..., cpu.time)], aggr=[COUNT(cpu.user)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([date_bin_gapfill(...)], 4)
          AggregateExec: mode=Partial, gby=[date_bin_gapfill(..., cpu.time)], aggr=[COUNT(cpu.user)]
            ParquetExec: file_groups=000000000000.parquet, predicates...
Custom Stream (“operator”)
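In this plan GapFillExec sits above the final AggregateExec, so its input already has one row per non-empty bin. Its job can be sketched as a std-only Rust iterator adapter: a synchronous stand-in for DataFusion's async RecordBatch streams. `GapFill` and its parameters are hypothetical, and the sketch assumes the input bins are sorted, aligned to the stride, and inside the time range. The `locf` flag mirrors the locf(...) fill from the logical plan. The example values are the 10 minute averages from the earlier output table, with 12:20 missing.

```rust
use std::iter::Peekable;

/// Wraps a time-sorted iterator of (bin_start, value) and yields one item per bin
/// in [start, end], inserting the bins the aggregate did not produce.
struct GapFill<I: Iterator<Item = (i64, f64)>> {
    input: Peekable<I>,
    next_bin: i64,
    end: i64,
    stride: i64,
    locf: bool,        // fill gaps with the last observed value instead of None
    last: Option<f64>,
}

impl<I: Iterator<Item = (i64, f64)>> GapFill<I> {
    fn new(input: I, start: i64, end: i64, stride: i64, locf: bool) -> Self {
        GapFill { input: input.peekable(), next_bin: start, end, stride, locf, last: None }
    }
}

impl<I: Iterator<Item = (i64, f64)>> Iterator for GapFill<I> {
    type Item = (i64, Option<f64>);

    fn next(&mut self) -> Option<Self::Item> {
        if self.next_bin > self.end {
            return None;
        }
        let bin = self.next_bin;
        self.next_bin += self.stride;
        // Use the next input row if it belongs to this bin; otherwise this bin is a gap.
        let has_row = matches!(self.input.peek(), Some(&(t, _)) if t == bin);
        let value = if has_row {
            let (_, v) = self.input.next().unwrap();
            self.last = Some(v);
            Some(v)
        } else if self.locf {
            self.last
        } else {
            None
        };
        Some((bin, value))
    }
}

fn main() {
    // Aggregate output in seconds since 12:00:00Z: no row for the 12:20 bin.
    let aggregated: Vec<(i64, f64)> = vec![(0, 88.6), (600, 85.0), (1800, 45.3), (2400, 55.0), (3000, 78.8)];
    // time_range 12:00:00..12:59:00, stride 10 minutes
    for (bin, value) in GapFill::new(aggregated.into_iter(), 0, 3540, 600, false) {
        println!("12:{:02}:00Z  {:?}", bin / 60, value);
    }
}
```

Since it only looks one row ahead, an operator like this can stream its output without buffering the whole aggregate result.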
Querier: Custom InfluxQL frontend
SELECT
MEAN(usage_idle),
MEAN(bytes_free)
FROM cpu, disk
GROUP BY TIME(10s)
FILL(linear)
Not a cross join!
Gap filling + interpolation
Querier: Custom InfluxQL frontend
1. InfluxQL Text (the query above)
2. Parsed into a custom AST by the InfluxQL Parser (InfluxDB):
SELECT
  AGG: Column usage_idle
  AGG: Column bytes_free
FROM
  Table: cpu
  Table: disk
…
3. Planned into a DataFusion LogicalPlan by the InfluxQL Planner (InfluxDB):
GapFill
  group: time
  agg: avg(..), avg(..)
  GroupBy
    group: date_bin as time
    agg: avg(..), avg(..)
    Filter
      (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
      Scan
        (projection + filter not shown)
4. Optimized, Executed and Run like all other plans
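Planning is where InfluxQL semantics differ from SQL: `FROM cpu, disk` does not join the two measurements; each one yields its own results. A toy sketch of that planning step, assuming the planner builds one aggregate + gap-fill branch per measurement and combines the branches with a union. `Select`, `Plan`, and `plan_influxql` are hypothetical and are not the InfluxDB 3.0 planner's actual types, and the `date_bin(10s, time)` string is only a label.

```rust
/// Hypothetical, heavily simplified AST for
/// SELECT <func>(<field>), ... FROM <measurement>, ... GROUP BY TIME(<interval>)
struct Select {
    aggregates: Vec<(String, String)>, // (function, field), e.g. ("MEAN", "usage_idle")
    from: Vec<String>,                 // measurements
    group_by_time: String,             // e.g. "10s"
}

/// Toy logical plan nodes (not DataFusion's LogicalPlan).
#[derive(Debug, PartialEq)]
enum Plan {
    Scan { measurement: String },
    Aggregate { group: String, aggs: Vec<String>, input: Box<Plan> },
    GapFill { input: Box<Plan> },
    Union(Vec<Plan>),
}

/// Plan each measurement separately and union the branches: no cross join.
fn plan_influxql(stmt: &Select) -> Plan {
    let branches = stmt
        .from
        .iter()
        .map(|measurement| {
            let aggs = stmt
                .aggregates
                .iter()
                .map(|(func, field)| {
                    // InfluxQL MEAN corresponds to SQL avg; other names are just lowercased here.
                    let func = if func == "MEAN" { "avg".to_string() } else { func.to_lowercase() };
                    format!("{func}({field})")
                })
                .collect();
            let aggregate = Plan::Aggregate {
                group: format!("date_bin({}, time)", stmt.group_by_time),
                aggs,
                input: Box::new(Plan::Scan { measurement: measurement.clone() }),
            };
            Plan::GapFill { input: Box::new(aggregate) }
        })
        .collect();
    Plan::Union(branches)
}

fn example() -> Select {
    Select {
        aggregates: vec![
            ("MEAN".to_string(), "usage_idle".to_string()),
            ("MEAN".to_string(), "bytes_free".to_string()),
        ],
        from: vec!["cpu".to_string(), "disk".to_string()],
        group_by_time: "10s".to_string(),
    }
}

fn main() {
    println!("{:#?}", plan_influxql(&example()));
}
```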
Querier: time series specific optimization
InfluxDB uses sortedness heavily, so often ends up with multi-column merges
Very wasteful for queries like
SELECT … FROM …
ORDER BY time DESC
LIMIT 10
SortPreservingMerge
  expr: [tag1, tag2, time]
  Input Stream, Input Stream, Input Stream, …
Each input stream ~ a partition, parquet files + data from ingester
More details in Blog: Making Most Recent Value Queries Hundreds of Times Faster
Has to read the first batch from every input stream before producing any output, even with a LIMIT
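Why the merge is wasteful under a LIMIT shows up in a std-only Rust sketch of a k-way sort-preserving merge built on `BinaryHeap`. Like any such merge, it must see the head of every input before it can emit the first row. Inputs here are plain ascending vectors of integer keys rather than sorted record-batch streams, and `merge_with_limit` with its `pulled` counter is a hypothetical helper added only to make the cost visible.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge sorted inputs into one sorted output, stopping after `limit` rows.
/// Returns the rows and how many rows were pulled from the inputs in total.
fn merge_with_limit(inputs: Vec<Vec<i64>>, limit: usize) -> (Vec<i64>, usize) {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    let mut pulled = 0;
    // Prime the heap: the first output row is unknown until every input's head is seen.
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(k) = it.next() {
            pulled += 1;
            heap.push(Reverse((k, i))); // Reverse turns the max-heap into a min-heap
        }
    }
    let mut out = Vec::new();
    while out.len() < limit {
        let Some(Reverse((k, i))) = heap.pop() else { break };
        out.push(k);
        // Refill from the input that produced the row just emitted.
        if let Some(next) = iters[i].next() {
            pulled += 1;
            heap.push(Reverse((next, i)));
        }
    }
    (out, pulled)
}

fn main() {
    let inputs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let (rows, pulled) = merge_with_limit(inputs, 2);
    println!("rows={rows:?} pulled={pulled}");
}
```

With three inputs and LIMIT 2, all three inputs are opened before the first row is emitted; with hundreds of partitions, each backed by Parquet files, that first read is the expensive part.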
Querier: time series specific optimization
SortPreservingMerge (expr: [tag1, tag2, time]) is replaced by:
ProgressiveEval
  expr: [tag1, tag2, time]
  Input Stream, Input Stream, Input Stream, …
1. PhysicalOptimizerRule replaces SortPreservingMerge with ProgressiveEval if the inputs have non-overlapping key ranges
2. Reads one stream at a time*
* Really starts two in parallel to minimize IO stalls
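ProgressiveEval can be sketched in the same std-only style. Assuming each input is sorted ascending and non-empty, and the inputs' key ranges do not overlap, reading the inputs one after another in order of their minimum key gives the same output as a merge, and with a limit it stops before opening later inputs. The sketch reads strictly one input at a time (the real operator starts two in parallel), and `progressive_order` and `progressive_eval` are hypothetical names.

```rust
/// (min, max) key of a sorted, non-empty input.
fn key_range(input: &[i64]) -> (i64, i64) {
    (input[0], input[input.len() - 1])
}

/// The order in which to read the inputs, or None if their key ranges overlap
/// (in which case a plan would have to keep the sort-preserving merge).
fn progressive_order(inputs: &[Vec<i64>]) -> Option<Vec<usize>> {
    let mut order: Vec<usize> = (0..inputs.len()).collect();
    order.sort_by_key(|&i| key_range(&inputs[i]).0);
    for pair in order.windows(2) {
        let (_, prev_max) = key_range(&inputs[pair[0]]);
        let (next_min, _) = key_range(&inputs[pair[1]]);
        if prev_max > next_min {
            return None; // ranges overlap: simple concatenation would not be sorted
        }
    }
    Some(order)
}

/// Read inputs one at a time in key order, stopping after `limit` rows.
/// Returns the rows and how many inputs were opened.
fn progressive_eval(inputs: Vec<Vec<i64>>, limit: usize) -> Option<(Vec<i64>, usize)> {
    let order = progressive_order(&inputs)?;
    let mut out = Vec::new();
    let mut opened = 0;
    'outer: for i in order {
        if out.len() == limit {
            break;
        }
        opened += 1; // only start the next input when more rows are needed
        for &k in &inputs[i] {
            if out.len() == limit {
                break 'outer;
            }
            out.push(k);
        }
    }
    Some((out, opened))
}

fn main() {
    let inputs = vec![vec![7, 8, 9], vec![1, 2, 3], vec![4, 5, 6]];
    println!("{:?}", progressive_eval(inputs, 2));
}
```

For a most-recent-value query, a LIMIT is often satisfied by the first input in key order, so the remaining inputs are never read at all.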
Conclusion