DataFusion and Apache Arrow
Supercharge Your Data Analytical Tool with a Rusty Query Engine
Andrew Lamb
Staff Engineer, InfluxData
Apache Arrow PMC
Daniël Heres
Data Engineer, GoDataDriven
Apache Arrow PMC
Introduction
Your Speakers
Andrew: Staff Engineer @ InfluxData (previously: [company logos])
Daniël: Data/ML Engineer @ GoDataDriven (previously: [company logos])
Why should you care?
Andrew
Recent Proliferation of Big Data systems
…
Recent Proliferation of Databases
DB
What is going on?
COTS → Totally Custom
A spectrum: “Buy and Operate” (traditional IT) → “Build and Operate” (FANG) → “Assemble and Operate” ✓ (the current trend)
Apache Arrow
Multi-language toolkit for Processing and Interchange
Founded in 2016 under the Apache Software Foundation
Low level / foundational technology to build fast and interoperable analytic systems
Open standard, implementations in 12+ languages
Adopted widely in industry products and open source projects
DataFusion: A Query Engine
“DataFusion is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.”
DataFusion: A Query Engine
SQL Query
SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;
Data Batches
DataFrame
ctx.read_table("http")?
    .filter(...)?
    .aggregate(..)?;
Data Batches
Catalog information:
tables, schemas, etc
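In Rust, both front ends look roughly like this (a minimal sketch; the CSV path is made up and API details vary across DataFusion versions):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register a table in the catalog (hypothetical file path)
    ctx.register_csv("http_api_requests_total", "requests.csv", CsvReadOptions::new())
        .await?;
    // SQL front end: parse, plan, and run the query above
    let df = ctx
        .sql("SELECT status, COUNT(1) FROM http_api_requests_total \
              WHERE path = '/api/v2/write' GROUP BY status")
        .await?;
    // Both SQL and DataFrame queries produce Arrow record batches
    let batches = df.collect().await?;
    println!("got {} record batches", batches.len());
    Ok(())
}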
Implementation timeline for a new Database system
[Timeline diagram: “Lets Build a Database” 🤔 → “Look mom! I have a database!” 😃 → “Ok now this is pretty good” 😐, with features accumulating over years: client API; in-memory storage; in-memory filter + aggregation; durability / persistence; metadata catalog + management; query language parser; data model / type system; arithmetic expressions; date / time expressions; optimized / compressed storage; execution on compressed data; joins (including outer joins); additional client languages; subquery support; more advanced analytics; cost based optimizer; heuristic query planner; out of core algorithms; storage rearrangement; concurrency control; distributed query execution; resource management; online recovery; window functions]
But for Databases
🤔
LLVM-like Infrastructure for Databases
[Diagram: DataFusion takes inputs (Query: SQL, code, DataFrame, …; Data: Parquet, CSV, statistics, …; Code: UDF, UDA, etc; Resources: cores, memory, etc), runs them through plan representations (dataflow graphs): a Logical Plan and an Execution Plan, each with optimizations / transformations, plus expression evaluation, lowering to optimized, Arrow-based execution operators (HashAggregate, Sort, Join, …)]
DataFusion: Totally Customizable
[Same diagram, with “Extend ✅” marked on every layer: inputs, plan representations, optimizations / transformations, and execution operators are all user-extensible]
DataFusion Project Growth
[Charts: number of unique contributors over time, annotated with releases 5.0.0, 6.0.0, 7.0.0, and 8.0.0]
DataFusion Milestones: Time to Mature
5+ year labor of love
Dec 2016: Initial DataFusion commit by Andy Grove
Mar 2018: Arrow in Rust started; DataFusion switches to Arrow
Feb 2019: Donation to Apache Arrow
2020-2021: (hash) joins, window functions, performance & parallelization, etc.
Apr 2021: Ballista is donated to Apache Arrow
Nov 2021: DataFusion Contrib
Apr/May 2022: Subqueries
Overview of Apache Arrow DataFusion
Daniël
From Query to Results
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
From Query to Results
an example
select
  count(*) num_visitors,
  job_title
from
  visitors
where
  city = 'San Francisco'
group by
  job_title
From Query to Results
The datafusion package is available via PyPI

# assumes recent datafusion-python with a registered "visitors" table
from datafusion import SessionContext, col, literal
from datafusion import functions as f

ctx = SessionContext()
visitors = ctx.table("visitors")
df = (
    visitors.filter(col("city") == literal("San Francisco"))
    .aggregate([col("job_title")], [f.count(literal(1))])
)
batches = df.collect()  # collect results into memory (Arrow batches)
From Query to Results
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
Logical Plan represents the what
Initial Logical Plan
SQL is parsed, then translated into an initial Logical Plan.
(Read plan from bottom to top)
Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=None
select count(*) num_visitors, job_title
from visitors
where city = 'San Francisco'
group by job_title
visitors = ctx.table("visitors")
df = (
    visitors.filter(col("city") == literal("San Francisco"))
    .aggregate([col("job_title")], [f.count(literal(1))])
)
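You can inspect these plans yourself via SQL EXPLAIN; in Rust, a sketch (assuming a SessionContext ctx with the visitors table registered):

let explain = ctx
    .sql("EXPLAIN SELECT count(*) num_visitors, job_title \
          FROM visitors WHERE city = 'San Francisco' GROUP BY job_title")
    .await?;
explain.show().await?;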
Let's Optimize!
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
Let's Optimize!
Projection Pushdown
Minimizes I/O (especially useful for formats like Parquet) and processing
Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=None
Let's Optimize!
Projection Pushdown
Minimizes I/O (especially useful for formats like Parquet) and processing
Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=None

↓ projection_push_down

Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=Some([0, 1])
Let's Optimize!
Filter Pushdown
Minimizes I/O (especially useful for formats like Parquet) and processing
Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=Some([0, 1])
Let's Optimize!
Filter Pushdown
Minimizes I/O (especially useful for formats like Parquet) and processing
Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=Some([0, 1])

↓ filter_push_down

Projection: #COUNT(UInt8(1)) AS num_visitors, #visitors.job_title
  Aggregate: groupBy=[[#visitors.job_title]], aggr=[[COUNT(UInt8(1))]]
    Filter: #visitors.city = Utf8("San Francisco")
      TableScan: visitors projection=Some([0, 1]), partial_filters=[#visitors.city = Utf8("San Francisco")]
Let's Create...
The ExecutionPlan
The Execution Plan represents the where and how
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
The Initial Execution Plan
ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
      HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
        FilterExec: city@1 = San Francisco
          CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]
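Programmatically, such a physical plan can be produced from a DataFrame along these lines (a sketch; exact method names vary across DataFusion versions):

use datafusion::physical_plan::displayable;

// Build and pretty-print the physical (execution) plan for a DataFrame `df`
let physical_plan = df.create_physical_plan().await?;
println!("{}", displayable(physical_plan.as_ref()).indent());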
And... Optimize!
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
Optimize
CoalesceBatches: Avoiding small batch sizes
ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
      HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
        FilterExec: city@1 = San Francisco
          CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]
Optimize
CoalesceBatches: Avoiding small batch sizes
ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
      HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
        FilterExec: city@1 = San Francisco
          CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]

↓ coalesce_batches

ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
        HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: city@1 = San Francisco
              CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]
Optimize
Repartition: Introducing parallelism
ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
        HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: city@1 = San Francisco
              CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]
Optimize
Repartition: Introducing parallelism
ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
        HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: city@1 = San Francisco
              CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]

↓ repartition

ProjectionExec: expr=[COUNT(UInt8(1))@1 as num_visitors, job_title@0 as job_title]
  HashAggregateExec: mode=FinalPartitioned, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "job_title", index: 0 }], 16)
        HashAggregateExec: mode=Partial, gby=[job_title@0 as job_title], aggr=[COUNT(UInt8(1))]
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: city@1 = San Francisco
              RepartitionExec: partitioning=RoundRobinBatch(16)
                CsvExec: files=[./data/visitors.csv], has_header=true, limit=None, projection=[job_title, city]
Getting results
Return record batches (or write results)
Query / DataFrame → LogicalPlan → Optimize → ExecutionPlan → Optimize → Execute!
Arrow Batches
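In Rust, collecting and printing the results might look like this (a sketch, assuming df is the DataFrame built earlier):

use datafusion::arrow::util::pretty::pretty_format_batches;

// Run the plan to completion and buffer all result batches in memory
let batches = df.collect().await?;
println!("{}", pretty_format_batches(&batches)?);

For large results, recent DataFusion versions also expose a streaming API (execute_stream) that yields batches incrementally instead of buffering them all.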
DataFusion Features
SQL Support
Projection (SELECT), Filtering (WHERE), Ordering (ORDER BY), Aggregation (GROUP BY)
Aggregation functions (COUNT, SUM, MIN, MAX, AVG, APPROX_PERCENTILE, etc)
Window functions (OVER ([PARTITION BY ...] [ORDER BY ...]))
Set functions: UNION (ALL), INTERSECT (ALL), EXCEPT
Scalar functions: string, date/time, ... (basic)
Joins (INNER, LEFT, RIGHT, FULL OUTER, SEMI, ANTI)
Subqueries, Grouping Sets
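A couple of these features exercised through the SQL interface in Rust (a sketch; the name column is hypothetical):

// Aggregation with GROUP BY
let agg = ctx
    .sql("SELECT job_title, COUNT(1) FROM visitors GROUP BY job_title")
    .await?;

// Window function partitioned by city
let win = ctx
    .sql("SELECT name, city, \
          ROW_NUMBER() OVER (PARTITION BY city ORDER BY name) AS rn \
          FROM visitors")
    .await?;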
Extensibility
Customize DataFusion to your needs
User Defined Functions
User Defined Aggregates
User Defined Optimizer passes
User Defined LogicalPlan nodes
User Defined ExecutionPlan nodes
User Defined TableProvider
User Defined FileFormat
User Defined ObjectStore
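For example, registering a scalar UDF might look like this (a sketch; module paths and helper names vary between DataFusion versions, and my_upper is made up for illustration):

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

// Kernel: upper-case every value of a string array
let to_upper = make_scalar_function(|args: &[ArrayRef]| {
    let input = args[0]
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("my_upper expects a string argument");
    let result: StringArray = input
        .iter()
        .map(|v| v.map(|s| s.to_uppercase()))
        .collect();
    Ok(Arc::new(result) as ArrayRef)
});

// Wrap the kernel with a name, signature, and return type, then register it
let udf = create_udf(
    "my_upper",
    vec![DataType::Utf8],
    Arc::new(DataType::Utf8),
    Volatility::Immutable,
    to_upper,
);
ctx.register_udf(udf);
// Now usable in SQL: SELECT my_upper(city) FROM visitors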
Systems Powered by DataFusion
Andrew
FLOCK
ROAPI
VegaFusion
Cube.js / Cube Store
InfluxDB IOx
Coralogix
blaze-rs
Ballista Distributed Compute
What’s Next?
Daniël
Future Directions
Come Join Us
We ❤️ Our Contributors
More details:
https://arrow.apache.org/datafusion/community/communication.html
Andrew Lamb
Staff Engineer, InfluxData
Apache Arrow PMC
Daniël Heres
Data Engineer, GoDataDriven
Apache Arrow PMC
Backup Slides
Thank You!
Andrew Lamb
Staff Engineer, InfluxData
Apache Arrow PMC
Daniël Heres
Data Engineer, GoDataDriven
Apache Arrow PMC
DataFusion / Arrow / Parquet
[Diagram: DataFusion builds on the Arrow, Parquet, and sqlparser-rs crates]
A Virtuous Cycle
Increased Use Drives Increased Contribution
Increased use of open source systems
Increased capacity for maintenance and contribution
DataFusion and Apache Arrow are key open source technologies for building interoperable open source systems
delta-rs
Cloudfuse Buzz
dask-sql
Apache Arrow Analytics Toolkit
Where does DataFusion fit?
[Diagram of the Arrow analytics stack:
Data Formats: Parquet (“Disk”), Arrow (“Memory”)
Low Level Calculations + Interchange: Compute Kernels, IPC, C ABI
Runtime Subsystems: Arrow Flight, Arrow FlightSQL, DataFusion, C++ Query Engine
Analytics / Database Systems: analytic systems built using some of this stack, via native implementations and language bindings]
Query Engines
What is it and why do you need one?
Maps desired computations (SQL and DataFrame, à la Pandas) to efficient calculation: projection pushdown, filter pushdown, joins, expression simplification, parallelization, etc.
datafusion-python
Common Themes
Come for the performance, stay for the features (?)
Native execution
Native (non-JVM) implementations of Spark / Spark-like behavior
SQL interface
Projects are leveraging properties of the Rust language
SQL / DataFrame API
Better, Faster, Cheaper
The DataFusion query engine is part of the commoditization of advanced analytic database technologies, which will transform analytic systems over the next decade