Building InfluxDB IOx
“from scratch”
With Apache Arrow, DataFusion, and Rust
2023-06-02: THE DUTCH SEMINAR ON DATA SYSTEMS DESIGN
Andrew Lamb
Today: Apache Arrow PMC Chair, Staff Engineer InfluxData
Past life 1: Query Optimizer @ Vertica, Oracle DB server
Past life 2: Chief Architect + VP Engineering at ML startups
Goals
Convince you that building a database with Apache Arrow, Parquet, DataFusion, and Arrow Flight is a good idea.
Why specialized Time Series Databases?
“One Size Fits All”: An Idea Whose Time Has Come and Gone
Stonebraker & Çetintemel
Why specialized Time Series Databases?
Specialized for storing data with times (obviously)
Key Properties:
Examples: InfluxDB (open), Facebook Gorilla (closed), Google Monarch (closed), Timescale, AWS Timestream, Graphite Whisper / Grafana (open), …
Time Series DB Architecture: Classic
Time Series DB Architecture: Next
Fast / Feature Rich
Cloud Native
Ecosystem Compatibility
It is hard (expensive) to build a new database
Database company money raised
Source: https://www.crunchbase.com
I did this at Vertica too
“We can do it with the Apache Arrow Ecosystem”
Toolkit for a modern analytic system (OLAP database)?
match tool_needed {
File format (persistence) => Parquet
Columnar memory representation => Arrow Arrays
Operations (e.g. multiply, avg) => Compute Kernels
SQL + extensible query engine => Arrow DataFusion
Network transfer => Arrow Flight IPC
JDBC/ODBC driver => Arrow Flight SQL
}
And the Ecosystem Integration is great
Apache Parquet · Arrow Flight · Arrow Arrays
Apache Parquet
Columnar file format from 2013 (originally part of Hadoop ecosystem)
The “de facto” interchange format for analytics in 2023
Very good compression for a wide variety of data
Large ecosystem of tools and systems
Support for many query acceleration “tricks” (projection and filter pushdown) – See blog Querying Parquet with Millisecond Latency for more details
Apache Parquet: Stores Tabular Data
| location     | temperature | humidity | timestamp                    |
|--------------|-------------|----------|------------------------------|
| "us-east"    | 82          | 67       | 2016-06-13T17:43:50.1004002Z |
| "us-midwest" | 82          | 65       | 2016-06-13T17:43:50.1004002Z |
| "us-west"    | 70          | 54       | 2016-06-13T17:43:50.1004002Z |
| "us-east"    | 83          | 69       | 2016-06-13T17:43:50.2004002Z |
| "us-midwest" | 87          | 78       | 2016-06-13T17:43:50.2004002Z |
| "us-west"    | 72          | 56       | 2016-06-13T17:43:50.2004002Z |
| "us-east"    | 84          | 67       | 2016-06-13T17:43:50.3004002Z |
| "us-midwest" | 90          | 82       | 2016-06-13T17:43:50.3004002Z |
| "us-west"    | 71          | 57       | 2016-06-13T17:43:50.3004002Z |
Apache Parquet: Stores Structured Data
Stores structured data (like JSON) using Dremel-inspired Record Shredding
{ # <-- First record
"a": 1, # <-- the top level fields are a, b, c, and d
"b": { # <-- b is always provided (not nullable)
"b1": 1, # <-- b1 and b2 are "nested" fields of "b"
"b2": 3 # <-- b2 is always provided (not nullable)
},
"d": {
"d1": 1 # <-- d1 is a "nested" field of "d"
}
}
{ # <-- Second record
"a": 2,
"b": {
"b2": 4 # <-- note "b1" is NULL in this record
},
"c": { # <-- note "c" was NULL in the first record
"c1": 6 but when "c" is provided, c1 is also
}, always provided (not nullable)
"d": {
"d1": 2,
"d2": 1
}
}
Parquet File Layout
Parquet Metadata Layout
Parquet Projection Pushdown
Parquet Filter Pushdown
Querying Parquet with Millisecond Latency blog describes other forms of filter pushdown that are possible
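To make the projection pushdown concrete, here is a minimal Rust sketch using the parquet crate's Arrow reader; the file name weather.parquet is a hypothetical stand-in, and row filters (filter pushdown) hang off the same builder.

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file; any Parquet file works.
    let file = File::open("weather.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Projection pushdown: decode only the first two leaf columns,
    // skipping the I/O and decode cost of all the others.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
    let reader = builder.with_projection(mask).build()?;

    // The reader yields Arrow RecordBatches.
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}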
How does IOx use Parquet?
All data is persisted to object store as Parquet; queries are answered primarily from Parquet files, combined with the newest data from ingesters
Write path:
1. Incoming line protocol is sent to the ingesters.
2. Each ingester periodically writes its data buffer as sorted Parquet files to object store.

Read path:
3. A user query (SELECT … FROM …) arrives at the querier.
4. The querier reads Parquet data from object store and answers the query.
weather,location=us-east temperature=82,humidity=67 1465839830100400200
weather,location=us-midwest temperature=82,humidity=65 1465839830100400200
weather,location=us-west temperature=70,humidity=54 1465839830100400200
weather,location=us-east temperature=83,humidity=69 1465839830200400200
weather,location=us-midwest temperature=87,humidity=78 1465839830200400200
weather,location=us-west temperature=72,humidity=56 1465839830200400200
weather,location=us-east temperature=84,humidity=67 1465839830300400200
weather,location=us-midwest temperature=90,humidity=82 1465839830400400200
weather,location=us-west temperature=71,humidity=57 1465839830400400200
“A language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware”
Apache Arrow Terminology
Array: “a sequence of values with known length all having the same type”
Array of 64-bit floating point values:
[123.213, 3141.12, 214321.87, 432343.66, 334.33, 7337.41, 4342.84]
RecordBatch: Same size arrays, known schema (types and names)
| Quantity Int32 (nullable) | Price Float64 (non-null) |
|---------------------------|--------------------------|
| 1                         | 123.213                  |
| 3                         | 3141.12                  |
| 1                         | 214321.87                |
| 3                         | 432343.66                |
| NULL                      | 334.33                   |
| 2                         | 7337.41                  |
| 4                         | 4342.84                  |
RecordBatch with two fields “Quantity” and “Price”
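As a minimal sketch, this is how the RecordBatch above can be built with the Rust arrow crate (the NULL is encoded in the validity mask):

use std::sync::Arc;
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Schema: names and types; "Quantity" is nullable, "Price" is not.
    let schema = Arc::new(Schema::new(vec![
        Field::new("Quantity", DataType::Int32, true),
        Field::new("Price", DataType::Float64, false),
    ]));

    // Same-length arrays; None encodes NULL via the validity mask.
    let quantity = Int32Array::from(vec![
        Some(1), Some(3), Some(1), Some(3), None, Some(2), Some(4),
    ]);
    let price = Float64Array::from(vec![
        123.213, 3141.12, 214321.87, 432343.66, 334.33, 7337.41, 4342.84,
    ]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(quantity), Arc::new(price)])?;
    println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    Ok(())
}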
Arrow Implementations and Bindings
Java
C++
JavaScript
Go
Rust
C
Ruby
Python
Julia
R
C#
Matlab
Compute Kernels
let output = gt(
&left,
&right
);
left:   [10, 20, 17,  5, 23,  5,  9, 12,  4,  5, 76,  2,  3,  5]
right:  [ 2, 33,  2,  1,  6,  7,  8,  2,  7,  2,  5,  6,  7,  8]
output: [ 1,  0,  1,  1,  1,  0,  1,  1,  0,  1,  1,  0,  0,  0]
The gt (greater than) kernel computes an output BooleanArray where each element is left > right
Kernels handle nulls (validity masks), optimizations for different data sizes, etc.
~50 different kernels, full list: docs.rs page
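A runnable version of the snippet above; note that kernel module paths have moved between arrow-rs releases, so this assumes a version where comparison::gt takes two primitive arrays:

use arrow::array::Int32Array;
use arrow::compute::kernels::comparison::gt;

fn main() {
    // NULLs propagate: any position where an input is NULL is NULL in the output.
    let left = Int32Array::from(vec![Some(10), Some(20), Some(17), None]);
    let right = Int32Array::from(vec![Some(2), Some(33), Some(2), Some(1)]);

    let output = gt(&left, &right).unwrap();
    // output: BooleanArray [true, false, true, null]
    println!("{:?}", output);
}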
How does IOx use Arrow: Ingester Buffers
Indirectly, via DataFusion and Flight. The ingest path parses input line protocol directly into Arrow memory buffers
1. Incoming line protocol is sent to the ingester (same sample as shown above).
2. Line protocol is parsed and appended to an in-memory buffer (mutable batch): the open buffer (a parsing sketch follows below).
3. The ingester periodically writes sorted data to object store using a DataFusion plan.
4. If queried before the Parquet is written, the buffer is snapshotted into RecordBatches and sent to the querier via Arrow Flight.
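A sketch of step 2's parsing, using the influxdb_line_protocol crate published from the IOx codebase (field names on the parsed type may differ between versions):

use influxdb_line_protocol::parse_lines;

fn main() {
    let lp = "weather,location=us-east temperature=82,humidity=67 1465839830100400200";
    for line in parse_lines(lp) {
        let line = line.expect("valid line protocol");
        // Each parsed line carries the measurement name, tag set, field set,
        // and optional timestamp, ready to append to the in-memory buffer.
        println!("measurement = {}", line.series.measurement);
    }
}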
History
We can do so much better through modern systems techniques:
- Multi-core algorithms, GPU acceleration
- Code generation (LLVM)
- Lazy evaluation, “query” optimization
- Sophisticated memory management
- Efficient access to huge data sets
- Interoperable memory models, zero-copy interchange between system components
Note 1: Moore’s Law (and small data) enabled us to get by for a long time without confronting some of these challenges.
Note 2: Most of these methods have already been widely employed in analytic databases; limited “novel” research needed.
Defragmenting Data Access (NYC R Conference, 2018-04-20)
DataFusion
DataFusion: System Input / Output
Inputs: a SQL query or DataFrame calls, plus catalog information (tables, schemas, etc.) and input data batches. Output: data batches.

SQL Query:
SELECT status, COUNT(1)
FROM http_api_requests_total
WHERE path = '/api/v2/write'
GROUP BY status;

DataFrame:
ctx.read_table("http")?
    .filter(...)?
    .aggregate(..)?;
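A minimal runnable sketch of the SQL path, assuming a local CSV file requests.csv as the registered table:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Catalog information: register a table backed by a CSV file.
    ctx.register_csv("http_api_requests_total", "requests.csv", CsvReadOptions::new())
        .await?;

    // SQL (or the equivalent DataFrame calls) produces RecordBatches.
    let df = ctx
        .sql("SELECT status, COUNT(1) FROM http_api_requests_total \
              WHERE path = '/api/v2/write' GROUP BY status")
        .await?;
    df.show().await?;
    Ok(())
}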
DataFusion: System Architecture
Query frontends (SQL, DataFrame)
→ Plan representations (dataflow graphs): LogicalPlans, then ExecutionPlan, each with optimizations / transformations
→ Expression evaluation and optimized execution operators (Arrow based): HashAggregate, Sort, Join, …
→ Data sources: Parquet, CSV, …
DataFusion: Totally Customizable
The same pipeline, with every stage extensible (Extend ✅); see the UDF sketch below for one example:
- Query frontends (SQL, DataFrame)
- Plan representations (LogicalPlans, ExecutionPlan) and their optimizations / transformations
- Expression evaluation and execution operators (HashAggregate, Sort, Join, …)
- Data sources (Parquet, CSV, …)
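As one small example of that extensibility, here is a sketch of registering a scalar UDF. The exact plumbing (create_udf arguments, ColumnarValue helpers) has shifted across DataFusion releases, so treat this as the shape rather than version-exact code:

use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A scalar UDF that doubles its input, callable from SQL and DataFrames.
    let double = create_udf(
        "double",
        vec![DataType::Float64],
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| {
            // Normalize scalar/array arguments to arrays, then compute.
            let args = ColumnarValue::values_to_arrays(args)?;
            let input = args[0].as_any().downcast_ref::<Float64Array>().unwrap();
            let doubled: Float64Array = input.iter().map(|v| v.map(|v| v * 2.0)).collect();
            Ok(ColumnarValue::Array(Arc::new(doubled)))
        }),
    );

    let ctx = SessionContext::new();
    ctx.register_udf(double);
    ctx.sql("SELECT double(21.0)").await?.show().await?;
    Ok(())
}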
How IOx uses DataFusion
Why?
All queries and Parquet creation + compaction run through a unified planning system based on DataFusion + Arrow
Query Processing in IOx
Query input (client / language specific frontends):
- Storage gRPC Frontend: read_group(..) → SeriesFrame output via gRPC
- SQL Frontend (from DataFusion): SELECT … FROM … → FlightData output via Arrow Flight
- InfluxQL Frontend (in progress): SELECT … FROM … GROUP BY *
- Reorg Frontend: compact_plan(..) → ParquetWriter writes to Parquet files

Shared planning and execution phases, based on DataFusion:
DataFusion LogicalPlan → Optimization (storage pruning, pushdown, etc.) → Physical Planning (DataFusion ExecutionPlan) → Execution → output Arrow RecordBatches → client-specific output formats.
Why Arrow Internally (and not just at interface)?
Option 1: Use Arrow internally (DataFusion, pola.rs, Acero)
Pros: Fast interchange, reuse Arrow libraries
Cons: Constrained(*) to Arrow
Option 2: Use specialized structures internally, convert to Arrow at edges (Velox, DuckDB)
Pros: Can use specialized structures
Cons: Maintain specialized code
Option 1 pipeline: Scan → Filter → Agg, Arrow end to end.
Option 2 pipeline: Scan → Filter → Agg over specialized internal structures, with a “convert to Arrow” step at the edges.
Why Arrow Internally (and not just at interface)?
So far results are encouraging
Theory: Using Arrow is “good enough” compared to specialized structures
Pooled open source development → invest heavily in optimized Arrow kernels
Good: Sorting, Filtering, Projection, Parquet
Could Improve: Grouping, Joining
Flight
Arrow Flight: Sending Arrow Arrays over the Network
Flight for Distributed Systems
Flight Canonical Sequence Diagram
How does IOx use Flight? Native Query Protocol
1. A user query (SELECT … FROM …) arrives at the querier via Flight.
2. The querier combines data from ingesters and Parquet files into the response.
3. The response is returned via Flight.
How does IOx use Flight? Ingester ←→ Querier comms
1. A user query (SELECT … FROM …) arrives at the querier.
2. The querier fetches unpersisted data (not yet written to Parquet) from the ingester, as RecordBatches via Flight.
Native Query Protocol Details
* Slight abuse of the Flight protocol which normally gets an opaque Ticket via GetFlightInfo endpoint
1. The client creates a Ticket.
2. Client → IOx: DoGet(Ticket)
3. IOx → Client: stream of FlightData
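A sketch of the client side of this sequence in Rust with the arrow-flight crate; the endpoint URL and the JSON ticket contents are hypothetical stand-ins for whatever the server actually expects:

use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical endpoint.
    let mut client = FlightServiceClient::connect("http://localhost:8082").await?;

    // Step 1: the client builds the Ticket itself (the "slight abuse":
    // normally an opaque Ticket comes back from GetFlightInfo).
    let ticket = Ticket {
        ticket: r#"{"namespace": "mydb", "sql_query": "SELECT * FROM weather"}"#.into(),
    };

    // Steps 2 and 3: DoGet(Ticket) returns a stream of FlightData.
    let mut stream = client.do_get(ticket).await?.into_inner();
    while let Some(data) = stream.message().await? {
        println!("received FlightData frame: {} body bytes", data.data_body.len());
    }
    Ok(())
}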
FlightSQL
Apache Arrow FlightSQL
Read more: Expanding Arrow's Reach with a JDBC Driver for Arrow Flight SQL
Before Flight SQL

The server speaks a database-specific protocol (e.g. Postgres FEBE); each client connects through a JDBC or ODBC driver that implements the standard JDBC / ODBC interface on top of that protocol.

Caveat: this picture only shows JDBC and ODBC drivers. Also common is a native implementation in each language ecosystem, e.g. the Python DB-API 2.0 and psycopg2. See Apache Arrow Database Connectivity (ADBC) for more details.
Scary (likely infeasible) possibility: a custom JDBC driver for each database.

1. Data is produced column by column in RecordBatches and sent over the network using Flight.
2. A custom JDBC driver on the client converts the Flight RecordBatches to the JDBC interface.
IOx JDBC connectivity before Flight SQL
1. Data is produced column by column in RecordBatches.
2. A PG FEBE adapter converts it to the Postgres FEBE protocol.
3. Data is sent row by row to the client's Postgres JDBC driver.

Protocol trace:
Client: Prepare query:"SELECT * FROM foo WHERE a = $1;" name:""
Client: Bind name:"" parameters:[{format:"text", value:"42"}]
… (steps elided) …
Server: RowDescription fields:[{name:"a", type:"int", format:"text"}, …]
Server: DataRow fields:[{data:"42"}, {data: "Hunter Valley"} …]
Server: DataRow fields:[{data:"12"}, {data: "Merrimack Valley"} …]
… (one DataRow for each row of data) …
Server: DataRow fields:[{data:"321"}, {data: "Charles River Watershed"} …]
… (steps elided) …
How IOx uses FlightSQL
✅ Access to the ecosystem without having to implement JDBC, ODBC, …

The client uses the arrow-flight JDBC driver, which implements the JDBC API; data is sent from IOx to the driver via Arrow FlightSQL, staying columnar the whole way.
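The same standardized protocol is usable from any language; here is a Rust sketch with the arrow-flight crate's FlightSQL client (the endpoint is hypothetical, and the execute signature has changed across arrow-flight releases):

use arrow_flight::sql::client::FlightSqlServiceClient;
use tonic::transport::Endpoint;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical endpoint; any FlightSQL-capable server looks the same
    // to the client, which is exactly the point.
    let channel = Endpoint::from_static("http://localhost:8082").connect().await?;
    let mut client = FlightSqlServiceClient::new(channel);

    // FlightSQL standardizes how the query is wrapped in GetFlightInfo /
    // DoGet messages, so generic JDBC/ODBC/ADBC drivers can be reused.
    let info = client.execute("SELECT 1".to_string(), None).await?;
    println!("query returned {} endpoint(s)", info.endpoint.len());
    Ok(())
}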
Community
“Community over Code” - The Apache Way
Apache Software Foundation
Independence: the ASF is strictly vendor neutral
Community Over Code: a healthy community is a higher priority than good code
Open Communications: Everything is done in the open
Earned Authority / Community of Peers: anyone can contribute and become a committer and PMC member.
Open source power and community
Some Industry Contributors in Apache Arrow
ClearCode
Thank you!
Questions / Discussion