1 of 25

Apache Arrow DataFusion

Overview + Architecture

March 2023

2 of 25

Today: IOx Team at InfluxData

Past life 1: Query Optimizer @ Vertica, also on Oracle DB server

Past life 2: Chief Architect + VP Engineering roles at some ML startups

3 of 25

Talk Outline

Part 1:

What is a Query Engine

What do you need one

What is DataFusion

Part 2:

Query Planning + Expression Evaluation

Part 3:

Query Execution

4 of 25

Part 1

🥁

5 of 25

What is a Query Engine?

6 of 25

Motivation

Users who want to access data without writing a program

UIs (visual and textual)

Data is stored somewhere

Query Engine

SQL is a (very) common interface

Many (!!!) others:

InfluxQL

PromQL

LogQL�StreamQL

SparQL

GraphQL

7 of 25

DBMS vs Query Engine ( , , , , )

Database Management Systems (DBMS) are full featured systems

  • Storage system (stores actual data)
  • Catalog (store metadata about what is in the storage system)
  • Query Engine (query, and retrieve requested data)
  • Access Control and Authorization (users, groups, permissions)
  • Resource Management (divide resources between uses)
  • Administration utilities (monitor resource usage, set policies, etc)
  • Clients for Network connectivity (e.g. implement JDBC, ODBC, etc)
  • Multi-node coordination and management

DataFusion

8 of 25

Query Engine Use Cases

  1. Analytical Databases
    1. Examples: InfluxDB IOx / CeresDB, Apache Ballista.
  2. Query Languages
  3. Systems Research Platforms
  4. Data Transcoding (maybe)
    • Reading / sorting / transcoding Parquet, CSV, AVRO, and JSON files such as qv

9 of 25

Why do you need DataFusion?

10 of 25

Implementation timeline for a new Database system

10

Client

API

In memory

storage

In-Memory

filter + aggregation

Durability / persistence

Metadata Catalog +

Management

Query

Language

Parser

Optimized /

Compressed storage

Execution on

Compressed

Data

Joins!

Additional Client

Languages

Outer Joins

Subquery support

More advanced analytics

Cost based optimizer

Out of core algorithms

Storage Rearrangement

Heuristic Query Planner

Arithmetic expressions

Date / time Expressions

Concurrency

Control

Data Model /

Type System

Distributed query execution

Resource

Management

“Lets Build

a Database”

🤔

“Ok now this is pretty good”

😐

“Look mom!

I have a database!”

😃

Online recovery

Window functions

11 of 25

Most systems have custom query engines

Almost all well known database systems have their own custom query engines

  • Expensive to Build (barrier to entry)
  • Expensive to Maintain (slows down innovation)

12 of 25

Trending …

13 of 25

Future of Databases / Analytic system

New systems will be built on high quality open source query engines

Shared high quality, open source vectorized OLAP engine.

Databases focused on differentiating features

14 of 25

What is DataFusion?

15 of 25

What is DataFusion?

“DataFusion is an in-memory query engine that uses Apache Arrow as the memory model” - crates.io

  • Not part of the Arrow spec, but governed by same governance body
  • Initially implemented and donated by Andy Grove; design based on How Query Engines Work

16 of 25

DataFusion Features / Design Goals

High Performance: Memory (no GC) and Performance, leveraging Rust/Arrow

Easy to Connect: Interoperability with other tools via Arrow, Parquet and Flight

Easy to Embed + Customize: Can extend data sources, functions, operators

First Class Rust: High quality Query / SQL Engine entirely in Rust

High Quality: Extensive tests and integration tests with Arrow ecosystems

My goal: DataFusion to be *the* choice for a Query Engine in Rust

17 of 25

DataFusion: System Architecture

SQL

Query FrontEnds

DataFrame

LogicalPlans

ExecutionPlan

Plan Representations

(DataFlow Graphs)

Expression Eval

Optimizations / Transformations

Optimizations / Transformations

HashAggregate

Sort

Optimized Execution Operators

(Arrow Based)

Join

Data Sources

Parquet

CSV

DataFusion

18 of 25

DataFusion: Totally Customizable

SQL

Query FrontEnds

DataFrame

LogicalPlans

ExecutionPlan

Plan Representations

(DataFlow Graphs)

Expression Eval

Optimizations / Transformations

Optimizations / Transformations

HashAggregate

Sort

Join

Data Sources

Parquet

CSV

DataFusion

Extend ✅

Extend ✅

Extend ✅

Extend ✅

Extend ✅

Extend ✅

Extend ✅

Extend ✅

Optimized Execution Operators

(Arrow Based)

19 of 25

DataFusion Extensibility 🧰

  • User Defined Functions (e.g. sqrt)
  • User Defined Aggregates (e.g. avg)
  • User Defined Optimizer passes (logical and execution)
  • User Defined LogicalPlan nodes
  • User Defined ExecutionPlan nodes
  • User Defined TableProvider for data sources
  • User Defined CatalogProvider for table definitions

* Built in default implementations ⇒ fast to get new systems up and running

20 of 25

DataFusion in Action

21 of 25

datafusion-cli

  • Very small example SQL CLI (ala sqlite)
  • Part of the DataFusion project
  • Convenient for testing / validating
  • More details in docs (link)

datafusion-cli

22 of 25

DEMO

datafusion-cli

200+ files in cpu, 14GB file in hits.parquet (ClickBench)

❯ select * from 'cpu' limit 1;

❯ select avg(usage_idle), cpu from 'cpu' group by cpu;

❯ select count(*), "EventDate" from 'hits.parquet' group by "EventDate";

❯ explain select count(*), "EventDate" from 'hits.parquet' group by "EventDate";

23 of 25

DataFusion CLI

+---------------------+-----------------------+

| minute | AVG(cpu.parquet.free) |

+---------------------+-----------------------+

| 2023-02-21T19:59:00 | 3.3361268736e10 |

| 2023-02-21T20:00:00 | 3.3346749098666668e10 |

| 2023-02-21T20:01:00 | 3.3304853845333332e10 |

| 2023-02-21T20:02:00 | 3.3370011648e10 |

| 2023-02-21T20:03:00 | 3.3481375744e10 |

| 2023-02-21T20:04:00 | 3.3496006656e10 |

| 2023-02-21T20:05:00 | 3.3529480533333332e10 |

| 2023-02-21T22:05:00 | 3.3313550336e10 |

| 2023-02-21T22:06:00 | 3.3314910208e10 |

+---------------------+-----------------------+

> SELECT

date_bin('1 minute', time)

as minute,

avg(free)

FROM 'cpu.parquet'

GROUP BY minute

ORDER BY minute;

24 of 25

EXPLAIN Plan - What did it do?

+---------------+------------------------------------------------------------------------------------------------------+

| plan_type | plan

+---------------+------------------------------------------------------------------------------------------------------+

| logical_plan | Sort: minute ASC NULLS LAST

| | Projection: datebin(...) AS minute, AVG(cpu.parquet.free) | | Aggregate: groupBy=[[datebin(...)]], aggr=[[AVG(cpu.parquet.free)]]

| | TableScan: cpu.parquet projection=[free, time] | physical_plan | SortPreservingMergeExec: [minute@0 ASC NULLS LAST] | | SortExec: expr=[minute@0 ASC NULLS LAST] | | ProjectionExec: expr=[datebin(...)@0 as minute, AVG(cpu.parquet.free)@1 as AVG(cpu.parquet.free)] | | AggregateExec: mode=FinalPartitioned, gby=[datebin(...)], aggr=[AVG(cpu.parquet.free)]

| | CoalesceBatchesExec: target_batch_size=8192 | | RepartitionExec: partitioning=Hash([Column { name: "datebin...", 16), input_partitions=16

| | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 | | AggregateExec: mode=Partial, gby=[datebin(...)], aggr=[AVG(cpu.parquet.free)] | | ParquetExec: limit=None, partitions={1 group: [[cpu.parquet]]}, projection=[free, time]

+---------------+-------------------------------------------------------------------------------------------------------+

2 rows in set. Query took 0.003 seconds.

> EXPLAIN

SELECT date_bin('1 minute', time) as minute, avg(free)

FROM 'cpu.parquet' GROUP BY minute ORDER BY minute;

25 of 25

EXPLAIN ANALYZE – includes runtime metrics

> EXPLAIN ANALYZE

SELECT date_bin('1 minute', time, '1970-01-01') as minute, avg(free)

FROM 'cpu.parquet' GROUP BY minute ORDER BY minute;

+-------------------------------------------------------------------------------------------------------------------+

| plan_type | plan

+-------------------+-----------------------------------------------------------------------------------------------+

| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=9, elapsed_compute=15.947µs, ...]

| | ProjectionExec: expr=[datebin(...), AVG(...), metrics=[output_rows=9, elapsed_compute=9.055µs]

| | AggregateExec: mode=FinalPartitioned, ..., metrics=[output_rows=9, elapsed_compute=304.792µs]

| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=9, elapsed_compute=32.21µs] | | RepartitionExec: ...]

| | RepartitionExec: ...

| | AggregateExec: mode=Partial, gby=..., aggr=...,

metrics=[output_rows=9, elapsed_compute=149.461µs, ...]

| | ParquetExec: limit=None, partitions={1 group: [[cpu.parquet]]},

projection=[free, time],

metrics=[output_rows=40, elapsed_compute=1ns,

pushdown_rows_filtered=0, page_index_rows_filtered=0,

bytes_scanned=698, predicate_evaluation_errors=0,

time_elapsed_scanning_total=341.925µs,

time_elapsed_processing=288.69µs]

+-------------------+---------------------------------------------------------------------------------------------------