1 of 51

Aggregating metrics in-flight:

challenges and opportunities

Dmytro Kozlov

2 of 51

Dmytro Kozlov

Software Engineer of VictoriaMetrics

Software engineer with experience in scalable applications and user-centric solutions. A strong background in backend development and cloud infrastructure.

https://github.com/dmitryk-dk

3 of 51

How do you deal with cardinality issues?

4 of 51

What is cardinality?

Why it matters?

What is "high" cardinality?

The number of unique time series

It has effect on resource usage

and reliability of the monitoring system.

When monitoring starts to lag

5 of 51

How to deal with high cardinality?

What are the optimization techniques?

  1. Find the source
  2. Optimize the source
  • Relabeling: drop series or specific labels
  • Modify the source: change metrics exposition logic
  • Recording rules: pre-compute rollups

6 of 51

What are recording rules?

"Recording rules allow you to precompute frequently needed or computationally expensive expressions and save their result as a new set of time series. "

"Querying the precomputed result will then often be much faster than executing the original expression every time it is needed."

7 of 51

How recording rules work

# PromQL expression

- expr: sum(rate(requests_total[5m])) by(path)

# new metric name

record: "handler:requests:rate5m"

  • Expression `expr` will be evaluated periodically
  • Evaluation results will be recorded back under the new `record` name

8 of 51

9 of 51

10 of 51

Add new name handler:requests:rate5m

11 of 51

12 of 51

13 of 51

  1. The number of time series stored in TSDB is Data-in + Recording Rules
  2. No control over cardinality or churn

14 of 51

We need a way to transform data…

before it gets ingested in TSDB

15 of 51

Aggregation via chained Prometheus

16 of 51

Aggregation via chained Prometheus

  1. Long restart time under high load
  2. Overhead of running the full functional TSDB

17 of 51

Prometheus vs vmagent for stream aggregation

Scrape rate: 130k samples/s

Mem usage diff: x4.5

18 of 51

19 of 51

Stream aggregation

20 of 51

Persist already pre-processed, aggregated data

21 of 51

Stream aggregation

"Aggregate incoming samples in streaming mode by time and by labels before data is written to remote storage."

" The aggregation is applied to all the metrics received via any supported data ingestion protocol and/or scraped from Prometheus-compatible targets."

Stream aggregation is open-source and is supported by

vmagent and VictoriaMetrics single-node.

22 of 51

23 of 51

How stream aggregation works

- match:

- "prometheus_http_requests_total" # time series selector

interval: "1m" # on 1m interval

by: ["handler"] # group by label

outputs: ["total"] # aggregate as counter

Result:

prometheus_http_requests_total:1m_by_handler_total

<metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>

24 of 51

How stream aggregation works

- match:

- "prometheus_http_requests_total" # time series selector

- "http_request_errors_total" # supports list

- '{__name__=~".+_total"}' # supports regex match

Original metrics are dropped automatically

Or kept if -streamAggr.keepInput is set.

25 of 51

How stream aggregation works

- match: "prometheus_http_requests_total"

outputs: [<output>, <output>]

26 of 51

How stream aggregation works

- match:

- "prometheus_http_requests_total" # time series selector

outputs: ["total"] # aggregate as counter

interval: "1m" # on 1m interval

by: ["handler"] # aggregate by specific labels

Or

without: ["handler"] # aggregate without specific labels

27 of 51

Opportunities

28 of 51

Reducing cardinality

Aggregating metric with the highest cardinality from https://play.victoriametrics.com/

apiserver_request_duration_seconds_bucket:

  • ~15K unique series per-day
  • ~7% from total number of series

29 of 51

Reducing cardinality

- match: "apiserver_request_duration_seconds_bucket"

interval: "5m"

outputs: [ "rate_sum" ]

by: [ "resource", "verb", "le" ]

apiserver_request_duration_seconds_bucket

:5m

_by_le_resource_verb

_rate_sum

30 of 51

Query raw vs aggregated metric

aggregated: 1.6sec

raw: 11s

31 of 51

Downsampling

- match: '{__name__!~".+(_total|_bucket)"}'

interval: "5m"

outputs: [ "min", "max", "count_samples",

"sum_samples" ]

metric_foo:5m_min

metric_foo:5m_max

metric_foo:5m_count_samples

metric_foo:5m_sum_samples

32 of 51

Deduplication

33 of 51

Aggregation for all ingested data

34 of 51

Push to any destination with Prometheus RW support

35 of 51

vmagent data processing pipeline

36 of 51

Low resource usage: scrape&send 130k samples/s

1 more CPU core

1GiB more mem

37 of 51

Challenges

38 of 51

Improving memory usage

39 of 51

Deduplication requires storing time series in mem

40 of 51

How to reduce memory when storing labels?

  • Store as concatenated string?
    • No, not big difference in mem usage
  • Hash to limit labels length?
    • No, higher CPU cost, inefficiency for small labels, collisions
  • Compress with zstd + dictionary?
    • No, too high CPU costs for about x2 size reduction
  • Custom compression specific for labels?

41 of 51

Label compressor concept

Series: metric{job="foo", instance="bar", pod="baz"}

Compress table:

Compressed series:

0123 ~16 Bytes (varint enc)

Label pair

Encode Idx (uint64)

__name__="metric"

0

job="foo"

1

instance="bar"

2

pod="baz"

3

42 of 51

Label compressor concept

Series: metric{job="foo", instance="bar", pod="qux"}

Compress table:

Compressed series:

0124 ~16 Bytes

Label pair

Encode Idx (uint64)

__name__="metric"

0

job="foo"

1

instance="bar"

2

pod="baz"

3

pod="qux"

4

Most of label-value pairs remains static in time.

And only some labels, like `pod`, change frequently.

43 of 51

Label compressor concept

Series: metric{job="foo", instance="bar", pod="qux"}

Compress table:

Compressed series:

0124 ~16 Bytes

Decompress:

  1. Decode varint
  2. Read 8 Bytes
  3. Lookup in compress table
  4. Read next 8 Bytes

Label pair

Encode Idx (uint64)

__name__="metric"

0

job="foo"

1

instance="bar"

2

pod="baz"

3

pod="qux"

4

44 of 51

Labels compressor concept

// LabelsCompressor compresses []Label into short binary strings

type LabelsCompressor struct {

labelToIdx sync.Map // encode label to Idx

idxToLabel sync.Map // decode Idx to label

nextIdx atomic.Uint64 // Idx generator

}

45 of 51

Labels compressor concept

func(lc *LabelsCompressor) compress(dst []uint64,labels []Label){

for i, label := range labels {

// fast path: label was seen before

v, ok := lc.labelToIdx.Load(label)

if !ok {

// slow path: register new label

idx := lc.nextIdx.Add(1)

lc.idxToLabel.Store(idx, label)

lc.labelToIdx.Store(label, idx)

}

// encode all label pairs into sequence

dst[i] = v.(uint64)

46 of 51

Labels compressor concept

func (lc *LabelsCompressor) decompress(dst []Label,src []uint64) []Label {

for _, idx := range src {

label := lc.idxToLabel.Load(idx)

dst = append(dst, label)

}

return dst

}

47 of 51

Labels compressor concept

func (lc *LabelsCompressor) decompress(dst []Label,src []uint64) []Label {

for _, idx := range src {

label := lc.idxToLabel.Load(idx)

dst = append(dst, label)

}

return dst

}

x5 memory reduction!

48 of 51

Horizontal scaling

-remoteWrite.shardByURL.labels=tenant

-remoteWrite.shardByURL.labels=__name__,instance

49 of 51

Summary

  • Stream aggregation allows aggregate data before it gets ingested in TSDB
  • Helps to control cardinality and churn rate via aggregation
  • Has low resource consumption
  • Horizontally scalable
  • Is open-source and compatible with Prometheus Remote Write protocol

50 of 51

Additional materials

  1. Stream aggregation documentation
  2. vmagent vs vmagent dashboard snapshot
  3. Prometheus: scrape-time rule evaluation
  4. lib/labelcompressor

51 of 51

Questions?