Apache Arrow DataFusion
Overview + Architecture
March / April 2023
Today: IOx Team at InfluxData
Past life 1: Query Optimizer @ Vertica, also on Oracle DB server
Past life 2: Chief Architect + VP Engineering roles at some ML startups
Part 2: Logical Planning
🛠️
How does DataFusion work?
What makes up a Query Engine?
🤔
DataFusion is a Query Engine!
1. Frontend: SQLStatement
2. LogicalPlan: LogicalPlan, Expr
3. Physical Plan: ExecutionPlan, PhysicalExpr
4. Operators: e.g. GroupedHashAggregateStream
DataFusion Planning Flow
SQL Query
SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;
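SQL Query → Parsing + SQL Planning → LogicalPlan → Physical Planning → ExecutionPlan → Execution → RecordBatches
LogicalPlan: also known as the “Query Plan” (PostgreSQL: “Query Tree”)
ExecutionPlan: also known as the “Access Plan” or “Operator Tree” (PostgreSQL: “Plan Tree”)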
Array + Record Batches + Schema
Schema:
fields[0]: “status”, Utf8
fields[1]: “COUNT()”, UInt64
RecordBatch 1 (schema: the Schema above)
cols: StringArray [400, 200, 500], UInt64Array [73621, 338304, 42]
RecordBatch 2 (schema: the Schema above)
cols: StringArray [100], UInt64Array [3]
* The StringArray picture is simplified: a StringArray actually stores a fixed-length portion (offsets) and the character data in separate buffers.
Together, the two RecordBatches hold this table:
status | COUNT() |
400 | 73621 |
200 | 338304 |
500 | 42 |
100 | 3 |
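To make the columnar layout concrete, here is a std-only Rust sketch of a string column stored the way the footnote describes (an offsets buffer plus one contiguous character buffer), and of a batch that, like a RecordBatch, requires every column to have the same number of rows. `ToyStringArray`, `ToyColumn`, and `ToyRecordBatch` are hypothetical stand-ins, not the arrow-rs types, which also track validity (null) bitmaps and support many more data types.

```rust
/// Toy string column: Arrow-style offsets buffer plus one contiguous byte buffer.
/// Hypothetical type, not arrow-rs's StringArray (no validity bitmap here).
#[derive(Debug)]
pub struct ToyStringArray {
    pub offsets: Vec<i32>, // value i spans data[offsets[i]..offsets[i + 1]]
    pub data: Vec<u8>,     // all characters, stored back to back
}

impl ToyStringArray {
    pub fn from_strs(values: &[&str]) -> Self {
        let mut offsets = vec![0i32];
        let mut data = Vec::new();
        for v in values {
            data.extend_from_slice(v.as_bytes());
            offsets.push(data.len() as i32);
        }
        ToyStringArray { offsets, data }
    }

    pub fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    /// O(1) access: two offset lookups, then a slice of the shared buffer.
    pub fn value(&self, i: usize) -> &str {
        let (start, end) = (self.offsets[i] as usize, self.offsets[i + 1] as usize);
        std::str::from_utf8(&self.data[start..end]).expect("built from &str, so valid UTF-8")
    }
}

/// A column of either type used in the example schema.
pub enum ToyColumn {
    Utf8(ToyStringArray),
    UInt64(Vec<u64>),
}

impl ToyColumn {
    pub fn len(&self) -> usize {
        match self {
            ToyColumn::Utf8(a) => a.len(),
            ToyColumn::UInt64(v) => v.len(),
        }
    }
}

/// Toy record batch: field names (the schema) plus one column per field.
pub struct ToyRecordBatch {
    pub field_names: Vec<String>,
    pub columns: Vec<ToyColumn>,
}

impl ToyRecordBatch {
    /// Rejects batches whose columns do not match the schema or differ in length.
    pub fn try_new(field_names: Vec<String>, columns: Vec<ToyColumn>) -> Result<Self, String> {
        if field_names.len() != columns.len() {
            return Err("schema and column count differ".to_string());
        }
        if let Some(first) = columns.first() {
            if columns.iter().any(|c| c.len() != first.len()) {
                return Err("columns have different lengths".to_string());
            }
        }
        Ok(ToyRecordBatch { field_names, columns })
    }

    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}
```

Keeping all characters in one buffer keeps variable-length strings contiguous in memory, while the offsets still give constant-time access to any value.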
Example Query
SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;
Table: http_api_requests_total
status | path | request_size |
200 | /api/v2/write | 3324 |
200 | /api/v2/write | 43423 |
400 | /api/v2/read | 45 |
200 | /api/v2/write | 6433 |
400 | /api/v2/read | 45 |
400 | /api/v2/write | 6433 |
Result:
status | COUNT(1) |
200 | 3 |
400 | 1 |
LogicalPlans are DataFlow graphs
Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]  (Step 3: Data is aggregated)
  Filter: #path Eq Utf8("/api/v2/write")  (Step 2: Predicate is applied)
    TableScan: http_api_requests_total projection=None  (Step 1: Parquet file is read)
Data flows up from the leaves (bottom) to the root (top) of the tree.
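The bottom-up data flow can be sketched as a tiny recursive interpreter: each node first asks its child for rows, then transforms them. `Value`, `Row`, `Plan`, and `execute` are hypothetical names for this illustration; DataFusion does not run a `LogicalPlan` directly but first converts it to an `ExecutionPlan` that streams RecordBatches.

```rust
use std::collections::BTreeMap;

/// Hypothetical cell value; the sample table only needs integers and strings.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Value {
    Int(u64),
    Str(String),
}

pub type Row = Vec<Value>;

/// Toy plan mirroring the tree above: Aggregate over Filter over TableScan.
pub enum Plan {
    TableScan { rows: Vec<Row> },
    Filter { col: usize, equals: Value, input: Box<Plan> },
    CountGroupBy { col: usize, input: Box<Plan> },
}

pub fn execute(plan: &Plan) -> Vec<Row> {
    match plan {
        // Step 1: the leaf produces the raw rows.
        Plan::TableScan { rows } => rows.clone(),
        // Step 2: pull rows from the child, keep those whose column `col` equals the literal.
        Plan::Filter { col, equals, input } => execute(input)
            .into_iter()
            .filter(|row| &row[*col] == equals)
            .collect(),
        // Step 3: pull filtered rows, group by column `col`, emit one (key, count) row per group.
        Plan::CountGroupBy { col, input } => {
            let mut groups: BTreeMap<Value, u64> = BTreeMap::new();
            for row in execute(input) {
                *groups.entry(row[*col].clone()).or_insert(0) += 1;
            }
            groups.into_iter().map(|(k, n)| vec![k, Value::Int(n)]).collect()
        }
    }
}
```

Run against the six sample rows with `CountGroupBy { col: 0, .. }` over `Filter { col: 1, equals: "/api/v2/write", .. }`, this returns the rows `[200, 3]` and `[400, 1]`, matching the query result.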
Query Planning
SQL → LogicalPlan
SQL Query → SQL Parser → Parsed Statement (SQL AST) → SQL Planner → LogicalPlan
SQL Query:
SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;
Parsed Statement (SQL AST):
Query {
    ctes: [],
    body: Select(
        Select {
            distinct: false,
            top: None,
            projection: [
                UnnamedExpr(
                    Identifier(
                        Ident {
                            value: "status",
                            quote_style: None,
                        },
                    ),
                ),
                ...
“DataFrame” → Logical Plan
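Rust Code → DataFrame (Builder) → LogicalPlan
use datafusion::prelude::*;
// Assumes SessionContext::table is async, as in recent DataFusion versions
let df = ctx
    .table("http_api_requests_total").await?
    .filter(col("path").eq(lit("/api/v2/write")))?
    .aggregate(vec![col("status")], vec![count(lit(1))])?;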
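A DataFrame is essentially a builder: each method call wraps the current plan in a new root node, so the chained calls produce the same Aggregate → Filter → TableScan tree as the SQL query. The sketch below models that with a hypothetical `ToyDataFrame` and string-valued predicates (an assumption for brevity; real plans hold `Expr` trees), and prints the tree root-first, the way EXPLAIN does.

```rust
/// Toy logical plan nodes (hypothetical, named after the operators in these slides).
#[derive(Debug, Clone, PartialEq)]
pub enum LogicalPlan {
    TableScan { table: String },
    Filter { predicate: String, input: Box<LogicalPlan> },
    Aggregate { group_by: Vec<String>, aggr: Vec<String>, input: Box<LogicalPlan> },
}

/// Toy "DataFrame": a thin wrapper around a plan; every call adds a new root node.
pub struct ToyDataFrame {
    plan: LogicalPlan,
}

impl ToyDataFrame {
    pub fn table(name: &str) -> Self {
        ToyDataFrame { plan: LogicalPlan::TableScan { table: name.to_string() } }
    }

    pub fn filter(self, predicate: &str) -> Self {
        let input = Box::new(self.plan);
        ToyDataFrame { plan: LogicalPlan::Filter { predicate: predicate.to_string(), input } }
    }

    pub fn aggregate(self, group_by: &[&str], aggr: &[&str]) -> Self {
        ToyDataFrame {
            plan: LogicalPlan::Aggregate {
                group_by: group_by.iter().map(|s| s.to_string()).collect(),
                aggr: aggr.iter().map(|s| s.to_string()).collect(),
                input: Box::new(self.plan),
            },
        }
    }

    pub fn logical_plan(&self) -> &LogicalPlan {
        &self.plan
    }
}

/// Render the plan root-first, indenting two spaces per level (EXPLAIN style).
pub fn display_indent(plan: &LogicalPlan) -> String {
    fn walk(plan: &LogicalPlan, depth: usize, out: &mut String) {
        let pad = "  ".repeat(depth);
        match plan {
            LogicalPlan::TableScan { table } => out.push_str(&format!("{pad}TableScan: {table}\n")),
            LogicalPlan::Filter { predicate, input } => {
                out.push_str(&format!("{pad}Filter: {predicate}\n"));
                walk(input, depth + 1, out);
            }
            LogicalPlan::Aggregate { group_by, aggr, input } => {
                let (g, a) = (group_by.join(", "), aggr.join(", "));
                out.push_str(&format!("{pad}Aggregate: groupBy=[{g}], aggr=[{a}]\n"));
                walk(input, depth + 1, out);
            }
        }
    }
    let mut out = String::new();
    walk(plan, 0, &mut out);
    out
}
```

Because each call consumes `self` and returns a new wrapper, the last call in the chain becomes the root of the tree, which is why the plan prints in the reverse of the call order.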
Sample of LogicalPlan operators (source link)
Projection
Filter
Window
Aggregate
Sort
Join
TableScan
Values
Explain
Analyze
SetVariable
Prepare
Dml(..)
CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView
Repartition
Union
Subquery
Limit
Extension
Distinct
Logical Query Optimization
Compute the same (correct) result, only faster
“Optimizer”:
LogicalPlan (input) → Optimizer Pass 1 → LogicalPlan (intermediate) → Optimizer Pass 2 → … → Other Passes → … → LogicalPlan (output)
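A minimal sketch of this pass-based structure, assuming a toy plan type and two hypothetical rules loosely inspired by the EliminateFilter and limit-related passes listed below: each rule maps a plan to an equivalent plan, the optimizer runs the rules in sequence, and this sketch repeats the whole sequence until the plan stops changing or a pass budget (`max_passes`) runs out. `Plan`, `Pred`, `OptimizerRule`, `RemoveTrueFilter`, `MergeLimits`, and `optimize` are all illustrative names.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Pred {
    True,
    PathEq(String),
}

#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(String),
    Filter { predicate: Pred, input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
}

/// One optimizer pass: returns a plan that computes the same result.
pub trait OptimizerRule {
    fn rewrite(&self, plan: Plan) -> Plan;
}

/// Hypothetical rule: a Filter whose predicate is always true keeps every row, so drop it.
pub struct RemoveTrueFilter;
impl OptimizerRule for RemoveTrueFilter {
    fn rewrite(&self, plan: Plan) -> Plan {
        match plan {
            Plan::Filter { predicate: Pred::True, input } => self.rewrite(*input),
            Plan::Filter { predicate, input } => Plan::Filter { predicate, input: Box::new(self.rewrite(*input)) },
            Plan::Limit { n, input } => Plan::Limit { n, input: Box::new(self.rewrite(*input)) },
            scan => scan,
        }
    }
}

/// Hypothetical rule: Limit(n) directly over Limit(m) is the same as Limit(min(n, m)).
pub struct MergeLimits;
impl OptimizerRule for MergeLimits {
    fn rewrite(&self, plan: Plan) -> Plan {
        match plan {
            Plan::Limit { n, input } => match self.rewrite(*input) {
                Plan::Limit { n: m, input: inner } => Plan::Limit { n: n.min(m), input: inner },
                other => Plan::Limit { n, input: Box::new(other) },
            },
            Plan::Filter { predicate, input } => Plan::Filter { predicate, input: Box::new(self.rewrite(*input)) },
            scan => scan,
        }
    }
}

/// Run every rule in order; repeat until a fixpoint or `max_passes` is reached.
pub fn optimize(mut plan: Plan, rules: &[&dyn OptimizerRule], max_passes: usize) -> Plan {
    for _ in 0..max_passes {
        let before = plan.clone();
        for rule in rules {
            plan = rule.rewrite(plan);
        }
        if plan == before {
            break; // no rule changed anything
        }
    }
    plan
}
```

In a plan like Limit(10) over Filter(true) over Limit(5), the limits only become adjacent after the always-true Filter is removed, which is why the order of passes (and re-running them) matters.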
Built-in DataFusion Optimizer Passes (source link)
Pushdown: Reduce columns/rows passed from node to node:
=> PushDownProjection => PushDownLimit
=> PushDownFilter
Simplify: Minimize expression evaluation during runtime
=> SimplifyExpressions => UnwrapCastInComparison
Simplify: Remove unnecessary nodes
=> MergeProjection => EliminateFilter => EliminateProjection
=> EliminateCrossJoin => EliminateLimit => EliminateDuplicatedExpr
=> CommonSubexprEliminate => PropagateEmptyRelation
Flatten Subqueries: Rewrite subqueries to Joins:
=> DecorrelateWhereExists => DecorrelateWhereIn
=> ScalarSubqueryToJoin
Optimize Joins: Identify join predicates
=> ExtractEquijoinPredicate => RewriteDisjunctivePredicate
=> FilterNullJoinKeys => EliminateOuterJoin
Optimize DISTINCT:
=> SingleDistinctToGroupBy
=> ReplaceDistinctWithAggregate
Optimizer in Action
Use EXPLAIN VERBOSE to see optimizations applied
> EXPLAIN VERBOSE SELECT status, COUNT(1) FROM http_api_requests_total
WHERE path = '/api/v2/write' GROUP BY status;
+----------------------+----------------------------------------------------------------+
| plan_type | plan |
+----------------------+----------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]] |
| | Selection: #path Eq Utf8("/api/v2/write") |
| | TableScan: http_api_requests_total projection=None |
| projection_push_down | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]] |
| | Selection: #path Eq Utf8("/api/v2/write") |
| | TableScan: http_api_requests_total projection=Some([6, 8]) |
| type_coercion | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]] |
| | Selection: #path Eq Utf8("/api/v2/write") |
| | TableScan: http_api_requests_total projection=Some([6, 8]) |
...
+----------------------+----------------------------------------------------------------+
The optimizer “pushed down” the projection (projection=Some([6, 8])), so only the status and path columns are read from the Parquet file
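Projection pushdown can be sketched as a top-down walk that collects the column names each node needs and, at the TableScan, keeps only those column indices. The plan types, `push_down_projection`, and `scan_projection` below are hypothetical; the toy Aggregate models only `COUNT(1)`, which reads no column, and the test uses the three-column sample table from earlier, so its indices differ from the `[6, 8]` in the EXPLAIN output above.

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    TableScan { schema: Vec<String>, projection: Option<Vec<usize>> },
    /// Predicate that references a single column, e.g. path = '/api/v2/write'.
    Filter { column: String, input: Box<Plan> },
    /// GROUP BY columns plus COUNT(1), which references no column.
    Aggregate { group_by: Vec<String>, input: Box<Plan> },
}

/// `required` holds the columns that nodes above this one read from its output.
pub fn push_down_projection(plan: Plan, required: &BTreeSet<String>) -> Plan {
    match plan {
        Plan::Aggregate { group_by, input } => {
            // An Aggregate's input only needs the columns the aggregate itself reads.
            let req: BTreeSet<String> = group_by.iter().cloned().collect();
            Plan::Aggregate { group_by, input: Box::new(push_down_projection(*input, &req)) }
        }
        Plan::Filter { column, input } => {
            // A Filter passes rows through, so it needs its parent's columns plus its own.
            let mut req = required.clone();
            req.insert(column.clone());
            Plan::Filter { column, input: Box::new(push_down_projection(*input, &req)) }
        }
        Plan::TableScan { schema, .. } => {
            // Keep only the indices (in schema order) of columns someone above reads.
            let projection: Vec<usize> = schema
                .iter()
                .enumerate()
                .filter(|(_, name)| required.contains(*name))
                .map(|(i, _)| i)
                .collect();
            Plan::TableScan { schema, projection: Some(projection) }
        }
    }
}

/// Helper to read back the projection chosen for the (single) TableScan.
pub fn scan_projection(plan: &Plan) -> Option<&Vec<usize>> {
    match plan {
        Plan::TableScan { projection, .. } => projection.as_ref(),
        Plan::Filter { input, .. } | Plan::Aggregate { input, .. } => scan_projection(input),
    }
}
```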
Expression Evaluation
Expression Trees
path = '/api/v2/write'
BinaryExpr (op: Eq)
  left: Column (path)
  right: Literal (ScalarValue::Utf8 '/api/v2/write')
Type Coercion
sqrt(col) → sqrt(CAST(col AS Float32))
col is Int8, but sqrt is implemented only for Float32 and Float64
⇒ Type Coercion adds a cast so that an existing implementation can be called
Note: Coercion is lossless; if col were Float64, it would not be coerced to Float32
Source Code: coercion.rs
Before coercion:
FuncExpr (fn: sqrt)
  Column (col)
After coercion:
FuncExpr (fn: sqrt)
  Cast (dt: Float32)
    Column (col)
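The coercion rule can be sketched as: if the argument type is already in the function's signature, leave it alone; otherwise wrap the argument in a Cast to the first signature type it converts to without loss. `DataType`, `Expr`, `SQRT_SIGNATURES`, and `coerce` are hypothetical names, and only the three types from this slide are modeled.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DataType {
    Int8,
    Float32,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column { name: String, data_type: DataType },
    Cast { expr: Box<Expr>, to: DataType },
    Sqrt(Box<Expr>),
}

/// Types sqrt accepts in this sketch, in order of preference.
const SQRT_SIGNATURES: [DataType; 2] = [DataType::Float32, DataType::Float64];

/// Every Int8 value is exactly representable as Float32/Float64, and Float32 as Float64.
fn can_cast_losslessly(from: DataType, to: DataType) -> bool {
    use DataType::*;
    from == to || matches!((from, to), (Int8, Float32) | (Int8, Float64) | (Float32, Float64))
}

fn data_type(expr: &Expr) -> DataType {
    match expr {
        Expr::Column { data_type, .. } => *data_type,
        Expr::Cast { to, .. } => *to,
        Expr::Sqrt(arg) => data_type(arg),
    }
}

/// Insert a Cast under sqrt when the argument type has no implementation.
pub fn coerce(expr: Expr) -> Result<Expr, String> {
    match expr {
        Expr::Sqrt(arg) => {
            let arg = coerce(*arg)?;
            let from = data_type(&arg);
            if SQRT_SIGNATURES.contains(&from) {
                return Ok(Expr::Sqrt(Box::new(arg))); // already supported: no cast
            }
            match SQRT_SIGNATURES.iter().find(|&&to| can_cast_losslessly(from, to)) {
                Some(&to) => Ok(Expr::Sqrt(Box::new(Expr::Cast { expr: Box::new(arg), to }))),
                None => Err(format!("sqrt has no lossless coercion for {from:?}")),
            }
        }
        Expr::Cast { expr, to } => Ok(Expr::Cast { expr: Box::new(coerce(*expr)?), to }),
        column => Ok(column),
    }
}
```

Checking the existing signatures first is what keeps a Float64 column from being narrowed to Float32.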
Expression Evaluation
Arrow Compute Kernels typically operate on 1 or 2 arrays and/or scalars.
Partial list of included comparison and boolean kernels:
eq Perform left == right operation on two arrays.
eq_scalar Perform left == right operation on an array and a scalar value.
eq_utf8 Perform left == right operation on StringArray / LargeStringArray.
eq_utf8_scalar Perform left == right operation on StringArray / LargeStringArray and a scalar.
and Performs AND operation on two arrays. If either left or right value is null then the result is also null.
is_not_null Returns a non-null BooleanArray with whether each value of the array is not null.
or Performs OR operation on two arrays. If either left or right value is null then the result is also null.
...
Source link to DataFusion dispatch: binary.rs
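Each kernel processes an entire array per call. This sketch models nullable arrays as `Vec<Option<T>>` (an assumption; Arrow stores nulls in separate validity bitmaps) and mirrors the documented semantics of `eq_utf8_scalar`, `is_null`, and `or` listed above, including the rule that `or` returns null if either input is null. That rule differs from SQL's three-valued logic, where `TRUE OR NULL` is `TRUE`; arrow-rs provides a separate `or_kleene` kernel for that behavior. The function names here are illustrative, not the arrow-rs signatures.

```rust
/// Like eq_utf8_scalar: compare every element of a string array with one scalar.
/// A null element yields a null result.
pub fn eq_utf8_scalar(left: &[Option<&str>], right: &str) -> Vec<Option<bool>> {
    left.iter().map(|v| v.map(|s| s == right)).collect()
}

/// Like is_null: the output itself never contains nulls.
pub fn is_null<T>(array: &[Option<T>]) -> Vec<Option<bool>> {
    array.iter().map(|v| Some(v.is_none())).collect()
}

/// Like the `or` kernel described above: null if either input is null.
pub fn or(left: &[Option<bool>], right: &[Option<bool>]) -> Vec<Option<bool>> {
    assert_eq!(left.len(), right.len(), "arrays must have the same length");
    left.iter()
        .zip(right)
        .map(|(l, r)| match (l, r) {
            (Some(a), Some(b)) => Some(*a || *b),
            _ => None,
        })
        .collect()
}
```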
Exprs for evaluating arbitrary expressions
path = '/api/v2/write' OR path IS NULL
BinaryExpr (op: Or)
  left: BinaryExpr (op: Eq)
    left: Column (path)
    right: Literal (ScalarValue::Utf8 '/api/v2/write')
  right: IsNull
    Column (path)
“Fluent” Expression API:
col("path")
    .eq(lit("/api/v2/write"))
    .or(col("path").is_null())
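The fluent API works because each method consumes an `Expr` and returns a larger `Expr` with the original as a child, so the chained calls build exactly the tree drawn above. A minimal sketch with hypothetical types (string literals only) and a `Display` impl that prints the tree back as SQL, without adding parentheses:

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Operator {
    Eq,
    Or,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Literal(String),
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
    IsNull(Box<Expr>),
}

pub fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

pub fn lit(value: &str) -> Expr {
    Expr::Literal(value.to_string())
}

impl Expr {
    // Each method takes `self` by value and makes it a child of a new node.
    pub fn eq(self, other: Expr) -> Expr {
        Expr::BinaryExpr { left: Box::new(self), op: Operator::Eq, right: Box::new(other) }
    }

    pub fn or(self, other: Expr) -> Expr {
        Expr::BinaryExpr { left: Box::new(self), op: Operator::Or, right: Box::new(other) }
    }

    pub fn is_null(self) -> Expr {
        Expr::IsNull(Box::new(self))
    }
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name) => write!(f, "{name}"),
            Expr::Literal(v) => write!(f, "'{v}'"),
            Expr::BinaryExpr { left, op, right } => {
                let op = match op {
                    Operator::Eq => "=",
                    Operator::Or => "OR",
                };
                write!(f, "{left} {op} {right}")
            }
            Expr::IsNull(e) => write!(f, "{e} IS NULL"),
        }
    }
}
```

With these definitions, `col("path").eq(lit("/api/v2/write")).or(col("path").is_null())` prints as `path = '/api/v2/write' OR path IS NULL`.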
Expr Vectorized Evaluation
The tree is evaluated bottom-up; each node produces a whole array (or a scalar), not one value per row:
1. Column path → StringArray: /api/v2/write, /api/v1/write, /api/v2/read, /api/v2/write, …, /api/v2/write, /foo/bar
2. Literal → ScalarValue::Utf8(Some("/api/v2/write"))
3. BinaryExpr (op: Eq) → Call: eq_utf8_scalar → BooleanArray: True, False, False, True, …, True, False
4. Column path → StringArray: /api/v2/write, /api/v1/write, /api/v2/read, /api/v2/write, …, /api/v2/write, /foo/bar
5. IsNull → Call: is_null → BooleanArray: False, False, False, False, …, False, False
6. BinaryExpr (op: Or) → Call: or → BooleanArray (final result): True, False, False, True, …, True, False
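The walkthrough above can be condensed into a recursive evaluator that returns a whole array (or a scalar) per node and runs one kernel-like loop per operator. `Expr`, `Evaluated`, `Batch`, and `evaluate` are hypothetical; every column is assumed to be a nullable string column, and only the operators in this example are supported.

```rust
/// Toy batch: every column is a nullable string column.
pub type Batch = Vec<Vec<Option<String>>>;

#[derive(Debug, Clone)]
pub enum Expr {
    Column(usize), // index of the column in the batch
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
    IsNull(Box<Expr>),
}

/// What a node evaluates to: a whole array, or a single scalar.
#[derive(Debug, Clone, PartialEq)]
pub enum Evaluated {
    Strings(Vec<Option<String>>),
    Bools(Vec<Option<bool>>),
    Scalar(String),
}

pub fn evaluate(expr: &Expr, batch: &Batch) -> Result<Evaluated, String> {
    match expr {
        // Leaves: a column reference yields the whole column; a literal stays a scalar.
        Expr::Column(i) => Ok(Evaluated::Strings(batch[*i].clone())),
        Expr::Literal(s) => Ok(Evaluated::Scalar(s.clone())),
        // Children first, then one pass over all rows (like eq_utf8_scalar).
        Expr::Eq(l, r) => match (evaluate(l, batch)?, evaluate(r, batch)?) {
            (Evaluated::Strings(a), Evaluated::Scalar(s)) => Ok(Evaluated::Bools(
                a.iter().map(|v| v.as_ref().map(|x| *x == s)).collect(),
            )),
            _ => Err("unsupported operands for Eq".to_string()),
        },
        // Like is_null: the result has no nulls.
        Expr::IsNull(e) => match evaluate(e, batch)? {
            Evaluated::Strings(a) => Ok(Evaluated::Bools(a.iter().map(|v| Some(v.is_none())).collect())),
            _ => Err("unsupported operand for IsNull".to_string()),
        },
        // Like the `or` kernel above: null if either side is null.
        Expr::Or(l, r) => match (evaluate(l, batch)?, evaluate(r, batch)?) {
            (Evaluated::Bools(a), Evaluated::Bools(b)) => Ok(Evaluated::Bools(
                a.iter()
                    .zip(&b)
                    .map(|(x, y)| match (x, y) {
                        (Some(p), Some(q)) => Some(*p || *q),
                        _ => None,
                    })
                    .collect(),
            )),
            _ => Err("unsupported operands for Or".to_string()),
        },
    }
}
```

Because each operator loops over a whole column at once, the per-row cost is a tight loop rather than a tree walk per row, which is the point of vectorized evaluation.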