Apache Arrow DataFusion
Overview + Architecture
March 2023
Today: IOx Team at InfluxData
Past life 1: Query Optimizer @ Vertica, also on Oracle DB server
Past life 2: Chief Architect + VP Engineering roles at some ML startups
Talk Outline
Part 1:
What is a Query Engine
Why do you need one
What is DataFusion
Part 2:
Query Planning + Expression Evaluation
Part 3:
Query Execution
Part 1
🥁
What is a Query Engine?
Motivation
Users who want to access data without writing a program
UIs (visual and textual)
Data is stored somewhere
Query Engine
SQL is a (very) common interface
DBMS vs Query Engine
Database Management Systems (DBMS) are full featured systems
DataFusion
Query Engine Use Cases
Why do you need DataFusion?
Implementation timeline for a new Database system
[Timeline figure: the features a new database system accumulates over time, from "Let's Build a Database" 🤔 to "Look mom! I have a database!" 😃 to "Ok now this is pretty good" 😐]
Features along the way: Client API, In-memory storage, In-memory filter + aggregation, Durability / persistence, Metadata Catalog + Management, Query Language Parser, Optimized / Compressed storage, Execution on Compressed Data, Joins!, Additional Client Languages, Outer Joins, Subquery support, More advanced analytics, Cost based optimizer, Out of core algorithms, Storage Rearrangement, Heuristic Query Planner, Arithmetic expressions, Date / time Expressions, Concurrency Control, Data Model / Type System, Distributed query execution, Resource Management, Online recovery, Window functions
Most systems have custom query engines
Almost all well-known database systems have their own custom query engines
Trending …
Future of Databases / Analytic systems
New systems will be built on high quality open source query engines
Shared high quality, open source vectorized OLAP engine.
Databases focused on differentiating features
What is DataFusion?
“DataFusion is an in-memory query engine that uses Apache Arrow as the memory model” - crates.io
DataFusion Features / Design Goals
High Performance: memory efficiency (no garbage collector) and speed, leveraging Rust and Arrow
Easy to Connect: Interoperability with other tools via Arrow, Parquet and Flight
Easy to Embed + Customize: can extend data sources, functions, operators (see the sketch after this list)
First Class Rust: High quality Query / SQL Engine entirely in Rust
High Quality: Extensive tests and integration tests with Arrow ecosystems
My goal: DataFusion to be *the* choice for a Query Engine in Rust
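To make "Easy to Embed" concrete, here is a minimal sketch (not the talk's demo code) of using DataFusion as a library: create a SessionContext, register a Parquet file as a table, and run SQL against it. It assumes a local cpu.parquet file like the one used in the demo and the tokio runtime.

use datafusion::prelude::*;

// Minimal embedding sketch; file and column names are illustrative.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Create an execution context
    let ctx = SessionContext::new();

    // Register a Parquet file as a table named "cpu"
    ctx.register_parquet("cpu", "cpu.parquet", ParquetReadOptions::default())
        .await?;

    // Run SQL and print the results as an Arrow-formatted table
    ctx.sql("SELECT cpu, avg(usage_idle) FROM cpu GROUP BY cpu")
        .await?
        .show()
        .await?;
    Ok(())
}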
DataFusion: System Architecture
[Architecture diagram: the DataFusion stack]
Query Frontends: SQL, DataFrame
Plan Representations (dataflow graphs): LogicalPlans and ExecutionPlans, with Expression Eval and Optimizations / Transformations applied to each
Optimized Execution Operators (Arrow based): HashAggregate, Sort, Join, …
Data Sources: Parquet, CSV, …
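A sketch of how the two frontends in the diagram converge: whether a query arrives as SQL text or is built with the DataFrame API, it becomes a LogicalPlan, is optimized, and is compiled into an ExecutionPlan of Arrow-based operators. This is illustrative, not the demo code; it assumes a registered "cpu" table and a local cpu.parquet file.

use datafusion::prelude::*;

// Sketch: both frontends end up in the same plan representations.
async fn two_frontends(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // SQL frontend (assumes a table named "cpu" is already registered)
    let from_sql = ctx
        .sql("SELECT cpu, avg(usage_idle) FROM cpu GROUP BY cpu")
        .await?;

    // DataFrame frontend: the same query built programmatically
    let from_df = ctx
        .read_parquet("cpu.parquet", ParquetReadOptions::default())
        .await?
        .aggregate(vec![col("cpu")], vec![avg(col("usage_idle"))])?;

    // Both run through the same optimizer and Arrow-based operators
    from_sql.show().await?;
    from_df.show().await?;
    Ok(())
}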
DataFusion: Totally Customizable
[Same architecture diagram, with every layer marked "Extend ✅"]
Query frontends (SQL, DataFrame), plan representations (LogicalPlans, ExecutionPlans), expression evaluation, optimizations / transformations, execution operators (HashAggregate, Sort, Join, …), and data sources (Parquet, CSV, …) are all extension points.
DataFusion Extensibility 🧰
* Built-in default implementations ⇒ fast to get new systems up and running
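As one example of the extension points, a data source is anything that implements the TableProvider trait. The sketch below uses the built-in MemTable provider (a custom provider registers the same way); the table name and data are made up for illustration.

use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Build one in-memory Arrow RecordBatch with a single Int32 column "a"
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // MemTable implements TableProvider; custom data sources implement the same trait
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(provider))?;

    // The registered source is now queryable through any frontend
    ctx.sql("SELECT sum(a) FROM t").await?.show().await?;
    Ok(())
}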
DataFusion in Action
datafusion-cli
DEMO
Demo data: 200+ files in the 'cpu' directory, plus the 14 GB hits.parquet file (ClickBench)
❯ select * from 'cpu' limit 1;
❯ select avg(usage_idle), cpu from 'cpu' group by cpu;
❯ select count(*), "EventDate" from 'hits.parquet' group by "EventDate";
❯ explain select count(*), "EventDate" from 'hits.parquet' group by "EventDate";
DataFusion CLI
> SELECT
    date_bin('1 minute', time) as minute,
    avg(free)
  FROM 'cpu.parquet'
  GROUP BY minute
  ORDER BY minute;

+---------------------+-----------------------+
| minute              | AVG(cpu.parquet.free) |
+---------------------+-----------------------+
| 2023-02-21T19:59:00 | 3.3361268736e10       |
| 2023-02-21T20:00:00 | 3.3346749098666668e10 |
| 2023-02-21T20:01:00 | 3.3304853845333332e10 |
| 2023-02-21T20:02:00 | 3.3370011648e10       |
| 2023-02-21T20:03:00 | 3.3481375744e10       |
| 2023-02-21T20:04:00 | 3.3496006656e10       |
| 2023-02-21T20:05:00 | 3.3529480533333332e10 |
| 2023-02-21T22:05:00 | 3.3313550336e10       |
| 2023-02-21T22:06:00 | 3.3314910208e10       |
+---------------------+-----------------------+
EXPLAIN Plan - What did it do?
> EXPLAIN
  SELECT date_bin('1 minute', time) as minute, avg(free)
  FROM 'cpu.parquet' GROUP BY minute ORDER BY minute;

+---------------+------------------------------------------------------------------------------------------------------+
| plan_type     | plan
+---------------+------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: minute ASC NULLS LAST
|               |   Projection: datebin(...) AS minute, AVG(cpu.parquet.free)
|               |     Aggregate: groupBy=[[datebin(...)]], aggr=[[AVG(cpu.parquet.free)]]
|               |       TableScan: cpu.parquet projection=[free, time]
| physical_plan | SortPreservingMergeExec: [minute@0 ASC NULLS LAST]
|               |   SortExec: expr=[minute@0 ASC NULLS LAST]
|               |     ProjectionExec: expr=[datebin(...)@0 as minute, AVG(cpu.parquet.free)@1 as AVG(cpu.parquet.free)]
|               |       AggregateExec: mode=FinalPartitioned, gby=[datebin(...)], aggr=[AVG(cpu.parquet.free)]
|               |         CoalesceBatchesExec: target_batch_size=8192
|               |           RepartitionExec: partitioning=Hash([Column { name: "datebin..." }], 16), input_partitions=16
|               |             RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
|               |               AggregateExec: mode=Partial, gby=[datebin(...)], aggr=[AVG(cpu.parquet.free)]
|               |                 ParquetExec: limit=None, partitions={1 group: [[cpu.parquet]]}, projection=[free, time]
+---------------+------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.003 seconds.
EXPLAIN ANALYZE – includes runtime metrics
> EXPLAIN ANALYZE
SELECT date_bin('1 minute', time, '1970-01-01') as minute, avg(free)
FROM 'cpu.parquet' GROUP BY minute ORDER BY minute;
+-------------------+--------------------------------------------------------------------------------------------------+
| plan_type         | plan
+-------------------+--------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=9, elapsed_compute=15.947µs, ...]
|                   |   ProjectionExec: expr=[datebin(...), AVG(...)], metrics=[output_rows=9, elapsed_compute=9.055µs]
|                   |     AggregateExec: mode=FinalPartitioned, ..., metrics=[output_rows=9, elapsed_compute=304.792µs]
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=9, elapsed_compute=32.21µs]
|                   |         RepartitionExec: ...
|                   |           RepartitionExec: ...
|                   |             AggregateExec: mode=Partial, gby=..., aggr=..., metrics=[output_rows=9, elapsed_compute=149.461µs, ...]
|                   |               ParquetExec: limit=None, partitions={1 group: [[cpu.parquet]]}, projection=[free, time],
|                   |                 metrics=[output_rows=40, elapsed_compute=1ns, pushdown_rows_filtered=0,
|                   |                   page_index_rows_filtered=0, bytes_scanned=698, predicate_evaluation_errors=0,
|                   |                   time_elapsed_scanning_total=341.925µs, time_elapsed_processing=288.69µs]
+-------------------+--------------------------------------------------------------------------------------------------+
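The same information is available when embedding DataFusion: DataFrame::explain(verbose, analyze) returns the plan (and, with analyze = true, runtime metrics) as a result set. A sketch, assuming a table named "hits" has already been registered from hits.parquet:

use datafusion::prelude::*;

// Sketch: programmatic equivalent of EXPLAIN ANALYZE, assuming "hits" is
// already registered (e.g. via SessionContext::register_parquet).
async fn explain_analyze(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let df = ctx
        .sql("SELECT count(*), \"EventDate\" FROM hits GROUP BY \"EventDate\"")
        .await?;

    // verbose = false, analyze = true: run the query and attach runtime metrics
    df.explain(false, true)?.show().await?;
    Ok(())
}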