
NYC Meetup

September 17, 2024

New York, NY

Andrew Lamb, Staff Engineer, InfluxData

| © Copyright 2024, InfluxData


Andrew Lamb

Staff Engineer InfluxData

> 21 years 😱 in enterprise software development

Oracle: Database (2 years)

DataPower: XSLT compiler (2 years)

Vertica: DB / Query Optimizer (6 years)

Nutonian/DataRobot: ML Startups (7 years)

InfluxData: IOx, Arrow, DataFusion (4 years)

Apache DataFusion PMC (Chair)

Apache Arrow PMC (past Chair)


Agenda

  • Reflections
  • How InfluxDB 3.0 uses DataFusion


Frame of Reference

DataFusion is not

A product that we* are providing

DataFusion is

A project we are all working on together

* we == Apache / the committers / myself

Plug: come join us! It is great fun and very welcoming!


My Personal / Professional Goal

1,000+ projects!


“Community over Code” - The Apache Way

Non-profit governance of open source communities


Apache: Benefits for DataFusion

  • ⇒ Predictable Foundation
  • Stable License: the ASL is 20 years old, so low risk of changes (ahem, OpenTofu)
  • Communication: Predictable and open (if slow)
  • Multi-Vendor Participation: Shared investment reduces individual risk
  • Long Term Maintenance: Hedged against life changes, corporate strategy shifts, VC funding cycles
  • ⭐⭐⭐⭐⭐: Works far better than could be reasonably expected


DataFusion: Cutting Edge Tech

Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine

In SIGMOD 2024

We wrote this paper entirely in the open


DataFusion: Cutting Edge Tech

Who’s who of DataFusion users


Agenda

  • Reflections
  • ⇒ How InfluxDB 3.0 uses DataFusion


InfluxDB 3.0 Architecture


Arrow DataFusion: Benefits for InfluxDB

⇒ Fast, full-featured, extensible query engine that we didn’t have to build

All the OLAP buzzwords: vectorized, columnar, multi-core, streaming, out of core, …

“Full” SQL: JOINs, date/time/timestamp functions, structured data, …

Customizable: easily extended with time-series-specific features (e.g. date_bin_gapfill, InfluxQL, insert-time resolution, …)


How InfluxDB 3.0 uses DataFusion

Rationale

  1. Query and write are “just” moving data around
  2. Reuse existing execution machinery (streaming, segregated worker pool, optimized sort / merge, etc.)
  3. Amplify investment in Open Source contributions

All queries and the write path (Parquet creation + compaction) use DataFusion + Arrow


Query Processing in InfluxDB 3.0

[Diagram: query input → client / language-specific frontends → DataFusion LogicalPlan → shared planning and execution phases, based on DataFusion → Arrow RecordBatches → client-specific output formats]

  • Query input and client / language-specific frontends:
    • Storage gRPC Frontend: read_group(..)
    • SQL Frontend: SELECT … FROM
    • InfluxQL Frontend: SELECT … FROM … GROUP BY *
    • Reorg Frontend: compact_plan(..)
  • Shared planning and execution phases, based on DataFusion:
    • Optimization (storage pruning, pushdown, etc.) of the DataFusion LogicalPlan
    • Physical Planning into a DataFusion ExecutionPlan
    • Execution, producing output Arrow RecordBatches
  • Client-specific output formats:
    • gRPC output: SeriesFrame, ...
    • Arrow Flight: FlightData
    • ParquetWriter: write to Parquet files


DataFusion extensions used in InfluxDB 3.0

[Diagram: DataFusion architecture, with the extension points used by InfluxDB 3.0 marked "Extension"]

  • Front Ends: SQL, DataFrame (Extension: Frontend)
  • Plan Representations and Rewrites: LogicalPlan, ExecutionPlan, Expression Eval
    • Optimizations / Transformations on both plans (Extensions: LogicalPlan Rewrite, ExecutionPlan Rewrite, Extension Node)
  • Execution Engine: HashAggregate, Sort, Join, Streams (Extension: Stream)
  • Catalog and Data Sources: Parquet, CSV (Extension: Catalog / Table)


What Extension Points does InfluxDB 3.0 use?

  • “Almost all of them”
  • Frontend: InfluxQL → LogicalPlan
  • Functions: Aggregate / Window functions
    • `select_first`, `integral`: time-series-specific functions
  • Logical / Physical operators
    • GapFill (implements gap filling / interpolation)
    • ProgressiveEvalExec: optimized for sorted data (see blog)
    • DeduplicateExec: resolves duplicates by insert order for non-compacted data
  • Table Provider: merges Parquet and unpersisted data
  • Logical Optimizers: e.g. rewrites for timeseries_gap_fill
  • Physical Optimizers: optimizations based on file layout, filter pushdown, dedup, etc.
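To illustrate what a time-series-specific aggregate such as `integral` computes, here is a minimal standalone sketch using the trapezoidal rule over time-ordered (timestamp, value) points. It assumes points arrive sorted by time, with timestamps in seconds. The `IntegralAccumulator` type is hypothetical: it is neither InfluxDB's implementation nor DataFusion's `Accumulator` trait, although it loosely mimics that trait's update / evaluate shape.

```rust
/// Hypothetical accumulator for an `integral`-style aggregate: the area under
/// the curve through (time, value) points, computed with the trapezoidal rule.
/// Assumes points are fed in time order; timestamps are in seconds.
#[derive(Default)]
struct IntegralAccumulator {
    prev: Option<(f64, f64)>, // last (time_secs, value) seen
    area: f64,                // running area under the curve
}

impl IntegralAccumulator {
    fn update(&mut self, time_secs: f64, value: f64) {
        if let Some((t0, v0)) = self.prev {
            // Area of the trapezoid between the previous point and this one.
            self.area += (time_secs - t0) * (v0 + value) / 2.0;
        }
        self.prev = Some((time_secs, value));
    }

    fn evaluate(&self) -> f64 {
        self.area
    }
}

fn main() {
    let mut acc = IntegralAccumulator::default();
    for (t, v) in [(0.0, 10.0), (10.0, 20.0), (20.0, 20.0)] {
        acc.update(t, v);
    }
    // 10 * (10 + 20) / 2 + 10 * (20 + 20) / 2 = 150 + 200 = 350
    println!("{}", acc.evaluate());
}
```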


Ingester: Persist

DataFusion use: sort, deduplicate, and write Parquet

[Diagram: Ingester Buffer → DataFusion Plan (Scan → Sort → Dedupe) → parquet-rs writer → Object Store]

  1. Buffered data fed to plan
  2. Plan output fed to parquet writer*
  3. Parquet written to object store

* In the process of migrating to the DataFusion parquet writer (to get parallelized encoding)
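The Sort → Dedupe step can be sketched with the standard library: rows sharing a primary key (here, a hypothetical series-key string plus timestamp) are resolved by keeping the most recently inserted one. The `Row` struct and its `seq` insert counter are assumptions for illustration; the real DeduplicateExec operates on Arrow RecordBatches inside a DataFusion plan, not on a `Vec` of structs.

```rust
/// Hypothetical buffered row: a series key (e.g. tag values), a timestamp,
/// a field value, and the order in which the row was inserted.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    series: String,
    time: i64,
    value: f64,
    seq: u64,
}

/// Sort rows by (series, time, seq), then keep only the last-inserted row
/// for each (series, time) primary key.
fn sort_and_dedupe(mut rows: Vec<Row>) -> Vec<Row> {
    rows.sort_by(|a, b| {
        (a.series.as_str(), a.time, a.seq).cmp(&(b.series.as_str(), b.time, b.seq))
    });
    let mut out: Vec<Row> = Vec::with_capacity(rows.len());
    for row in rows {
        let same_key = out
            .last()
            .map_or(false, |last| last.series == row.series && last.time == row.time);
        if same_key {
            // Same primary key as the previous row: the later insert wins.
            *out.last_mut().unwrap() = row;
        } else {
            out.push(row);
        }
    }
    out
}

fn main() {
    let rows = vec![
        Row { series: "cpu,host=a".into(), time: 2, value: 1.0, seq: 0 },
        Row { series: "cpu,host=a".into(), time: 1, value: 5.0, seq: 1 },
        // Same key as seq 0, inserted later, so it replaces that row.
        Row { series: "cpu,host=a".into(), time: 2, value: 9.0, seq: 2 },
    ];
    for row in sort_and_dedupe(rows) {
        println!("{} t={} value={}", row.series, row.time, row.value);
    }
}
```

Sorting on the insert sequence as the last key is what makes "last one wins" a simple adjacent-row comparison.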


Querier

DataFusion used throughout

[Diagram: FlightSQL handler (gRPC); Querier with Query Planner (DataFusion), Metadata Cache, and Data Cache; Catalog API to the Remote Catalog Service; Ingester; Object Store]

User SQL query: SELECT … WHERE time > now() - '1 minute' AND region = 'us-east-1'

  1. FlightSQL query received
  2. Query planned using IOx catalog + ingester
  3. Execution Plan (DataFusion Plan) runs
  4. Results returned to user


Querier: Partition + File Pruning

DataFusion use: PruningPredicate for partition + file pruning

[Diagram: Querier with Query Planner, DataFusion TableProvider API, and Metadata Cache; Remote Catalog Service; Object Store]

User SQL query: SELECT … WHERE time > now() - '1 minute' AND region = 'us-east-1'
Pushed-down predicate: time > 123456789 AND region = 'us-east-1'

Partitions and files:
  • Partition: 2024-06-01: 002312.parquet, a773ef.parquet, 599e92.parquet
  • Partition: 2024-06-02: 0356af.parquet, 54229a.parquet, B432f3.parquet
  • Partition: 2024-06-30: 432790.parquet, 4321aa.parquet

  1. SQL planned, predicates pushed down
  2. Evaluate PruningPredicate with min/max statistics
  3. Only relevant partitions / files used for plan
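The idea behind min/max pruning can be sketched without DataFusion: each file carries per-column min/max statistics, and a file is kept only if those ranges could satisfy the pushed-down predicate. The `FileStats` struct, `may_match` function, and all statistics values below are made up for illustration (only the file names come from the slide); DataFusion's `PruningPredicate` rewrites arbitrary predicate expressions against Arrow arrays of statistics, which this sketch does not attempt.

```rust
/// Hypothetical per-file statistics (e.g. from the catalog / Parquet metadata).
struct FileStats {
    name: &'static str,
    time_min: i64,
    time_max: i64,
    region_min: &'static str,
    region_max: &'static str,
}

/// Could any row in the file satisfy `time > time_gt AND region = region_eq`?
/// Returns false only when the statistics prove that no row can match.
fn may_match(f: &FileStats, time_gt: i64, region_eq: &str) -> bool {
    let time_ok = f.time_max > time_gt;
    let region_ok = f.region_min <= region_eq && region_eq <= f.region_max;
    time_ok && region_ok
}

fn main() {
    // Illustrative statistics, not real data.
    let files = [
        FileStats { name: "002312.parquet", time_min: 100, time_max: 200, region_min: "us-east-1", region_max: "us-west-2" },
        FileStats { name: "0356af.parquet", time_min: 300, time_max: 400, region_min: "eu-west-1", region_max: "eu-west-3" },
        FileStats { name: "432790.parquet", time_min: 500, time_max: 600, region_min: "us-east-1", region_max: "us-east-1" },
    ];
    for f in &files {
        println!("{} time=[{}, {}]", f.name, f.time_min, f.time_max);
    }
    let kept: Vec<&str> = files
        .iter()
        .filter(|f| may_match(f, 250, "us-east-1"))
        .map(|f| f.name)
        .collect();
    println!("kept: {:?}", kept);
}
```

Pruning is conservative: a `true` result means "might match, must scan", never "definitely matches".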


Querier: Custom SQL Operator

Input data:

cpu.user   time
89.3       2000-05-05T12:00:00Z
88.0       2000-05-05T12:05:01Z
85.0       2000-05-05T12:15:00Z
45.3       2000-05-05T12:34:00Z
...        ...
78.9       2000-05-05T12:58:00Z
78.7       2000-05-05T12:59:00Z

Note: No data points for 12:20:00-12:29:00

Desired output, averaged into 10-minute bins (SQL?):

cpu.user   time
88.6       2000-05-05T12:00:00Z
85.0       2000-05-05T12:10:00Z
???        2000-05-05T12:20:00Z
45.3       2000-05-05T12:30:00Z
55.0       2000-05-05T12:40:00Z
78.8       2000-05-05T12:50:00Z

GROUP BY date_bin would have no row with the 12:20:00 timestamp
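The missing 12:20 row follows directly from how `date_bin` bucketing works: each timestamp is truncated to the start of its bin, and a group-by only produces rows for bins that received data. A minimal std-only sketch, with timestamps expressed as seconds after 12:00:00 (itself aligned to a 10-minute boundary) and only the points listed above (the elided `...` rows are omitted, so this sketch also produces no 12:40 row). The `date_bin` and `avg_by_bin` functions here are hypothetical stand-ins, not DataFusion's implementation.

```rust
use std::collections::BTreeMap;

/// Truncate `t` (seconds) to the start of its `stride`-second bin relative to
/// `origin`, in the spirit of SQL `date_bin`. Euclidean division keeps
/// timestamps before the origin in the correct bin.
fn date_bin(stride: i64, t: i64, origin: i64) -> i64 {
    origin + (t - origin).div_euclid(stride) * stride
}

/// Group (time, value) points by bin and average each bin. Only bins that
/// contain at least one point appear in the output.
fn avg_by_bin(points: &[(i64, f64)], stride: i64) -> BTreeMap<i64, f64> {
    let mut sums: BTreeMap<i64, (f64, u32)> = BTreeMap::new();
    for &(t, v) in points {
        let entry = sums.entry(date_bin(stride, t, 0)).or_insert((0.0, 0));
        entry.0 += v;
        entry.1 += 1;
    }
    sums.into_iter()
        .map(|(bin, (sum, n))| (bin, sum / n as f64))
        .collect()
}

fn main() {
    // Seconds after 2000-05-05T12:00:00Z, taken from the input table.
    let points: [(i64, f64); 6] = [
        (0, 89.3),          // 12:00:00
        (5 * 60 + 1, 88.0), // 12:05:01
        (15 * 60, 85.0),    // 12:15:00
        (34 * 60, 45.3),    // 12:34:00
        (58 * 60, 78.9),    // 12:58:00
        (59 * 60, 78.7),    // 12:59:00
    ];
    for (bin, avg) in avg_by_bin(&points, 10 * 60) {
        println!("12:{:02}:00 avg={:.2}", bin / 60, avg);
    }
}
```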


Querier: Custom SQL Operator

SELECT
  date_bin_gapfill(interval '10 minute', time) as minute,
  avg(cpu.user)
from cpu
where
  time between timestamp '2000-05-05T12:00:00Z'
           and timestamp '2000-05-05T12:59:00Z'
group by minute;

  • Implemented a date_bin_gapfill function modeled after TimescaleDB's time_bucket_gapfill

  • Implemented with a custom DataFusion operator and optimizer pass


Querier: Custom SQL Operator

SELECT
  date_bin_gapfill(interval '10 minute', time) as minute,
  avg(cpu.user)
from cpu
where
  time between timestamp '2000-05-05T12:00:00Z'
           and timestamp '2000-05-05T12:59:00Z'
group by minute;

GroupBy
  group: date_bin_gapfill(...)
  agg: avg(cpu.user)
  Filter
    (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
    Scan
      (projection + filter not shown)

Standard GroupBy can’t do gap filling


Querier: Custom SQL Operator

Before:

GroupBy
  group: date_bin_gapfill(...)
  agg: avg(cpu.user)
  Filter
    (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
    Scan
      (projection + filter not shown)

After:

GapFill
  group: minute
  agg: locf(avg(..))
  GroupBy
    group: date_bin as minute
    agg: avg(...)
    Filter
      (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
      Scan
        (projection + filter not shown)

  1. OptimizerRule rewrites to use GapFill operator (extension)

Finalged expressions
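Conceptually, the GapFill node walks the sorted GroupBy output across the full query time range, emitting one row per stride and filling bins that have no data. A minimal sketch for a single series, assuming the aggregated rows are already sorted by bin and aligned to bin starts; `gap_fill` and its parameters are hypothetical, and the real GapFillExec works on Arrow RecordBatches and multiple series inside a DataFusion plan.

```rust
/// Emit one row per `stride` for every bin start in `start..=end`.
/// Missing bins get `None`, or, when `locf` is true, the last observed value
/// carried forward ("last observation carried forward").
/// Assumes `rows` is sorted by bin and each time is aligned to a bin start.
fn gap_fill(
    rows: &[(i64, f64)],
    start: i64,
    end: i64,
    stride: i64,
    locf: bool,
) -> Vec<(i64, Option<f64>)> {
    let mut out = Vec::new();
    let mut input = rows.iter().peekable();
    let mut last: Option<f64> = None;
    let mut t = start;
    while t <= end {
        // Skip any input rows that fall before the current bin.
        while input.peek().map_or(false, |&&(rt, _)| rt < t) {
            input.next();
        }
        let value = if input.peek().map_or(false, |&&(rt, _)| rt == t) {
            let &(_, v) = input.next().unwrap();
            last = Some(v);
            Some(v)
        } else if locf {
            last
        } else {
            None
        };
        out.push((t, value));
        t += stride;
    }
    out
}

fn main() {
    // Bin starts in seconds after 12:00:00; the 12:20 bin (1200) has no row.
    let rows: [(i64, f64); 5] = [(0, 88.6), (600, 85.0), (1800, 45.3), (2400, 55.0), (3000, 78.8)];
    for (t, v) in gap_fill(&rows, 0, 59 * 60, 10 * 60, true) {
        println!("12:{:02}:00 {:?}", t / 60, v);
    }
}
```

Because the input is already sorted by bin, the fill is a single streaming pass with O(1) state, which is why it fits naturally as a separate operator above the GroupBy.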


Querier: Custom SQL Operator

ProjectionExec: expr=[minute, COUNT(cpu.user)]
  GapFillExec: group_expr=[date_bin_gapfill(..)], aggr_expr=[COUNT(cpu.user)], stride=10 minutes, time_range=[957528000000000000..957531540000000000]
    AggregateExec: mode=FinalPartitioned, gby=[date_bin_gapfill(..., cpu.time)], aggr=[COUNT(cpu.user)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([date_bin_gapfill(...)], 4)
          AggregateExec: mode=Partial, gby=[date_bin_gapfill(..., cpu.time)], aggr=[COUNT(cpu.user)]
            ParquetExec: file_groups=000000000000.parquet, predicates...

GapFillExec: Custom Stream (“operator”)
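The AggregateExec (mode=Partial) → RepartitionExec (Hash) → AggregateExec (mode=FinalPartitioned) chain above is two-phase aggregation: each input partition computes partial counts, partial states are routed by a hash of the group key so each key lands in exactly one partition, and a final phase merges them. A std-only sketch of that dataflow for COUNT, assuming inputs are already lists of bin keys; the function names are hypothetical and this mirrors the shape of the plan, not DataFusion's code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Phase 1 (like mode=Partial): count rows per group key within one partition.
fn partial_count(partition: &[i64]) -> HashMap<i64, u64> {
    let mut counts = HashMap::new();
    for &key in partition {
        *counts.entry(key).or_insert(0) += 1;
    }
    counts
}

/// Repartition (like partitioning=Hash([key], n)): route each partial state to
/// the output partition chosen by hashing its group key.
fn hash_repartition(partials: Vec<HashMap<i64, u64>>, n: usize) -> Vec<Vec<(i64, u64)>> {
    let mut out = vec![Vec::new(); n];
    for partial in partials {
        for (key, count) in partial {
            let mut h = DefaultHasher::new();
            key.hash(&mut h);
            out[(h.finish() % n as u64) as usize].push((key, count));
        }
    }
    out
}

/// Phase 2 (like mode=FinalPartitioned): merge partial counts in one partition.
fn final_count(partition: &[(i64, u64)]) -> HashMap<i64, u64> {
    let mut counts = HashMap::new();
    for &(key, c) in partition {
        *counts.entry(key).or_insert(0) += c;
    }
    counts
}

fn main() {
    // Two input partitions of bin keys (e.g. date_bin output in seconds).
    let inputs: Vec<Vec<i64>> = vec![vec![0, 0, 600, 1800], vec![600, 1800, 1800, 3000]];
    let partials: Vec<_> = inputs.iter().map(|p| partial_count(p)).collect();
    let mut result: Vec<(i64, u64)> = hash_repartition(partials, 4)
        .iter()
        .flat_map(|p| final_count(p))
        .collect();
    result.sort();
    println!("{:?}", result); // [(0, 2), (600, 2), (1800, 3), (3000, 1)]
}
```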


Querier: Custom InfluxQL frontend

SELECT MEAN(usage_idle), MEAN(bytes_free)
FROM cpu, disk        ← Not a cross join!
GROUP BY TIME(10s)
FILL(linear)          ← Gap filling + interpolation

  • SQL is kind of a pain for working with time series

  • InfluxDB has a DSL called “InfluxQL” with SQL-like syntax, specialized for time series
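FILL(linear) fills an empty GROUP BY TIME bucket by linearly interpolating between the nearest non-empty buckets on either side. A minimal sketch over evenly spaced buckets, where `None` marks an empty bucket; `fill_linear` is a hypothetical helper, and leaving leading / trailing gaps empty is an assumption of this sketch (there is no neighbor on one side to interpolate from).

```rust
/// Fill `None` buckets by linear interpolation between the nearest non-empty
/// buckets on either side. Buckets are assumed evenly spaced, so the bucket
/// index stands in for time. Gaps without a neighbor on both sides stay None.
fn fill_linear(buckets: &[Option<f64>]) -> Vec<Option<f64>> {
    let mut out = buckets.to_vec();
    let mut prev: Option<(usize, f64)> = None; // last non-empty bucket seen
    for (i, b) in buckets.iter().enumerate() {
        if let Some(v) = *b {
            if let Some((j, pv)) = prev {
                // Interpolate every empty bucket strictly between j and i.
                for k in (j + 1)..i {
                    let frac = (k - j) as f64 / (i - j) as f64;
                    out[k] = Some(pv + (v - pv) * frac);
                }
            }
            prev = Some((i, v));
        }
    }
    out
}

fn main() {
    // e.g. MEAN(usage_idle) per 10s bucket, with three empty buckets in a row.
    let buckets = [Some(2.0), None, None, None, Some(6.0), None];
    // [Some(2.0), Some(3.0), Some(4.0), Some(5.0), Some(6.0), None]
    println!("{:?}", fill_linear(&buckets));
}
```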


Querier: Custom InfluxQL frontend

SELECT MEAN(usage_idle), MEAN(bytes_free)
FROM cpu, disk
GROUP BY TIME(10s)
FILL(linear)

  1. InfluxQL Text
  2. Parsed into a custom AST by the InfluxQL Parser (InfluxDB)
     SELECT
       AGG(Column usage_idle)
       AGG(Column bytes_free)
     FROM
       Table cpu
       Table disk
  3. Planned into a DataFusion LogicalPlan by the InfluxQL Planner (InfluxDB)
     GapFill
       group: time
       agg: avg(..), avg(..)
       GroupBy
         group: date_bin as time
         agg: avg(..), avg(..)
         Filter
           (time >= '2000-05-05T12:00:00Z' and time <= '2000-05-05T12:59:00Z')
           Scan
             (projection + filter not shown)
  4. Optimized, Executed and Run like all other plans


Querier: time series specific optimization

InfluxDB uses sortedness heavily, so it often ends up with multi-column merges

Very wasteful for queries like:

SELECT … FROM …
ORDER BY time DESC
LIMIT 10

SortPreservingMerge
  expr: [tag1, tag2, time]
  Input Stream
  Input Stream
  Input Stream

Each input stream ~ a partition: parquet files + data from ingester

Have to read the first batch from each stream before producing any output, even with a LIMIT
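Why a sort-preserving merge must touch every input before emitting anything shows up in a std-only sketch of a k-way merge with a min-heap: the heap is seeded with the head of every stream, because any of them might hold the smallest key. Streams are modeled as sorted `Vec<i64>`s rather than async RecordBatch streams, and `merge_sorted` is a hypothetical name; this is not DataFusion's SortPreservingMergeExec implementation.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted streams into one sorted output of at most `limit` rows.
/// Returns the merged rows and how many streams had to be read before the
/// first row could be emitted.
fn merge_sorted(streams: &[Vec<i64>], limit: usize) -> (Vec<i64>, usize) {
    let mut heap = BinaryHeap::new();
    let mut opened = 0;
    // Seed the heap with the first row of every stream: any of them could
    // hold the smallest key, so all must be read before emitting anything.
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&v) = stream.first() {
            heap.push(Reverse((v, s, 0usize)));
            opened += 1;
        }
    }
    let mut out = Vec::new();
    while out.len() < limit {
        match heap.pop() {
            Some(Reverse((v, s, i))) => {
                out.push(v);
                // Refill from the stream the emitted row came from.
                if let Some(&next) = streams[s].get(i + 1) {
                    heap.push(Reverse((next, s, i + 1)));
                }
            }
            None => break,
        }
    }
    (out, opened)
}

fn main() {
    let streams: Vec<Vec<i64>> = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let (rows, opened) = merge_sorted(&streams, 2);
    println!("{:?} after opening {} streams", rows, opened); // [1, 2] after opening 3 streams
}
```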


Querier: time series specific optimization

Before: SortPreservingMerge (expr: [tag1, tag2, time]) over three Input Streams
After: ProgressiveEval (expr: [tag1, tag2, time]) over the same Input Streams

  1. PhysicalOptimizerRule replaces SortPreservingMerge with ProgressiveEval if the input streams have non-overlapping key ranges
  2. Reads one stream at a time*

* Really starts two in parallel to minimize IO stalls
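When the key ranges of the streams do not overlap, the merge degenerates into reading the streams one after another in key order, and a LIMIT can stop before later streams are ever opened. A minimal sequential sketch of that idea (it does not model the two-streams-in-parallel prefetch noted above); `non_overlapping` and `progressive_eval` are hypothetical names, and each stream is assumed to be sorted and passed in key-range order.

```rust
/// True if the (sorted) streams, in the given order, have strictly increasing,
/// non-overlapping [min, max] key ranges. Empty streams are ignored.
fn non_overlapping(streams: &[Vec<i64>]) -> bool {
    let ranges: Vec<(i64, i64)> = streams
        .iter()
        .filter_map(|s| Some((*s.first()?, *s.last()?)))
        .collect();
    ranges.windows(2).all(|w| w[0].1 < w[1].0)
}

/// Read streams one at a time, in order, stopping once `limit` rows exist.
/// Returns the rows and how many streams were opened.
fn progressive_eval(streams: &[Vec<i64>], limit: usize) -> (Vec<i64>, usize) {
    let mut out: Vec<i64> = Vec::new();
    let mut opened = 0;
    for stream in streams {
        if out.len() >= limit {
            break; // LIMIT satisfied: remaining streams are never opened
        }
        opened += 1;
        let remaining = limit - out.len();
        out.extend(stream.iter().copied().take(remaining));
    }
    (out, opened)
}

fn main() {
    let streams: Vec<Vec<i64>> = vec![vec![1, 2, 3], vec![10, 11], vec![20, 21]];
    if non_overlapping(&streams) {
        let (rows, opened) = progressive_eval(&streams, 2);
        println!("{:?} after opening {} stream(s)", rows, opened); // [1, 2] after opening 1 stream(s)
    }
}
```

The optimizer-side check (`non_overlapping`) is what makes the cheaper operator safe: concatenation is only equivalent to a merge when no stream can contain a key that sorts before another stream's contents.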


Conclusion

  • Let’s get 1,000 DataFusion-based projects 🚀
  • We implemented InfluxDB 3.0 with DataFusion
  • Thank you all 🙏
