1 of 34

Apache Arrow DataFusion

Overview + Architecture

March / April 2023

2 of 34

Today: IOx Team at InfluxData

Past life 1: Query Optimizer @ Vertica, also on Oracle DB server

Past life 2: Chief Architect + VP Engineering roles at some ML startups

3 of 34

Part 2: Logical Planning

🛠️

4 of 34

How does DataFusion work?

5 of 34

What makes up a Query Engine?

  1. Frontend
    1. Query Language Parser + Semantic Checker
    2. Planner: Parse Tree → Logical Plan
  2. Intermediate Query Representation (“Logical Plan”)
    • Expression / Type system
    • Query Plan w/ Relational Operators (Data Flow Graph)
    • Rewrites / Optimizations
  3. Low Level Query Representation (“Physical Plan”)
    • Statistics, partitions, sort orders, algorithms (Hash Join vs Merge Join)
    • Rewrites / Optimizations
  4. Execution Runtime: “Operators”
    • Allocate resources (CPU, Memory, etc),
    • Pushes bytes around, vectorized calculations

🤔

6 of 34

DataFusion is a Query Engine!

SQLStatement

1. Frontend

LogicalPlan

Expr

ExecutionPlan

PhysicalExpr

2. LogicalPlan

3. Physical Plan

4. Operators

e.g. GroupedHashAggregateStream

7 of 34

DataFusion Planning Flow

SQL Query

SELECT status, COUNT(1)

FROM http_api_requests_total

WHERE path = '/api/v2/write'

GROUP BY status;

Parsing +

SQL Planning

Physical Planning

RecordBatches

Execution

LogicalPlan

“Query Plan”

PG: “Query Tree”

ExecutionPlan

“Access Plan”

“Operator Tree”

PG: “Plan Tree”

8 of 34

Array + Record Batches + Schema

Schema:
  fields[0]: “status”, Utf8
  fields[1]: “COUNT()”, UInt64

RecordBatch (cols, schema):
  StringArray*: 400, 200, 500
  UInt64Array:  73621, 338304, 42

RecordBatch (cols, schema):
  StringArray*: 100
  UInt64Array:  3

status | COUNT()
400    | 73621
200    | 338304
400    | 42
100    | 3

* StringArray representation is somewhat misleading as it actually has a fixed length portion and the character data in different locations

9 of 34

Example Query

SELECT status, COUNT(1)

FROM http_api_requests_total

WHERE path = '/api/v2/write'

GROUP BY status;

http_api_requests_total

status | path          | request_size
200    | /api/v2/write | 3324
200    | /api/v2/write | 43423
400    | /api/v2/read  | 45
200    | /api/v2/write | 6433
400    | /api/v2/read  | 45
400    | /api/v2/write | 6433

10 of 34

Example Query

SELECT status, COUNT(1)

FROM http_api_requests_total

WHERE path = '/api/v2/write'

GROUP BY status;

http_api_requests_total

status | path          | request_size
200    | /api/v2/write | 3324
200    | /api/v2/write | 43423
400    | /api/v2/read  | 45
200    | /api/v2/write | 6433
400    | /api/v2/read  | 45
400    | /api/v2/write | 6433

Result:

status | COUNT(1)
200    | 3
400    | 1

11 of 34

LogicalPlans are DataFlow graphs

Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]    (Step 3: Data is aggregated)
  Filter: #path Eq Utf8("/api/v2/write")    (Step 2: Predicate is applied)
    TableScan: http_api_requests_total projection=None    (Step 1: Parquet file is read)

Data flows up from the leaves (bottom) to the root (top) of the tree
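
A minimal sketch of the same dataflow idea in plain Rust, using only the standard library. The `Plan`, `Value`, `run`, and `example_plan` names are hypothetical stand-ins for illustration, not DataFusion's `LogicalPlan` or execution machinery. Each node pulls rows from its input, so data moves from the leaf (scan) up to the root (aggregate).

```rust
use std::collections::BTreeMap;

// Toy model only: these types are NOT DataFusion's.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Value {
    Int(u64),
    Str(String),
}

type Row = Vec<Value>;

enum Plan {
    // Leaf: produces the rows of a table.
    TableScan { rows: Vec<Row> },
    // Keeps rows whose column `col` equals `equals`.
    Filter { col: usize, equals: Value, input: Box<Plan> },
    // Groups by one column and emits (group value, row count).
    CountBy { group_col: usize, input: Box<Plan> },
}

// Evaluate a node by first evaluating its input: data flows leaf -> root.
fn run(plan: &Plan) -> Vec<Row> {
    match plan {
        Plan::TableScan { rows } => rows.clone(),
        Plan::Filter { col, equals, input } => run(input)
            .into_iter()
            .filter(|row| &row[*col] == equals)
            .collect(),
        Plan::CountBy { group_col, input } => {
            // BTreeMap keeps the groups in a deterministic (sorted) order.
            let mut counts: BTreeMap<Value, u64> = BTreeMap::new();
            for row in run(input) {
                *counts.entry(row[*group_col].clone()).or_insert(0) += 1;
            }
            counts
                .into_iter()
                .map(|(key, n)| vec![key, Value::Int(n)])
                .collect()
        }
    }
}

// The example query over the example table (status, path columns only).
fn example_plan() -> Plan {
    let row = |status: u64, path: &str| vec![Value::Int(status), Value::Str(path.to_string())];
    let scan = Plan::TableScan {
        rows: vec![
            row(200, "/api/v2/write"),
            row(200, "/api/v2/write"),
            row(400, "/api/v2/read"),
            row(200, "/api/v2/write"),
            row(400, "/api/v2/read"),
            row(400, "/api/v2/write"),
        ],
    };
    let filter = Plan::Filter {
        col: 1,
        equals: Value::Str("/api/v2/write".to_string()),
        input: Box::new(scan),
    };
    Plan::CountBy { group_col: 0, input: Box::new(filter) }
}
```

Calling `run(&example_plan())` yields the two groups from the Example Query slide: status 200 with count 3 and status 400 with count 1.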

12 of 34

Query Planning

13 of 34

SQL → LogicalPlan

SQL Parser

SQL Query

SELECT status, COUNT(1)

FROM http_api_requests_total

WHERE path = '/api/v2/write'

GROUP BY status;

Query {
    ctes: [],
    body: Select(
        Select {
            distinct: false,
            top: None,
            projection: [
                UnnamedExpr(
                    Identifier(
                        Ident {
                            value: "status",
                            quote_style: None,
                        },
                    ),
                ),
                ...

Parsed

Statement

(SQL AST)

SQL

Planner

LogicalPlan

14 of 34

“DataFrame” → Logical Plan

Rust Code

let df = ctx
    // look up the registered table by name (async)
    .table("http_api_requests_total").await?
    .filter(col("path").eq(lit("/api/v2/write")))?
    .aggregate(vec![col("status")], vec![count(lit(1))])?;

DataFrame

(Builder)

LogicalPlan

15 of 34

Sample of LogicalPlan operators (source link)

Projection

Filter

Window

Aggregate

Sort

Join

TableScan

Values

Explain

Analyze

SetVariable

Prepare

Dml(..)

CreateExternalTable

CreateView

CreateCatalogSchema

CreateCatalog

DropTable

DropView

Repartition

Union

Subquery

Limit

Extension

Distinct

16 of 34

Logical Query Optimization

Compute the same (correct) result, only faster

Optimizer

Pass 1

LogicalPlan

(intermediate)

“Optimizer”

Optimizer

Pass 2

LogicalPlan

(input)

LogicalPlan

(output)

Other

Passes

...
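
A minimal sketch of the pass pipeline above: each pass takes a plan and returns an equivalent plan, and the optimizer chains them. The `Plan` type and the two passes (`eliminate_true_filter`, `merge_limits`) are hypothetical toys chosen for illustration. They are not DataFusion's rules or its optimizer rule trait.

```rust
// Toy plan: NOT DataFusion's LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    // Predicate kept as text; the literal "true" always passes.
    Filter { predicate: String, input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
}

// A pass maps a plan to an equivalent (hopefully cheaper) plan.
type Pass = fn(Plan) -> Plan;

// Hypothetical pass 1: remove filters whose predicate is `true`.
fn eliminate_true_filter(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => {
            let input = eliminate_true_filter(*input);
            if predicate == "true" {
                input
            } else {
                Plan::Filter { predicate, input: Box::new(input) }
            }
        }
        Plan::Limit { n, input } => Plan::Limit {
            n,
            input: Box::new(eliminate_true_filter(*input)),
        },
        scan @ Plan::Scan(_) => scan,
    }
}

// Hypothetical pass 2: collapse directly nested limits into the smaller one.
fn merge_limits(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { n, input } => match merge_limits(*input) {
            Plan::Limit { n: inner, input } => Plan::Limit { n: n.min(inner), input },
            other => Plan::Limit { n, input: Box::new(other) },
        },
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(merge_limits(*input)),
        },
        scan @ Plan::Scan(_) => scan,
    }
}

fn default_passes() -> Vec<Pass> {
    vec![eliminate_true_filter as Pass, merge_limits as Pass]
}

// Input plan -> pass 1 -> intermediate plan -> pass 2 -> ... -> output plan.
fn optimize(plan: Plan, passes: &[Pass]) -> Plan {
    passes.iter().fold(plan, |plan, pass| pass(plan))
}
```

Order matters in this toy: removing the always-true filter first is what exposes the two limits as neighbours, so the second pass can merge them.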

17 of 34

Built in DataFusion Optimizer Passes (source link)

Pushdown: Reduce columns/rows passed from node to node:

=> PushDownProjection => PushDownLimit

=> PushDownFilter

Simplify: Minimize expression evaluation during runtime

=> SimplifyExpressions => UnwrapCastInComparison

Simplify: Remove unnecessary nodes

=> MergeProjection => EliminateFilter => EliminateProjection

=> EliminateCrossJoin => EliminateLimit => EliminateDuplicatedExpr

=> CommonSubexprEliminate => PropagateEmptyRelation

18 of 34

Built in DataFusion Optimizer Passes (source link)

Flatten Subqueries: Rewrite subqueries to Joins:

=> DecorrelateWhereExists => DecorrelateWhereIn

=> ScalarSubqueryToJoin

Optimize Joins: Identify join Predicates

=> ExtractEquijoinPredicate => RewriteDisjunctivePredicate

=> FilterNullJoinKeys => EliminateOuterJoin

Optimize DISTINCT:

=> SingleDistinctToGroupBy

=> ReplaceDistinctWithAggregate

19 of 34

Optimizer in Action

Use EXPLAIN VERBOSE to see optimizations applied

> EXPLAIN VERBOSE SELECT status, COUNT(1) FROM http_api_requests_total

WHERE path = '/api/v2/write' GROUP BY status;

+----------------------+----------------------------------------------------------------+
| plan_type            | plan                                                           |
+----------------------+----------------------------------------------------------------+
| logical_plan         | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]       |
|                      |   Selection: #path Eq Utf8("/api/v2/write")                    |
|                      |     TableScan: http_api_requests_total projection=None         |
| projection_push_down | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]       |
|                      |   Selection: #path Eq Utf8("/api/v2/write")                    |
|                      |     TableScan: http_api_requests_total projection=Some([6, 8]) |
| type_coercion        | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]       |
|                      |   Selection: #path Eq Utf8("/api/v2/write")                    |
|                      |     TableScan: http_api_requests_total projection=Some([6, 8]) |
...
+----------------------+----------------------------------------------------------------+

The optimizer “pushed” the projection down into the scan, so only the status and path columns are read from the Parquet file
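
A minimal sketch of the idea behind projection pushdown, in plain Rust. The `Plan` type and `push_down_projection` function are hypothetical toys, not DataFusion's implementation. The sketch tracks columns by name for readability, whereas the EXPLAIN output above uses column indices. Walking from the root down, each node records which columns it needs, and the scan ends up reading only those.

```rust
use std::collections::BTreeSet;

// Toy plan: NOT DataFusion's LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    // projection: None = read every column, Some(cols) = read only these.
    TableScan { table: String, projection: Option<Vec<String>> },
    // A filter whose predicate references a single column.
    Filter { column: String, input: Box<Plan> },
    // GROUP BY the given columns with COUNT(1), which needs no extra column.
    Aggregate { group_by: Vec<String>, input: Box<Plan> },
}

// `required` holds the columns that the parent of `plan` reads from it.
fn push_down_projection(plan: Plan, required: BTreeSet<String>) -> Plan {
    match plan {
        Plan::Aggregate { group_by, input } => {
            // Below an aggregate only the grouping columns are needed,
            // whatever the parent asked for.
            let needed: BTreeSet<String> = group_by.iter().cloned().collect();
            Plan::Aggregate {
                group_by,
                input: Box::new(push_down_projection(*input, needed)),
            }
        }
        Plan::Filter { column, input } => {
            // A filter passes its parent's columns through and adds its own.
            let mut needed = required;
            needed.insert(column.clone());
            Plan::Filter {
                column,
                input: Box::new(push_down_projection(*input, needed)),
            }
        }
        // The scan is where the pruning pays off: read only what is needed.
        Plan::TableScan { table, .. } => Plan::TableScan {
            table,
            projection: Some(required.into_iter().collect()),
        },
    }
}

// Convenience entry point for plans rooted at an aggregate or filter.
fn optimize(plan: Plan) -> Plan {
    push_down_projection(plan, BTreeSet::new())
}
```

For the example query, the scan's projection changes from `None` to the two columns `path` and `status`, mirroring the `projection=Some([6, 8])` line in the EXPLAIN output.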

20 of 34

Expression Evaluation

21 of 34

Expression Trees

path = '/api/v2/write'

BinaryExpr (op: Eq)
  left:  Column (path)
  right: Literal (ScalarValue::Utf8 '/api/v2/write')

22 of 34

Type Coercion

sqrt(col)

sqrt(col) → sqrt(CAST(col AS Float32))

col is Int8, but sqrt is implemented only for Float32 and Float64

⇒ Type Coercion: adds a cast so the implementation can be called

Note: Coercion is lossless; if col were Float64, it would not be coerced to Float32

Source Code: coercion.rs

Before:

FuncExpr (fn: sqrt)
  Column (col)

After:

FuncExpr (fn: sqrt)
  Cast (dt: Float32)
    Column (col)
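
A minimal sketch of this rewrite on a toy expression tree. The `DataType`, `Expr`, `type_of`, and `coerce` names are hypothetical and cover only the sqrt case from this slide. This is not the logic in coercion.rs.

```rust
// Toy types: NOT DataFusion's or Arrow's.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int8,
    Float32,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, data_type: DataType },
    Cast { expr: Box<Expr>, to: DataType },
    Sqrt(Box<Expr>),
}

// Output type of an expression in this toy model.
fn type_of(expr: &Expr) -> DataType {
    match expr {
        Expr::Column { data_type, .. } => *data_type,
        Expr::Cast { to, .. } => *to,
        // Assumption: sqrt returns the type of its argument.
        Expr::Sqrt(arg) => type_of(arg),
    }
}

// Insert a cast wherever sqrt is called on a non-float argument.
fn coerce(expr: Expr) -> Expr {
    match expr {
        Expr::Sqrt(arg) => {
            let arg = coerce(*arg);
            match type_of(&arg) {
                // Already a supported type: leave it alone. In particular,
                // Float64 is never narrowed to Float32.
                DataType::Float32 | DataType::Float64 => Expr::Sqrt(Box::new(arg)),
                // Int8 fits losslessly in Float32, so wrap the argument in a cast.
                DataType::Int8 => Expr::Sqrt(Box::new(Expr::Cast {
                    expr: Box::new(arg),
                    to: DataType::Float32,
                })),
            }
        }
        Expr::Cast { expr, to } => Expr::Cast { expr: Box::new(coerce(*expr)), to },
        column @ Expr::Column { .. } => column,
    }
}
```

Applied to `sqrt(col)` with an Int8 column, `coerce` produces the "after" tree with a Float32 cast. With a Float64 column it returns the expression unchanged.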

23 of 34

Expression Evaluation

Arrow Compute Kernels typically operate on 1 or 2 arrays and/or scalars.

Partial list of included comparison kernels:

eq Perform left == right operation on two arrays.

eq_scalar Perform left == right operation on an array and a scalar value.

eq_utf8 Perform left == right operation on StringArray / LargeStringArray.

eq_utf8_scalar Perform left == right operation on StringArray / LargeStringArray and a scalar.

and Performs AND operation on two arrays. If either left or right value is null then the result is also null.

is_not_null Returns a non-null BooleanArray with whether each value of the array is not null.

or Performs OR operation on two arrays. If either left or right value is null then the result is also null.

...

Source link to DataFusion dispatch: binary.rs
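
A minimal sketch of what such kernels compute, using `Vec<Option<T>>` as a stand-in for nullable Arrow arrays. The function names echo the list above, but the signatures are simplified assumptions for illustration and are not the arrow-rs API. Each kernel handles a whole array per call rather than one row at a time.

```rust
// Stand-in for a nullable boolean Arrow array (None = null).
type BooleanArray = Vec<Option<bool>>;

// left == right for a string array and a scalar; null inputs give null outputs.
fn eq_utf8_scalar(left: &[Option<String>], right: &str) -> BooleanArray {
    left.iter()
        .map(|value| value.as_deref().map(|s| s == right))
        .collect()
}

// Returns a non-null boolean array saying whether each value is null.
fn is_null(array: &[Option<String>]) -> BooleanArray {
    array.iter().map(|value| Some(value.is_none())).collect()
}

// OR of two boolean arrays; if either side is null, the result is null.
fn or(left: &[Option<bool>], right: &[Option<bool>]) -> BooleanArray {
    assert_eq!(left.len(), right.len(), "arrays must have equal length");
    left.iter()
        .zip(right)
        .map(|(l, r)| match (l, r) {
            (Some(l), Some(r)) => Some(*l || *r),
            _ => None,
        })
        .collect()
}
```

The null handling of `or` here follows the description in the list above: a null on either side produces a null result.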

24 of 34

Exprs for evaluating arbitrary expressions

path = '/api/v2/write' OR path IS NULL

BinaryExpr (op: Or)
  left:  BinaryExpr (op: Eq)
           left:  Column (path)
           right: Literal (ScalarValue::Utf8 '/api/v2/write')
  right: IsNull
           Column (path)

col("path")
    .eq(lit("/api/v2/write"))
    .or(col("path").is_null())

“Fluent” Expression API
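
A minimal sketch of how a fluent API like this can build the tree shown on this slide. The `Expr` and `Operator` types here are toy versions written for illustration, with names that mirror the slide. They are not DataFusion's definitions. Each method consumes `self` and wraps it in a new parent node, so a method chain reads left to right while the tree grows upward.

```rust
// Toy expression tree: NOT DataFusion's Expr.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operator {
    Eq,
    Or,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
    IsNull(Box<Expr>),
}

// Leaf constructors.
fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn lit(value: &str) -> Expr {
    Expr::Literal(value.to_string())
}

impl Expr {
    // Wrap `self` and `right` in a new BinaryExpr node.
    fn binary(self, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr { left: Box::new(self), op, right: Box::new(right) }
    }

    // Builds an `=` expression (this does not compare two Exprs).
    fn eq(self, right: Expr) -> Expr {
        self.binary(Operator::Eq, right)
    }

    fn or(self, right: Expr) -> Expr {
        self.binary(Operator::Or, right)
    }

    fn is_null(self) -> Expr {
        Expr::IsNull(Box::new(self))
    }
}

// path = '/api/v2/write' OR path IS NULL
fn example() -> Expr {
    col("path")
        .eq(lit("/api/v2/write"))
        .or(col("path").is_null())
}
```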

25 of 34

Expr Vectorized Evaluation

Column

path

Literal

ScalarValue::Utf8

'/api/v2/write'

Column

path

IsNull

BinaryExpr

op: Eq

BinaryExpr

op: Or

26 of 34

Expr Vectorized Evaluation

Literal

ScalarValue::Utf8

'/api/v2/write'

Column

path

IsNull

BinaryExpr

op: Eq

BinaryExpr

op: Or

/api/v2/write

/api/v1/write

/api/v2/read

/api/v2/write

/api/v2/write

/foo/bar

StringArray

27 of 34

Expr Vectorized Evaluation

Column

path

IsNull

BinaryExpr

op: Eq

BinaryExpr

op: Or

/api/v2/write

/api/v1/write

/api/v2/read

/api/v2/write

/api/v2/write

/foo/bar

StringArray

ScalarValue::Utf8(

Some(

“/api/v2/write”

)

)

28 of 34

Expr Vectorized Evaluation

Column

path

IsNull

BinaryExpr

op: Eq

BinaryExpr

op: Or

/api/v2/write

/api/v1/write

/api/v2/read

/api/v2/write

/api/v2/write

/foo/bar

StringArray

ScalarValue::Utf8(

Some(

“/api/v2/write”

)

)

29 of 34

Expr Vectorized Evaluation

Column

path

IsNull

BinaryExpr

op: Or

True

False

False

True

True

False

BooleanArray

30 of 34

Expr Vectorized Evaluation

IsNull

BinaryExpr

op: Or

True

False

False

True

True

False

BooleanArray

/api/v2/write

/api/v1/write

/api/v2/read

/api/v2/write

/api/v2/write

/foo/bar

StringArray

31 of 34

Expr Vectorized Evaluation

IsNull

BinaryExpr

op: Or

True

False

False

True

True

False

BooleanArray

/api/v2/write

/api/v1/write

/api/v2/read

/api/v2/write

/api/v2/write

/foo/bar

StringArray

Call: is_null

32 of 34

Expr Vectorized Evaluation

BinaryExpr

op: Or

True

False

False

True

True

False

BooleanArray

False

False

False

False

False

False

BooleanArray

33 of 34

Expr Vectorized Evaluation

BinaryExpr

op: Or

True

False

False

True

True

False

BooleanArray

False

False

False

False

False

False

BooleanArray

Call: or

34 of 34

Expr Vectorized Evaluation

True

False

False

True

True

False

BooleanArray
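
The walk-through on the preceding slides can be sketched in plain Rust as a post-order evaluation, where each node evaluates its children and then makes one whole-array call. The `Value`, `Expr`, `evaluate`, and `example_expr` names are hypothetical, and `Vec<Option<T>>` stands in for Arrow arrays. This is an illustration of the idea, not DataFusion's PhysicalExpr code.

```rust
// Toy stand-ins: NOT the arrow-rs or DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Utf8Array(Vec<Option<String>>),
    Utf8Scalar(String),
    BooleanArray(Vec<Option<bool>>),
}

enum Expr {
    Column, // the single input column, "path"
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
    IsNull(Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

// Evaluate children first, then apply one array-at-a-time operation.
fn evaluate(expr: &Expr, path: &[Option<String>]) -> Value {
    match expr {
        Expr::Column => Value::Utf8Array(path.to_vec()),
        Expr::Literal(s) => Value::Utf8Scalar(s.clone()),
        Expr::Eq(left, right) => match (evaluate(left, path), evaluate(right, path)) {
            // Array compared with a scalar, like an eq_utf8_scalar kernel.
            (Value::Utf8Array(a), Value::Utf8Scalar(s)) => Value::BooleanArray(
                a.iter()
                    .map(|v| v.as_deref().map(|v| v == s.as_str()))
                    .collect(),
            ),
            _ => panic!("toy model: Eq expects (array, scalar)"),
        },
        Expr::IsNull(input) => match evaluate(input, path) {
            Value::Utf8Array(a) => {
                Value::BooleanArray(a.iter().map(|v| Some(v.is_none())).collect())
            }
            _ => panic!("toy model: IsNull expects a string array"),
        },
        Expr::Or(left, right) => match (evaluate(left, path), evaluate(right, path)) {
            // Null on either side gives null, as in the `or` kernel description.
            (Value::BooleanArray(a), Value::BooleanArray(b)) => Value::BooleanArray(
                a.iter()
                    .zip(b.iter())
                    .map(|(l, r)| match (l, r) {
                        (Some(l), Some(r)) => Some(*l || *r),
                        _ => None,
                    })
                    .collect(),
            ),
            _ => panic!("toy model: Or expects two boolean arrays"),
        },
    }
}

// path = '/api/v2/write' OR path IS NULL
fn example_expr() -> Expr {
    Expr::Or(
        Box::new(Expr::Eq(
            Box::new(Expr::Column),
            Box::new(Expr::Literal("/api/v2/write".to_string())),
        )),
        Box::new(Expr::IsNull(Box::new(Expr::Column))),
    )
}
```

With the six-element path array from the slides, the Eq node produces True, False, False, True, True, False, the IsNull node produces six False values, and the Or node combines them into the final BooleanArray shown above.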