1 of 42

InfluxDB on Apache Flink

Open Source Data Processing – Data Engineering Systems Group

Ramin Gharib, Felix Seidel, and Leon Papke

Winter Semester 2020/2021

03.03.2021


Agenda


Use Case

Connector Design

Benchmarks

OSDP


Use Case


Flight Tracking


Data Point (Example)


// Real World Example

adsb_icao,flight=DLH9MU,ground=false tas=442,alt_baro=39000,lat=52.331,lon=12.456 1608125488700000047


Data Point (Schema)


// Example

adsb,ground=false,scientist=mullen seen_pos=0,alt_geom=6625 1608123758799999952

  • Measurement (required): adsb
  • Tag set (optional): ground=false,scientist=mullen
  • Field set (required): seen_pos=0,alt_geom=6625
  • Timestamp (optional): 1608123758799999952

  • Tags are indexed; fields are not.
  • InfluxDB identifies a unique data point by its measurement, tag set, and timestamp.
  • A point consists of its series key (measurement, tag set, and field key), a field value, and a timestamp.
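To make the anatomy of a data point concrete, here is a minimal Java sketch that splits one line protocol line into measurement, tag set, field set, and optional timestamp. It is deliberately simplified and hypothetical (the class name is illustrative): it assumes no escaped commas, spaces, or equals signs and no quoted string fields containing spaces, all of which the full line protocol allows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified parser: assumes no escaped characters and
// no spaces inside quoted string field values.
final class LineProtocolPoint {
    final String measurement;
    final Map<String, String> tags;
    final Map<String, String> fields;
    final Long timestamp; // null when the optional timestamp is omitted

    private LineProtocolPoint(String measurement, Map<String, String> tags,
                              Map<String, String> fields, Long timestamp) {
        this.measurement = measurement;
        this.tags = tags;
        this.fields = fields;
        this.timestamp = timestamp;
    }

    private static void putPair(Map<String, String> target, String pair) {
        String[] kv = pair.split("=", 2);
        if (kv.length != 2) {
            throw new IllegalArgumentException("Expected key=value but got: " + pair);
        }
        target.put(kv[0], kv[1]);
    }

    static LineProtocolPoint parse(String line) {
        // <measurement>[,<tag set>] <field set> [<timestamp>]
        String[] sections = line.trim().split(" ");
        if (sections.length < 2 || sections.length > 3) {
            throw new IllegalArgumentException("Unsupported line: " + line);
        }
        // The first section holds the measurement followed by comma-separated tags.
        String[] seriesParts = sections[0].split(",");
        Map<String, String> tags = new LinkedHashMap<>();
        for (int i = 1; i < seriesParts.length; i++) {
            putPair(tags, seriesParts[i]);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : sections[1].split(",")) {
            putPair(fields, pair);
        }
        Long timestamp = sections.length == 3 ? Long.parseLong(sections[2]) : null;
        return new LineProtocolPoint(seriesParts[0], tags, fields, timestamp);
    }

    public static void main(String[] args) {
        LineProtocolPoint p = parse(
            "adsb,ground=false,scientist=mullen seen_pos=0,alt_geom=6625 1608123758799999952");
        System.out.println(p.measurement + " " + p.tags + " " + p.fields + " " + p.timestamp);
    }
}
```

Field values are kept as raw strings here; type suffixes such as `i` for integers would be interpreted in a later step.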


Flight Tracking


Connector Design


High Level Architecture


POST /api/v2/write

Content-Type: text/plain; charset=utf-8

Body:

test longValue=1i 1
test longValue=2i 2
test longValue=3i 3
...

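[Figure: Data Source running on Task Managers; each Task Manager hosts a Source Reader with its split readers (Split Reader 0, ...), which receive the HTTP POST requests]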
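A client could send such a request with the JDK's built-in `java.net.http` client (Java 11+). The sketch below only builds the request so it can be inspected without a running server. The host `localhost` and port 8000 (the default port stated for the source) are assumptions; the InfluxDB 2.0 write API normally also takes `org` and `bucket` query parameters and an `Authorization: Token ...` header, which are omitted because the slides do not say whether the Flink endpoint checks them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

final class WriteRequestExample {
    // Assumed endpoint: the Flink source's HTTP server on its default port.
    static final URI WRITE_URI = URI.create("http://localhost:8000/api/v2/write");

    static HttpRequest buildWriteRequest(List<String> lines) {
        // Line protocol: one point per line, separated by '\n'.
        String body = String.join("\n", lines);
        return HttpRequest.newBuilder(WRITE_URI)
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildWriteRequest(List.of(
                "test longValue=1i 1", "test longValue=2i 2", "test longValue=3i 3"));
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Because the source speaks the same write API, existing InfluxDB clients and agents such as Telegraf can, in principle, be pointed at Flink just by changing the URL.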


A Push-Based Design

  • Idea: Flink implements the InfluxDB 2.0 write API endpoint
  • On start: one HTTP server is initialized per split reader
    • Port is set by the user (default: 8000)
  • On request: the server parses the line protocol body into data point objects
    • At most 10,000 lines per request by default
  • The user must provide a data point deserializer
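The request-handling step can be sketched as a pure function: split the body into lines, reject a request that exceeds the per-request line limit (default 10,000), and turn each line into a record with a user-supplied deserializer. This is a hypothetical illustration, not the connector's code: `RequestHandler`, `maxLinesPerRequest`, and rejecting (rather than truncating) oversized requests are assumptions, and the HTTP server itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the per-request parsing step of the push-based source.
final class RequestHandler<T> {
    private final int maxLinesPerRequest;           // e.g. 10_000 by default
    private final Function<String, T> deserializer; // user-provided: one line -> one record

    RequestHandler(int maxLinesPerRequest, Function<String, T> deserializer) {
        this.maxLinesPerRequest = maxLinesPerRequest;
        this.deserializer = deserializer;
    }

    /** Parses a line protocol request body; throws if it exceeds the line limit. */
    List<T> handle(String body) {
        List<T> records = new ArrayList<>();
        for (String line : body.split("\n")) {
            if (line.isBlank()) {
                continue; // tolerate empty lines and trailing newlines
            }
            if (records.size() == maxLinesPerRequest) {
                throw new IllegalArgumentException(
                    "Request exceeds " + maxLinesPerRequest + " lines");
            }
            records.add(deserializer.apply(line));
        }
        return records;
    }

    public static void main(String[] args) {
        // Toy deserializer: integer value of the first field, e.g. "longValue=1i" -> 1L.
        RequestHandler<Long> handler = new RequestHandler<>(10_000, line -> {
            String fieldSet = line.split(" ")[1];
            String value = fieldSet.split("=", 2)[1];
            return Long.parseLong(value.substring(0, value.length() - 1)); // drop the 'i' suffix
        });
        System.out.println(handler.handle("test longValue=1i 1\ntest longValue=2i 2\n"));
    }
}
```

Bounding the lines per request keeps a single oversized POST from monopolizing a split reader.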


User-Defined Deserializer


Request Body (String):

test longValue=1i 1
test longValue=2i 2
test longValue=3i 3
...

  • Data point parser: converts each line into a Data Point (Object)
  • Deserialize (user-defined): converts each Data Point into a record (1L, 2L, ...)
  • Data Source: emits the records
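The user-defined part of this pipeline can be sketched as a one-method interface from a parsed data point to a Flink record. All types here are hypothetical stand-ins (`DataPoint`, `DataPointDeserializer`, `LongValueDeserializer`), not the connector's actual classes; the example implementation matches the slide by extracting the integer field `longValue`, so `test longValue=1i 1` becomes `1L`.

```java
import java.util.Map;

// Hypothetical stand-in for the connector's parsed data point object.
final class DataPoint {
    final String measurement;
    final Map<String, ?> fields; // field values already converted to Java types
    final long timestamp;

    DataPoint(String measurement, Map<String, ?> fields, long timestamp) {
        this.measurement = measurement;
        this.fields = fields;
        this.timestamp = timestamp;
    }
}

// Hypothetical user-facing contract: one parsed data point -> one Flink record.
interface DataPointDeserializer<T> {
    T deserialize(DataPoint point);
}

// Example user implementation: "test longValue=1i 1" -> 1L.
final class LongValueDeserializer implements DataPointDeserializer<Long> {
    @Override
    public Long deserialize(DataPoint point) {
        Object value = point.fields.get("longValue");
        if (!(value instanceof Long)) {
            throw new IllegalArgumentException("Missing integer field longValue");
        }
        return (Long) value;
    }

    public static void main(String[] args) {
        Map<String, Long> fields = Map.of("longValue", 1L);
        DataPoint point = new DataPoint("test", fields, 1L);
        System.out.println(new LongValueDeserializer().deserialize(point));
    }
}
```

Keeping line protocol parsing in the connector and only this mapping in user code means users never handle raw request bodies.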


High Level Architecture


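[Figure: records (1L, 2L, ...) enter the Sink Writer, which writes them to InfluxDB as line protocol (test longValue=1i, test longValue=2i, ...); the Sink Committer writes the last checkpoint timestamp as an extra data point:]

checkpoint checkpoint=flink <timestamp>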

  • Idea: insert an additional measurement whenever Flink creates a checkpoint
  • Sink Writer: serializes incoming data and writes it to InfluxDB once the batch size is reached
    • Default batch size: 1000 elements
  • Sink Committer (optional): writes the checkpoint data point
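The writer's batching rule can be sketched as: serialize each record, buffer the resulting lines, and flush once the batch size is reached. The flush target is abstracted as a `Consumer<List<String>>` so the sketch runs without InfluxDB; in the connector it would be a write to InfluxDB. `checkpointLine` shows one possible encoding of the committer's checkpoint point. The slide does not say whether `flink` is stored as a tag or a field, so a quoted string field is assumed here. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a batching sink writer; not the connector's implementation.
final class BatchingSinkWriter<T> {
    private final int batchSize;                      // e.g. 1000 by default
    private final Function<T, String> serializer;     // user-defined: record -> line protocol
    private final Consumer<List<String>> flushTarget; // stands in for the InfluxDB write
    private final List<String> buffer = new ArrayList<>();

    BatchingSinkWriter(int batchSize, Function<T, String> serializer,
                       Consumer<List<String>> flushTarget) {
        this.batchSize = batchSize;
        this.serializer = serializer;
        this.flushTarget = flushTarget;
    }

    void write(T record) {
        buffer.add(serializer.apply(record));
        if (buffer.size() >= batchSize) {
            flush(); // buffer full: write the batch
        }
    }

    // In this sketch, also meant to be called before a checkpoint so no buffered points are lost.
    void flush() {
        if (!buffer.isEmpty()) {
            flushTarget.accept(new ArrayList<>(buffer)); // hand over a copy
            buffer.clear();
        }
    }

    // Assumed encoding of the checkpoint point: string field, nanosecond timestamp.
    static String checkpointLine(long timestampNanos) {
        return "checkpoint checkpoint=\"flink\" " + timestampNanos;
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchingSinkWriter<Long> writer =
            new BatchingSinkWriter<>(2, v -> "test longValue=" + v + "i", batches::add);
        for (long value = 1; value <= 3; value++) {
            writer.write(value);
        }
        writer.flush();
        System.out.println(batches);
    }
}
```

Batching trades a little latency for far fewer HTTP round trips to InfluxDB.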


Sink Writer with User-Defined Serializer


Serialize incoming data (1L, 2L, ...) into a Data Point (Object) with the user-defined serializer

Add data point to buffer:

test,longValue=1 fieldKey="sink"

test,longValue=2 fieldKey="sink"

...

Is buffer full?
  • Yes: write the buffered points to InfluxDB
  • No: wait for other input
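A user-defined serializer essentially has to produce valid line protocol. This hypothetical sketch turns a `Long` into the point from the slide (measurement `test`, tag `longValue`, string field `fieldKey` with value `sink`, quoted as line protocol requires for strings) and applies the line protocol escaping rules: commas, equals signs, and spaces are backslash-escaped in tag keys, tag values, and field keys, while string field values are double-quoted with `"` and `\` escaped.

```java
// Hypothetical serializer sketch: Long -> one line protocol point.
final class LongToLineProtocol {
    // Tag keys, tag values, and field keys: escape commas, equals signs, and spaces.
    static String escapeKeyOrTag(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // String field values: escape backslashes first, then double quotes, then wrap in quotes.
    static String quoteStringField(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String serialize(long value) {
        return "test," + escapeKeyOrTag("longValue") + "=" + escapeKeyOrTag(Long.toString(value))
            + " " + escapeKeyOrTag("fieldKey") + "=" + quoteStringField("sink");
    }

    public static void main(String[] args) {
        System.out.println(serialize(1)); // test,longValue=1 fieldKey="sink"
    }
}
```

Escaping matters as soon as tag values come from real data, for example flight identifiers or free-text labels containing spaces.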


Exactly-once Delivery Guarantee

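  • Duplicate detection is done by InfluxDB
  • Unique data points are identified by their measurement, tag set, and timestamp
    • Points replayed after a failure overwrite their earlier copies instead of being stored twice (provided the serializer produces the same points again)
  • To preserve distinct points that share these three properties:
    • Add an arbitrary tag
    • Increment the timestamp
  • Both can be done in the user-defined data point serializer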
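InfluxDB's duplicate handling can be modelled in a few lines, which shows why replaying a batch after recovery is harmless and why the two workarounds keep genuinely distinct points. This is a hypothetical in-memory model of the documented semantics (points keyed by measurement, tag set, and timestamp; for the same key, field sets are merged and the newer value wins), not InfluxDB code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of InfluxDB's duplicate-point semantics (illustrative only).
final class DedupModel {
    // Key: measurement + sorted tag set + timestamp; value: merged field set.
    private final Map<String, Map<String, Object>> points = new HashMap<>();

    private static String key(String measurement, Map<String, String> tags, long timestamp) {
        return measurement + "," + new TreeMap<>(tags) + " " + timestamp;
    }

    void write(String measurement, Map<String, String> tags, long timestamp,
               Map<String, ?> fields) {
        // Same key: fields are merged and newer values overwrite older ones.
        points.computeIfAbsent(key(measurement, tags, timestamp), k -> new HashMap<>())
              .putAll(fields);
    }

    int size() {
        return points.size();
    }

    Object field(String measurement, Map<String, String> tags, long timestamp, String field) {
        return points.get(key(measurement, tags, timestamp)).get(field);
    }

    public static void main(String[] args) {
        DedupModel db = new DedupModel();
        Map<String, String> tags = Map.of("ground", "false");
        db.write("adsb", tags, 1L, Map.of("alt_baro", 39000L));
        db.write("adsb", tags, 1L, Map.of("alt_baro", 39000L)); // replay after recovery
        System.out.println(db.size()); // 1: the replayed point is not stored twice
        db.write("adsb", Map.of("ground", "false", "uniq", "2"), 1L, Map.of("alt_baro", 38000L)); // extra tag
        db.write("adsb", tags, 2L, Map.of("alt_baro", 37000L)); // incremented timestamp
        System.out.println(db.size()); // 3
    }
}
```

The model also shows the caveat: a replay is only idempotent if the serializer emits exactly the same measurement, tags, and timestamp for the same record.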


Benchmarks


Experiment Design

  • 4-node NUMA machine: 9 × 1 GHz CPUs (+9 virtual) per node
    • Execution pinned to one node via numactl
  • JVM heap limit: 10 GB
  • Flink settings:
    • Parallelism = 1
    • Object reuse enabled
    • No watermarking or checkpointing


Source Benchmarks

[Figure: benchmark setups for throughput + latency and for latency only; components: Data Generator, HTTP Post, Query]

Generated data points:

testGenerator,simpleTag=testTag fieldCount=i++ eventTime

...
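The generator's payload can be sketched as below: each HTTP POST body carries a batch of `testGenerator` points with an increasing `fieldCount` and the event time as timestamp. The class name, the batch size parameter, the injected clock, and writing `fieldCount` as an integer (`i` suffix) are assumptions for illustration; the slides do not show the generator's code.

```java
import java.util.StringJoiner;
import java.util.function.LongSupplier;

// Hypothetical benchmark payload generator; not the project's actual generator.
final class TestGenerator {
    private long counter = 0; // the "i++" on the slide

    /** Builds one request body with batchSize points, each stamped by the clock. */
    String nextBody(int batchSize, LongSupplier clockNanos) {
        StringJoiner body = new StringJoiner("\n");
        for (int i = 0; i < batchSize; i++) {
            // fieldCount is an integer field, hence the 'i' suffix in line protocol.
            body.add("testGenerator,simpleTag=testTag fieldCount=" + (counter++) + "i "
                     + clockNanos.getAsLong());
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Wall-clock event time in nanoseconds (millisecond resolution).
        System.out.println(new TestGenerator().nextBody(3, () -> System.currentTimeMillis() * 1_000_000L));
    }
}
```

Stamping each point with its generation time lets the consumer compute event latency as receive time minus timestamp.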


Source Queries


Event Throughput


Request Throughput


Event Latency


Sink Queries


Event Throughput


Event Latency


Future Work


Possible Extensions

  • [Source] Dynamic ports
  • [Source] Kubernetes cluster service discovery
  • [Source] Multi-threaded HTTP server
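The first and third extensions map onto standard JDK facilities: binding to port 0 lets the operating system pick a free port, and `com.sun.net.httpserver.HttpServer.setExecutor` accepts an `Executor` so requests are handled on multiple threads. A minimal sketch, assuming the JDK's built-in HTTP server rather than whatever server the connector uses; the class name is hypothetical and the `/api/v2/write` handler here only drains the body and answers 204 No Content.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DynamicPortServer {
    /** Starts a server on an OS-assigned port, handling requests on the given pool. */
    static HttpServer start(ExecutorService pool) {
        try {
            // Port 0: the OS picks a free port (the "dynamic ports" idea).
            HttpServer server = HttpServer.create(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
            server.createContext("/api/v2/write", exchange -> {
                try (InputStream in = exchange.getRequestBody()) {
                    in.readAllBytes(); // a real source would parse the line protocol here
                }
                exchange.sendResponseHeaders(204, -1); // 204 No Content, no body
                exchange.close();
            });
            server.setExecutor(pool); // multi-threaded request handling
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        HttpServer server = start(pool);
        System.out.println("Listening on port " + server.getAddress().getPort());
        server.stop(0);
        pool.shutdown();
    }
}
```

With dynamic ports, the chosen port would still have to be published to clients, which is where the service discovery idea comes in.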


Q&A

InfluxDB on Apache Flink

Ramin Gharib, Felix Seidel, and Leon Papke


Appendix

  1. InfluxDB Editions
  2. Storage Engine
  3. InfluxDB OSS Architecture
  4. Updates in InfluxDB
  5. Duplicates in InfluxDB
  6. Source Design Ideas
  7. Sink Design Ideas
  8. Deserializer Implementation
  9. Serializer Implementation


InfluxDB Editions

  • InfluxDB Cloud
  • InfluxDB OSS
  • InfluxDB Enterprise

InfluxDB OSS does not support clustering. For high availability or horizontal scaling of InfluxDB, consider the InfluxData commercial clustered offering, InfluxDB Enterprise/Cloud.


Storage Engine

  • Write Ahead Log
  • Cache
  • Time-Structured Merge Tree
  • Time Series Index


InfluxDB OSS Architecture (v2)


Updates in InfluxDB

“Time series data is predominantly new data that is never updated. Deletes generally only affect data that isn’t being written to, and contentious updates never occur.”

InfluxDB, Design Principles.


Duplicate Data in InfluxDB

“To simplify conflict resolution and increase write performance, InfluxDB assumes data sent multiple times is duplicate data. Identical points aren’t stored twice. If a new field value is submitted for a point, InfluxDB updates the point with the most recent field value. In rare circumstances, data may be overwritten.”

InfluxDB, Design Principles.


Initial Source Design Ideas


Write API (Telegraf)

Pull (Flux queries)

Push (Tasks)

Idea

Flink implements custom HTTP endpoint

Flink implements poll-based queries from InfluxDB

Flink implements endpoint to which InfluxDB tasks push data

Pro

Flink compatible with all InfluxDB integrations

No dependency on write API endpoint interface

Replayable source

Easy to use (write Flux queries as usual)

Lower latency

Con

Non-replayable

API format dependency

Non-replayable

Custom integrations required for upstream clients

Latency

Achieving exactly-once may be hard

Non-replayable

Achieving exactly-once may be hard


Initial Sink Design Ideas


Checkpoint Series

  • Idea: Insert an additional measurement whenever Flink creates a checkpoint.
  • Pro: The user can query the data up to the most recent checkpoint.
  • Con: A “max & filter” query is required to retrieve only committed data.

“Uncommitted” Bucket

  • Idea: Write new data into an “uncommitted” table and copy its contents into the “committed” table when a checkpoint is created.
  • Pro: No complex query logic is needed to distinguish checkpointed from non-checkpointed data.
  • Con: A lot of data copying.
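The “max & filter” idea behind the checkpoint series can be sketched in plain Java: find the latest timestamp of the `checkpoint` measurement, then keep only data points at or before it. In InfluxDB this would be expressed as a query; the hypothetical in-memory version below only illustrates the logic, and it assumes data point timestamps are comparable to the checkpoint timestamp and that points exactly at the checkpoint time count as committed.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;

final class CommittedView {
    // A data point reduced to what the filter needs (hypothetical type).
    static final class Point {
        final String measurement;
        final long timestamp;

        Point(String measurement, long timestamp) {
            this.measurement = measurement;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return measurement + "@" + timestamp;
        }
    }

    /** Returns the data points written up to the most recent checkpoint marker. */
    static List<Point> committed(List<Point> all) {
        // "Max": latest timestamp of the checkpoint measurement.
        OptionalLong lastCheckpoint = all.stream()
            .filter(p -> p.measurement.equals("checkpoint"))
            .mapToLong(p -> p.timestamp)
            .max();
        if (!lastCheckpoint.isPresent()) {
            return List.of(); // no checkpoint yet: nothing is committed
        }
        long cutoff = lastCheckpoint.getAsLong();
        // "Filter": keep non-checkpoint points not newer than the marker.
        return all.stream()
            .filter(p -> !p.measurement.equals("checkpoint"))
            .filter(p -> p.timestamp <= cutoff)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Point> all = List.of(new Point("test", 1), new Point("test", 2),
            new Point("checkpoint", 2), new Point("test", 3));
        System.out.println(committed(all)); // [test@1, test@2]
    }
}
```

This is the extra query work the checkpoint series design pushes onto readers, in exchange for avoiding the data copying of the “uncommitted” bucket design.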


User-Defined Deserializer


User-Defined Serializer
