1 of 56

Building InfluxDB IOx

“from scratch”

With Apache Arrow, DataFusion, and Rust

2023-06-02: THE DUTCH SEMINAR ON DATA SYSTEMS DESIGN

Andrew Lamb

2 of 56

Today: Apache Arrow PMC Chair, Staff Engineer InfluxData

Past life 1: Query Optimizer @ Vertica, Oracle DB server

Past life 2: Chief Architect + VP Engineering at ML startups

3 of 56

Goals

Convince you that building a database with Apache Arrow, Parquet, DataFusion, and Arrow Flight is a good idea.

  • Review each of those technologies
  • How they are used in InfluxDB IOx (a new Time Series Database)

4 of 56

Why specialized Time Series Databases?

“One Size Fits All”: An Idea Whose Time Has Come and Gone

Stonebraker & Çetintemel

5 of 56

Why specialized Time Series Databases?

Specialized for storing data with times (obviously)

Key Properties:

  1. Schema on Write: new columns can appear at any time, backfills
  2. High volume, denormalized ingest
    • e.g. host=myhost123.example.com repeated over and over
  3. Rapid data value decay: newest data is super important, falls off drastically

Examples: InfluxDB (open), Facebook Gorilla (closed), Google Monarch (closed), Timescale, AWS Timestream, Graphite Whisper / Grafana (open), …

6 of 56

Time Series DB Architecture: Classic

  • Log Structured Merge (LSM) tree + Series Index
    • Primary Key: Tags + timestamp → compressed time series
  • Implemented in: Golang (not sure why)
  • Custom query languages
    • InfluxQL, Flux, PromQL, LogQL, etc.
  • Custom file formats, specialized for time series
    • TSM, TSFile format, etc.

7 of 56

Time Series DB Architecture: Next

Fast / Feature Rich

  • State of the Art OLAP query engine, native language (C/C++ or Rust)
  • No series cardinality limits, ‘infinite’ retention, scale out

Cloud Native

  • Tiered Storage: historical data on cheap object store, hot local disk / memory
  • “Disaggregated Compute”: Split compute/storage to run in k8s
  • Scalable: Multiple services, scale up/down: ingest / query / compaction

Ecosystem Compatibility

  • File Format: open, widely supported (but still works well for timeseries)
  • Query Language: SQL as well as domain specific QL
  • Client Compatibility like JDBC/ODBC

8 of 56

It is hard (expensive) to build a new database

Database company money raised

  • Snowflake: $2B
  • Databricks (Spark): $3.5B
  • MongoDB: $311M
  • SingleStore: $464.1M
  • Cockroach Labs (CockroachDB): $633.1M
  • PingCAP (TiDB): $341.6M
  • Elastic: $162M
  • TimescaleDB: $181M
  • DuckDB: $? / MotherDuck: $47.5M

I did this at Vertica too

9 of 56

“We can do it with the Apache Arrow Ecosystem”

10 of 56

Toolkit for a modern analytic system (OLAP database)?

match tool_needed {
    File format (persistence)       => Parquet,
    Columnar memory representation  => Arrow Arrays,
    Operations (e.g. multiply, avg) => Compute Kernels,
    SQL + extensible query engine   => Arrow DataFusion,
    Network transfer                => Arrow Flight IPC,
    JDBC/ODBC driver                => Arrow Flight SQL,
}

11 of 56

And the Ecosystem Integration is great

Apache Parquet

  • AWS Athena
  • ClickHouse
  • Snowflake
  • Vertica
  • DuckDB
  • Greenplum
  • Databricks

Arrow Flight

  • Google BigQuery
  • Snowflake
  • Dremio
  • Databricks
  • ...

Arrow Arrays

  • Dremio
  • DuckDB
  • InfluxDB IOx
  • TileDB
  • SciDB
  • pyarrow
  • pola.rs

12 of 56

13 of 56

Apache Parquet

Columnar file format from 2013 (originally part of Hadoop ecosystem)

“De facto” interchange format for analytics in 2023

Very good compression for a wide variety of data

Large ecosystem of tools and systems

Support for many query acceleration “tricks” (projection and filter pushdown) – See blog Querying Parquet with Millisecond Latency for more details

14 of 56

Apache Parquet: Stores Tabular Data

location       temperature  humidity  timestamp
"us-east"      82           67        2016-06-13T17:43:50.1004002Z
"us-midwest"   82           65        2016-06-13T17:43:50.1004002Z
"us-west"      70           54        2016-06-13T17:43:50.1004002Z
"us-east"      83           69        2016-06-13T17:43:50.2004002Z
"us-midwest"   87           78        2016-06-13T17:43:50.2004002Z
"us-west"      72           56        2016-06-13T17:43:50.2004002Z
"us-east"      84           67        2016-06-13T17:43:50.3004002Z
"us-midwest"   90           82        2016-06-13T17:43:50.3004002Z
"us-west"      71           57        2016-06-13T17:43:50.3004002Z

15 of 56

Apache Parquet: Stores Structured Data

Stores structured data (like JSON) using Dremel inspired Record Shredding

{             # <-- First record
  "a": 1,     # <-- the top level fields are a, b, c, and d
  "b": {      # <-- b is always provided (not nullable)
    "b1": 1,  # <-- b1 and b2 are "nested" fields of "b"
    "b2": 3   # <-- b2 is always provided (not nullable)
  },
  "d": {
    "d1": 1   # <-- d1 is a "nested" field of "d"
  }
}

{             # <-- Second record
  "a": 2,
  "b": {
    "b2": 4   # <-- note "b1" is NULL in this record
  },
  "c": {      # <-- note "c" was NULL in the first record,
    "c1": 6   #     but when "c" is provided, c1 is also
  },          #     always provided (not nullable)
  "d": {
    "d1": 2,
    "d2": 1
  }
}

16 of 56

Parquet File Layout

17 of 56

Parquet Metadata Layout
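As a hedged illustration, the Rust parquet crate exposes this footer metadata directly; a minimal sketch, where the file path is a placeholder:

use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Opening the reader parses only the footer metadata, not the data pages
    let file = File::open("weather.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let meta = reader.metadata();
    println!("row groups: {}", meta.num_row_groups());
    for rg in meta.row_groups() {
        for col in rg.columns() {
            // Per-column-chunk metadata (statistics, sizes, offsets) lives here,
            // which is what enables pruning without reading the data itself
            println!("{}: {} compressed bytes", col.column_path(), col.compressed_size());
        }
    }
    Ok(())
}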

18 of 56

Parquet Projection Pushdown

19 of 56

Parquet Filter Pushdown

Querying Parquet with Millisecond Latency blog describes other forms of filter pushdown that are possible
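A sketch of both pushdowns via DataFusion (the file path and column names are assumptions): the filter and projection below are planned into the parquet scan itself, so unneeded columns and non-matching row groups are never decoded.

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet("weather.parquet", ParquetReadOptions::default())
        .await?
        // filter pushdown: row groups whose min/max statistics exclude
        // "us-east" are skipped entirely
        .filter(col("location").eq(lit("us-east")))?
        // projection pushdown: only these columns are read from the file
        .select_columns(&["temperature", "timestamp"])?;
    df.show().await?;
    Ok(())
}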

20 of 56

How does IOx use Parquet?

All data is persisted to object store as parquet; queries are answered primarily from parquet files, combined with the newest data from the ingesters

  1. Incoming line protocol arrives at the ingesters (write path)
  2. Each ingester periodically writes its data buffer as sorted parquet files to the object store
  3. A user query (SELECT … FROM ...) arrives at the querier (read path)
  4. The querier reads parquet data from the object store and answers the query

Example incoming line protocol:

weather,location=us-east temperature=82,humidity=67 1465839830100400200
weather,location=us-midwest temperature=82,humidity=65 1465839830100400200
weather,location=us-west temperature=70,humidity=54 1465839830100400200
weather,location=us-east temperature=83,humidity=69 1465839830200400200
weather,location=us-midwest temperature=87,humidity=78 1465839830200400200
weather,location=us-west temperature=72,humidity=56 1465839830200400200
weather,location=us-east temperature=84,humidity=67 1465839830300400200
weather,location=us-midwest temperature=90,humidity=82 1465839830400400200
weather,location=us-west temperature=71,humidity=57 1465839830400400200

21 of 56

“A language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware”

22 of 56

Apache Arrow Terminology

Array: “a sequence of values with known length all having the same type”

An Array of 64-bit floating point values:

[123.213, 3141.12, 214321.87, 432343.66, 334.33, 7337.41, 4342.84]

RecordBatch: same-size arrays, known schema (types and names)

A RecordBatch with two fields, “Quantity” and “Price”:

Quantity (Int32, nullable): [1, 3, 1, 3, NULL, 2, 4]
Price (Float64, non-null):  [123.213, 3141.12, 214321.87, 432343.66, 334.33, 7337.41, 4342.84]
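These concepts map directly onto types in the Rust arrow crate; a minimal sketch that builds the RecordBatch above:

use std::sync::Arc;
use arrow::array::{ArrayRef, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    // A nullable Array: None encodes NULL via the validity bitmap
    let quantity = Int32Array::from(vec![Some(1), Some(3), Some(1), Some(3), None, Some(2), Some(4)]);
    // A non-null Array of 64-bit floats
    let price = Float64Array::from(vec![123.213, 3141.12, 214321.87, 432343.66, 334.33, 7337.41, 4342.84]);

    // A RecordBatch: same-length arrays plus a schema (names and types)
    let schema = Arc::new(Schema::new(vec![
        Field::new("Quantity", DataType::Int32, true),
        Field::new("Price", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(quantity) as ArrayRef, Arc::new(price) as ArrayRef],
    )?;
    println!("{} rows", batch.num_rows());
    Ok(())
}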

23 of 56

Arrow Implementations and Bindings

Java, C++, JavaScript, Go, Rust, C, Ruby, Python, Julia, R, C#, Matlab

24 of 56

Compute Kernels

let output = gt(&left, &right);

left:   [10, 20, 17, 5, 23, 5, 9, 12, 4, 5, 76, 2, 3, 5]
right:  [ 2, 33,  2, 1,  6, 7, 8,  2, 7, 2,  5, 6, 7, 8]
output: [ 1,  0,  1, 1,  1, 0, 1,  1, 0, 1,  1, 0, 0, 0]

The gt (greater than) kernel computes an output BooleanArray where each element is left > right

Kernels handle nulls (validity masks), optimizations for different data sizes, etc.

~50 different kernels, full list: docs.rs page
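A runnable sketch of gt in Rust, including null handling (the module path assumes an arrow-rs version where the comparison kernels live under arrow::compute::kernels::comparison):

use arrow::array::Int32Array;
use arrow::compute::kernels::comparison::gt;

fn main() -> arrow::error::Result<()> {
    let left = Int32Array::from(vec![Some(10), Some(20), None]);
    let right = Int32Array::from(vec![Some(2), Some(33), Some(1)]);
    // Element-wise left > right; a NULL input produces a NULL output
    let output = gt(&left, &right)?;
    println!("{:?}", output); // prints a BooleanArray of [true, false, null]
    Ok(())
}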

25 of 56

How does IOx use Arrow: Ingester Buffers

Indirectly via DataFusion and Flight. The ingest path parses input line protocol directly into Arrow memory buffers

  1. Incoming line protocol arrives at the ingester (same example as on the previous slide)
  2. The line protocol is parsed and appended to an in-memory buffer (mutable batch, the “open buffer”)
  3. The ingester periodically writes the data, sorted, to object store using a DataFusion plan
  4. If queried prior to writing parquet, the buffer is snapshotted into RecordBatches and sent to the querier via Arrow Flight
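A hand-rolled sketch of steps 2 and 4, appending one parsed point into Arrow builders and snapshotting them into a RecordBatch; this is illustrative only, not the actual IOx mutable batch code:

use std::sync::Arc;
use arrow::array::{ArrayRef, Float64Builder, StringBuilder, TimestampNanosecondBuilder};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Mutable, append-only buffers (one per column)
    let mut location = StringBuilder::new();
    let mut temperature = Float64Builder::new();
    let mut time = TimestampNanosecondBuilder::new();

    // Append one parsed point from:
    //   weather,location=us-east temperature=82 1465839830100400200
    location.append_value("us-east");
    temperature.append_value(82.0);
    time.append_value(1_465_839_830_100_400_200);

    let schema = Arc::new(Schema::new(vec![
        Field::new("location", DataType::Utf8, false),
        Field::new("temperature", DataType::Float64, false),
        Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
    ]));
    // Snapshot the buffers into an immutable RecordBatch (step 4 on the slide)
    let batch = RecordBatch::try_new(schema, vec![
        Arc::new(location.finish()) as ArrayRef,
        Arc::new(temperature.finish()) as ArrayRef,
        Arc::new(time.finish()) as ArrayRef,
    ])?;
    println!("{} row(s)", batch.num_rows());
    Ok(())
}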

26 of 56

History

  • “Build a better open source foundation for data science”
  • Begun in 2016, founded by Wes McKinney
  • Initially focused on applying systems / database research to data science ecosystems (pandas)

27 of 56

We can do so much better through modern systems techniques:

  • Multi-core algorithms, GPU acceleration
  • Code generation (LLVM)
  • Lazy evaluation, “query” optimization
  • Sophisticated memory management
  • Efficient access to huge data sets
  • Interoperable memory models, zero-copy interchange between system components

Note 1: Moore’s Law (and small data) enabled us to get by for a long time without confronting some of these challenges

Note 2: Most of these methods have already been widely employed in analytic databases. Limited “novel” research needed

NYC R Conference 2018-04-20

28 of 56

Defragmenting Data Access

29 of 56

Defragmenting Data Access

30 of 56

31 of 56

DataFusion

  • Deconstructable “query engine” written in Rust; uses Arrow as its in-memory format
  • SQL and DataFrame APIs, query optimizers, a high-performance execution engine, and data sources: CSV, Parquet, JSON, Avro, etc.
  • Uses:
    • Directly, as an embedded SQL engine
    • Customized, as a foundation for building new systems
    • Examples: specialized analytical databases, query language engines such as prql-query, accelerators such as VegaFusion, research platforms, tools for reading / sorting / transcoding Parquet, CSV, Avro, and JSON files such as qv, faster Spark runtimes, and more

32 of 56

DataFusion: System Input / Output

Input: a SQL query or a DataFrame, plus catalog information (tables, schemas, etc.); output: streams of data batches.

SQL Query:

SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;

DataFrame:

ctx.read_table("http")?
   .filter(...)?
   .aggregate(..)?;
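A self-contained sketch of the SQL path with the datafusion crate; registering a CSV file here stands in for whatever catalog / table provider a real system wires up:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Provide catalog information by registering a table
    // ("requests.csv" is a placeholder)
    ctx.register_csv("http_api_requests_total", "requests.csv", CsvReadOptions::new())
        .await?;
    let df = ctx
        .sql(
            "SELECT status, COUNT(1) FROM http_api_requests_total \
             WHERE path = '/api/v2/write' GROUP BY status",
        )
        .await?;
    // Results stream back as Arrow RecordBatches
    df.show().await?;
    Ok(())
}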

33 of 56

DataFusion: System Architecture

Query frontends: SQL, DataFrame

Plan representations (dataflow graphs): LogicalPlans → ExecutionPlan, with optimizations / transformations applied at each level

Optimized execution operators (Arrow based): expression evaluation, HashAggregate, Sort, Join, …

Data sources: Parquet, CSV, …

34 of 56

DataFusion: Totally Customizable

The same stack, and every layer can be extended ✅:

  • Query frontends (SQL, DataFrame)
  • Plan representations (LogicalPlans, ExecutionPlan)
  • Optimizations / transformations
  • Expression evaluation
  • Optimized execution operators, Arrow based (HashAggregate, Sort, Join)
  • Data sources (Parquet, CSV)

35 of 56

How IOx uses DataFusion

Why?

  1. Query and reorganization are “just” moving data around
  2. Reuse existing execution machinery (e.g. streaming, segregated worker pool, optimized sort / merge, etc)
  3. Amplify investment in Open Source contributions

All queries and parquet creation + compaction run through a unified planning system based on DataFusion + Arrow

36 of 56

Query Processing in IOx

Query input (client / language specific frontends):

  • Storage gRPC frontend: read_group(..)
  • SQL frontend (from DataFusion): SELECT … FROM …
  • InfluxQL frontend (in progress): SELECT … FROM … GROUP BY *
  • Reorg frontend: compact_plan(..)

Shared planning and execution phases, based on DataFusion:

DataFusion LogicalPlan → Optimization (storage pruning, pushdown, etc.) → Physical planning (DataFusion ExecutionPlan) → Execution → output Arrow RecordBatches

Client specific output formats:

  • gRPC output: SeriesFrame, ...
  • Arrow Flight: FlightData
  • ParquetWriter: write to parquet files

37 of 56

Why Arrow Internally (and not just at interface)?

Option 1: Use Arrow internally (DataFusion, pola.rs, Acero)

  • Pros: fast interchange, reuse Arrow libraries
  • Cons: constrained(*) to Arrow
  • Pipeline: Scan → Filter → Agg, exchanging Arrow data end to end

Option 2: Use specialized structures internally, convert to Arrow at the edges (Velox, DuckDB)

  • Pros: can use specialized structures
  • Cons: maintain specialized code
  • Pipeline: Scan → Filter → Agg, exchanging internal formats, with a final convert-to-Arrow step

38 of 56

Why Arrow Internally (and not just at interface)?

So far results are encouraging

Theory: Using Arrow is “good enough” compared to specialized structures

Pooled open source development → invest heavily in optimized Arrow kernels

Good: Sorting, Filtering, Projection, Parquet

Could Improve: Grouping, Joining
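For example, a single-column sort is just two kernel calls in arrow-rs (a sketch; multi-column sorts use lexsort_to_indices in the same way):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::{sort_to_indices, take};

fn main() -> arrow::error::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
    // Compute the sorted permutation, then materialize it with take;
    // the same indices can reorder every column of a RecordBatch
    let indices = sort_to_indices(&col, None, None)?;
    let sorted = take(&col, &indices, None)?;
    println!("{:?}", sorted); // [1, 2, 3]
    Ok(())
}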

39 of 56

Flight

40 of 56

Arrow Flight: Sending Arrow Arrays over the Network

  • gRPC framework for high-performance data services
  • Based on Arrow data, “no copy”
  • Supports single client / multiple remote servers

41 of 56

Flight for Distributed Systems


42 of 56

Flight Canonical Sequence Diagram

43 of 56

How does IOx use Flight? Native Query Protocol

Client ←→ Querier

  1. User query (SELECT … FROM ...) arrives at the querier via Flight
  2. The querier combines data from the ingesters (write path) and parquet (read path) into the response
  3. The response is returned via Flight
44 of 56

How does IOx use Flight? Ingester ←→ Querier comms

  • Ingester and Querier’s internal protocol is built on Flight

  1. A user query (SELECT … FROM ...) arrives at the querier
  2. The querier fetches unpersisted (not yet written to parquet) data from the ingester, as RecordBatches via Flight

45 of 56

Native Query Protocol Details

  1. IOx client directly* creates a Ticket expected by the IOx DoGet endpoint
  2. Calls DoGet with the Ticket it created in step 1 and the server responds with the requested data

* A slight abuse of the Flight protocol, which normally obtains an opaque Ticket via the GetFlightInfo endpoint

Client                                             IOx
  │                                                 │
  │  1. Client creates a Ticket                     │
  │                                                 │
  │  2. DoGet(Ticket)                               │
  │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▶ │
  │                                                 │
  │  3. Stream of FlightData                        │
  │ ◀━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
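A sketch of the client side with the arrow-flight crate; the endpoint URL and the ticket payload are assumptions, not the real IOx wire format:

use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = FlightServiceClient::connect("http://localhost:8082").await?;

    // Step 1: construct the Ticket directly (rather than via GetFlightInfo);
    // the JSON payload here is hypothetical
    let ticket = Ticket {
        ticket: br#"{"namespace":"mydb","sql":"SELECT 1"}"#.to_vec().into(),
    };

    // Step 2: call DoGet; step 3: the server streams back FlightData messages
    let mut stream = client.do_get(ticket).await?.into_inner();
    while let Some(flight_data) = stream.message().await? {
        println!("received {} body bytes", flight_data.data_body.len());
    }
    Ok(())
}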

46 of 56

FlightSQL

47 of 56

Apache Arrow FlightSQL

  • Replaces old row-oriented standards for interacting with SQL databases
  • Uses the Arrow in-memory format and Arrow Flight; 10x faster
  • Existing protocols: optimized for OnLine Transaction Processing (OLTP) – small numbers of rows, prepared statements, low latency
  • Arrow FlightSQL: optimized for OnLine Analytical Processing (OLAP) – large numbers of rows, millions of records over the wire

Read more: Expanding Arrow's Reach with a JDBC Driver for Arrow Flight SQL

48 of 56

Before Flight SQL

Every client ecosystem needs its own driver, each speaking a database-specific protocol (e.g. Postgres FEBE):

client → JDBC Driver (JDBC Interface) → Server
client → JDBC Driver (JDBC Interface) → Server
client → ODBC Driver (ODBC Interface) → Server

Caveat: this picture only shows JDBC and ODBC drivers. Also common is a native implementation in each language ecosystem, e.g. the Python DB API 2.0 and psycopg2. See Apache Arrow Database Connectivity (ADBC) for more details.

49 of 56

Scary (likely infeasible) possibility

  • JDBC Drivers are notoriously complex to implement (e.g. Simba SDK)
  • “expensive to create and maintain”

client → Custom JDBC Driver (JDBC Interface) → IOx

  1. Data is produced column by column in RecordBatches and sent over the network using Flight
  2. The custom JDBC driver converts the Flight RecordBatches into JDBC rows
50 of 56

IOx JDBC connectivity before Flight SQL

client (Postgres JDBC Driver, JDBC Interface) ← Postgres FEBE protocol ← PG FEBE adapter ← IOx

  1. Data is produced column by column in RecordBatches
  2. The adapter converts it to the Postgres FEBE protocol
  3. Data is sent row by row (DataRow {..}, DataRow {..}, …)

Example FEBE exchange:

Client: Prepare query:"SELECT * FROM foo WHERE a = $1;" name:""
Client: Bind name:"" parameters:[{format:"text", value:"42"}]
… (steps elided) …
Server: RowDescription fields:[{name:"a", type:"int", format:"text"}, …]
Server: DataRow fields:[{data:"42"}, {data: "Hunter Valley"} …]
Server: DataRow fields:[{data:"12"}, {data: "Merrimack Valley"} …]
… (one DataRow for each row of data) …
Server: DataRow fields:[{data:"321"}, {data: "Charles River Watershed"} …]
… (steps elided) …

51 of 56

How IOx uses FlightSQL

✅ Access to the ecosystem without having to implement JDBC, ODBC, …

client → arrow-flight JDBC Driver (JDBC Interface) → IOx

  • The arrow-flight JDBC driver implements the JDBC API
  • Data is sent via Arrow FlightSQL (columnar)

52 of 56

Community

“Community over Code” - The Apache Way

53 of 56

Apache Software Foundation

Independence: the ASF is strictly vendor neutral

Community Over Code: a healthy community is a higher priority than good code

  • Strong communities can always rectify problems with their code
  • An unhealthy community will likely struggle to sustainably maintain a codebase

Open Communications: Everything is done in the open

Earned Authority / Community of Peers: Anyone can contribute and become a committer or PMC member

54 of 56

Open source power and community

55 of 56

Some Industry Contributors in Apache Arrow

(logo wall of contributing companies, including ClearCode)

56 of 56

Thank you!

Questions / Discussion