Aggregating metrics in-flight:
challenges and opportunities
Dmytro Kozlov
Software Engineer at VictoriaMetrics
Software engineer experienced in building scalable applications and user-centric solutions, with a strong background in backend development and cloud infrastructure.
How do you deal with cardinality issues?
What is cardinality?
Why does it matter?
What is "high" cardinality?
The number of unique time series
It affects resource usage
and the reliability of the monitoring system.
When monitoring starts to lag
How to deal with high cardinality?
What are the optimization techniques?
What are recording rules?
"Recording rules allow you to precompute frequently needed or computationally expensive expressions and save their result as a new set of time series. "
"Querying the precomputed result will then often be much faster than executing the original expression every time it is needed."
How recording rules work
# PromQL expression
- expr: sum(rate(requests_total[5m])) by (handler)
  # new metric name
  record: "handler:requests:rate5m"
Adds a new metric named handler:requests:rate5m
We need a way to transform data…
before it gets ingested into the TSDB
Aggregation via chained Prometheus
Prometheus vs vmagent for stream aggregation
Scrape rate: 130k samples/s
Memory usage difference: 4.5x
Stream aggregation
Persist already pre-processed, aggregated data
Stream aggregation
"Aggregate incoming samples in streaming mode by time and by labels before data is written to remote storage."
" The aggregation is applied to all the metrics received via any supported data ingestion protocol and/or scraped from Prometheus-compatible targets."
Stream aggregation is open-source and is supported by
vmagent and VictoriaMetrics single-node.
How stream aggregation works
- match:
  - "prometheus_http_requests_total"  # time series selector
  interval: "1m"                      # on 1m interval
  by: ["handler"]                     # group by label
  outputs: ["total"]                  # aggregate as counter
Result:
prometheus_http_requests_total:1m_by_handler_total
<metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
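As a rough illustration of this naming scheme (a hypothetical helper, not vmagent's actual implementation), a small Go function can assemble the output name from the aggregation parameters:

package main

import (
    "fmt"
    "strings"
)

// buildAggrName assembles an output metric name following the
// <metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output> scheme.
func buildAggrName(metric, interval string, by, without []string, output string) string {
    name := metric + ":" + interval
    if len(by) > 0 {
        name += "_by_" + strings.Join(by, "_")
    }
    if len(without) > 0 {
        name += "_without_" + strings.Join(without, "_")
    }
    return name + "_" + output
}

func main() {
    // prints "prometheus_http_requests_total:1m_by_handler_total"
    fmt.Println(buildAggrName("prometheus_http_requests_total", "1m", []string{"handler"}, nil, "total"))
}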
How stream aggregation works
- match:
  - "prometheus_http_requests_total"  # time series selector
  - "http_request_errors_total"       # supports a list of selectors
  - '{__name__=~".+_total"}'          # supports regex match
Original metrics are dropped automatically
Or kept if -streamAggr.keepInput is set.
How stream aggregation works
- match: "prometheus_http_requests_total"
  outputs: [<output>, <output>]
How stream aggregation works
- match:
  - "prometheus_http_requests_total"  # time series selector
  outputs: ["total"]                  # aggregate as counter
  interval: "1m"                      # on 1m interval
  by: ["handler"]                     # aggregate by specific labels
Or
  without: ["handler"]                # aggregate without specific labels
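To make the difference between by and without concrete, here is a minimal Go sketch with hypothetical helper names (not vmagent code) that derives the grouping key for a sample's label set under each mode:

package main

import (
    "fmt"
    "sort"
    "strings"
)

// groupingKey builds the key used to group samples before aggregation.
// With by != nil only the listed labels are kept; with without != nil
// the listed labels are dropped and everything else is kept.
func groupingKey(labels map[string]string, by, without []string) string {
    keep := map[string]string{}
    if by != nil {
        for _, name := range by {
            if v, ok := labels[name]; ok {
                keep[name] = v
            }
        }
    } else {
        drop := map[string]bool{}
        for _, name := range without {
            drop[name] = true
        }
        for name, v := range labels {
            if !drop[name] {
                keep[name] = v
            }
        }
    }
    parts := make([]string, 0, len(keep))
    for name, v := range keep {
        parts = append(parts, name+"="+v)
    }
    sort.Strings(parts) // stable key regardless of map iteration order
    return strings.Join(parts, ",")
}

func main() {
    labels := map[string]string{"handler": "/api/v1/query", "instance": "vm-1", "job": "prometheus"}
    fmt.Println(groupingKey(labels, []string{"handler"}, nil)) // handler=/api/v1/query
    fmt.Println(groupingKey(labels, nil, []string{"handler"})) // instance=vm-1,job=prometheus
}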
Opportunities
Reducing cardinality
Aggregating the metric with the highest cardinality from https://play.victoriametrics.com/
apiserver_request_duration_seconds_bucket:
Reducing cardinality
- match: "apiserver_request_duration_seconds_bucket"
  interval: "5m"
  outputs: ["rate_sum"]
  by: ["resource", "verb", "le"]
apiserver_request_duration_seconds_bucket:5m_by_le_resource_verb_rate_sum
Query raw vs aggregated metric
aggregated: 1.6s
raw: 11s
Downsampling
- match: '{__name__!~".+(_total|_bucket)"}'
  interval: "5m"
  outputs: ["min", "max", "count_samples", "sum_samples"]
metric_foo:5m_min
metric_foo:5m_max
metric_foo:5m_count_samples
metric_foo:5m_sum_samples
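A rough Go sketch of what these outputs compute per series and interval (an illustration of the idea only, using a hypothetical intervalAggr type, not vmagent's implementation):

package main

import "fmt"

// intervalAggr accumulates the min, max, count_samples and sum_samples
// outputs for one series over one aggregation interval.
type intervalAggr struct {
    min, max, sum float64
    count         int
}

func (a *intervalAggr) push(v float64) {
    if a.count == 0 || v < a.min {
        a.min = v
    }
    if a.count == 0 || v > a.max {
        a.max = v
    }
    a.sum += v
    a.count++
}

func main() {
    var a intervalAggr
    for _, v := range []float64{0.5, 1.5, 0.25} { // samples seen within the 5m interval
        a.push(v)
    }
    // flushed as metric_foo:5m_min, metric_foo:5m_max,
    // metric_foo:5m_count_samples, metric_foo:5m_sum_samples
    fmt.Println(a.min, a.max, a.count, a.sum) // 0.25 1.5 3 2.25
}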
Deduplication
Aggregation for all ingested data
Push to any destination with Prometheus RW support
vmagent data processing pipeline
Low resource usage: scrape & send 130k samples/s
1 more CPU core
1 GiB more memory
Challenges
Improving memory usage
Deduplication requires storing time series in memory
How to reduce memory when storing labels?
Label compressor concept
Series: metric{job="foo", instance="bar", pod="baz"}
Compress table:
Label pair        | Encode Idx (uint64)
__name__="metric" | 0
job="foo"         | 1
instance="bar"    | 2
pod="baz"         | 3
Compressed series: 0 1 2 3 (~16 bytes, varint enc)
Label compressor concept
Series: metric{job="foo", instance="bar", pod="qux"}
Compress table:
Label pair        | Encode Idx (uint64)
__name__="metric" | 0
job="foo"         | 1
instance="bar"    | 2
pod="baz"         | 3
pod="qux"         | 4
Compressed series: 0 1 2 4 (~16 bytes)
Most label-value pairs remain static over time.
Only some labels, like `pod`, change frequently.
Label compressor concept
Series: metric{job="foo", instance="bar", pod="qux"}
Compress table:
Label pair        | Encode Idx (uint64)
__name__="metric" | 0
job="foo"         | 1
instance="bar"    | 2
pod="baz"         | 3
pod="qux"         | 4
Compressed series: 0 1 2 4 (~16 bytes)
Decompress: 0 1 2 4 -> metric{job="foo", instance="bar", pod="qux"}
Labels compressor concept
// LabelsCompressor compresses []Label into short binary strings
type LabelsCompressor struct {
labelToIdx sync.Map // encode label to Idx
idxToLabel sync.Map // decode Idx to label
nextIdx atomic.Uint64 // Idx generator
}
Labels compressor concept
func (lc *LabelsCompressor) compress(dst []uint64, labels []Label) {
    for i, label := range labels {
        // fast path: label was seen before
        v, ok := lc.labelToIdx.Load(label)
        if !ok {
            // slow path: register new label
            idx := lc.nextIdx.Add(1)
            lc.idxToLabel.Store(idx, label)
            lc.labelToIdx.Store(label, idx)
            v = idx
        }
        // encode all label pairs into a sequence of indexes
        dst[i] = v.(uint64)
    }
}
Labels compressor concept
func (lc *LabelsCompressor) decompress(dst []Label, src []uint64) []Label {
    for _, idx := range src {
        // look up the original label pair for each index
        v, ok := lc.idxToLabel.Load(idx)
        if !ok {
            continue // unknown index; skip it
        }
        dst = append(dst, v.(Label))
    }
    return dst
}
Labels compressor concept
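A hypothetical usage sketch tying the two methods above together, assuming a simple Label type with Name and Value fields:

// Label is a single name=value pair, as assumed by the snippets above.
type Label struct {
    Name, Value string
}

func example() {
    var lc LabelsCompressor
    series := []Label{
        {"__name__", "metric"}, {"job", "foo"},
        {"instance", "bar"}, {"pod", "qux"},
    }

    // compress the series into a short sequence of indexes...
    compressed := make([]uint64, len(series))
    lc.compress(compressed, series)

    // ...and restore the original labels when the aggregate is flushed
    restored := lc.decompress(nil, compressed)
    _ = restored
}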
5x memory reduction!
Horizontal scaling
Shard outgoing series across multiple remote write destinations by label values:
-remoteWrite.shardByURL.labels=tenant
-remoteWrite.shardByURL.labels=__name__,instance
Summary
Additional materials
Questions?