1 of 37

Scaling Graphite at Criteo

FOSDEM 2018 - “Not yet another talk about Prometheus”

2 of 37

Me

Corentin Chary

Twitter: @iksaif
Mail: c.chary@criteo.com

  • Working on Graphite with the Observability team at Criteo
  • Worked on Bigtable/Colossus at Google

3 of 37

Big Graphite

Storing time series in Cassandra and querying them from Graphite

4 of 37

Graphite

  • Graphite does two things:
    • Store numeric time-series data
    • Render graphs of this data on demand

5 of 37

Carbon - The metric ingestion daemon

  • Persist metrics to disk
    • carbon-cache: writes points to the storage layer
    • Default database: whisper, one file = one metric

  • And also…
    • carbon-relay: receives metrics from the clients and relays/duplicates them
    • carbon-aggregator: ‘aggregates’ metrics based on rules

host123.cpu0.user 100.5 1517651768
<metric> <value> <timestamp>
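A minimal client sketch for this plaintext protocol (the relay host below is a placeholder; 2003 is carbon's usual plaintext listener port):

import socket
import time

# Placeholder relay address; carbon's plaintext listener usually sits on port 2003.
CARBON_HOST, CARBON_PORT = "carbon-relay.example.com", 2003

def send_point(metric, value, timestamp=None):
    # One line per point: "<metric> <value> <timestamp>\n"
    timestamp = int(timestamp if timestamp is not None else time.time())
    line = "%s %s %d\n" % (metric, value, timestamp)
    with socket.create_connection((CARBON_HOST, CARBON_PORT)) as sock:
        sock.sendall(line.encode("ascii"))

send_point("host123.cpu0.user", 100.5)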

[Diagram: c-relay → c-cache → disk]

6 of 37

graphite-web - UI and API

  • Django Web application

  • UI to browse metrics, display graphs, and build dashboards
    • Mostly deprecated by Grafana

  • API to list metrics and fetch points (and generate graphs)
    • /metrics/find?query=my.metrics.*
    • /render/?target=sum(my.metrics.*)&from=-10m
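For example, fetching raw datapoints through the render API instead of a PNG (the base URL is a placeholder; format=json is a standard render option):

import requests

GRAPHITE_URL = "http://graphite.example.com"   # placeholder graphite-web endpoint

resp = requests.get(GRAPHITE_URL + "/render/", params={
    "target": "sum(my.metrics.*)",
    "from": "-10min",
    "format": "json",          # datapoints instead of a rendered image
})
for series in resp.json():
    print(series["target"], series["datapoints"][:3])   # [[value, timestamp], ...]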

7 of 37

Our usage

  • 6 datacenters, 20k servers, 10k+ “applications”
  • We ingest and query >80M metrics
    • Read: ~20K metrics/s
    • Write: ~800K points/s
  • 3000+ dashboards, 1000+ alerts (evaluated every 5min)
  • 30 teams over 2 continents

8 of 37

architecture overview

[Diagram, DC 1: applications → (TCP) → carbon-relay (dc local) → (UDP) → carbon-relay → carbon-cache (in-memory) → persisted storage; graphite API + UI reads the data and is queried by grafana]

9 of 37

architecture overview (r=2)

[Diagram: applications → carbon-relay, which duplicates the metrics over UDP towards the in-memory/persisted pipelines of both DC 1 and DC 2]

10 of 37

current tools are improvable

  • Graphite lacks true elasticity
    • … for storage and QPS
    • One file per metric is wasteful even with sparse files
  • Graphite’s clustering is naïve (slightly better with 1.1.0)
    • Graphite-web clustering is very fragile
    • A single query can bring down the whole cluster
    • Huge fan-out of queries
  • Tooling
    • Whisper manipulation tools are brittle
    • Storage ‘repair’/’reconciliation’ is slow and inefficient (multiple days)
    • Scaling up is hard and error prone

11 of 37

solved problems?

  • Distributed database systems have already solved these problems
    • e.g. Cassandra, Riak, …
  • Fault tolerance through replication
    • Quorum queries and read-repair ensure consistency
  • Elasticity through consistent hashing
    • Replacing and adding nodes is fast and non-disruptive
    • Repairs are transparent, and usually fast
    • Almost-linear scalability
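As an illustration of that last point (a toy ring, not actual Cassandra code): adding a node to a consistent-hash ring only moves a small share of the keys.

import bisect
import hashlib

def _hash(key):
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class Ring:
    """Toy consistent-hash ring: a key belongs to the first token at or after its hash."""
    def __init__(self, nodes, vnodes=64):
        self._tokens = sorted((_hash("%s-%d" % (node, i)), node)
                              for node in nodes for i in range(vnodes))
        self._hashes = [h for h, _ in self._tokens]

    def owner(self, key):
        index = bisect.bisect(self._hashes, _hash(key)) % len(self._tokens)
        return self._tokens[index][1]

metrics = ["host%d.cpu0.user" % i for i in range(10000)]
before = Ring(["node1", "node2", "node3"])
after = Ring(["node1", "node2", "node3", "node4"])
moved = sum(before.owner(m) != after.owner(m) for m in metrics)
print("keys moved: %.0f%%" % (100.0 * moved / len(metrics)))   # roughly a quarter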

12 of 37

BigGraphite

13 of 37

decisions, decisions

  • OpenTSDB (HBase)
    • Too many moving parts
    • Only one HBase cluster at Criteo, hard/costly to spawn new ones
  • Cyanite (Cassandra/ES)
    • Originally depended on ES and required manually ensuring cross-database consistency…
    • Doesn’t behave exactly like Carbon/Graphite
  • KairosDB (Cassandra/ES)
    • Dependency on ES for Graphite compatibility
    • Relies on outdated libraries
  • [insert hyped/favorite time-series DBs]
  • Safest and easiest: build a Graphite plugin using only Cassandra

14 of 37

Target architecture overview

[Diagram: metrics → carbon-relay → several carbon-cache instances writing to a Cassandra cluster; multiple graphite API + UI nodes read from Cassandra and serve grafana]

15 of 37

plug’n’play

  • Graphite has support for plug-ins since v1.0

carbon (carbon.py)

  • create(metric)
  • exists(metric)
  • update(metric, points)

    e.g. update(uptime.nodeA, [now(), 42])

Graphite-Web (graphite.py)

  • find(glob)
  • fetch(metric, start, stop)

    e.g. find(uptime.*) -> [uptime.nodeA]
         fetch(uptime.nodeA, now()-60, now())

Slightly more complicated than that…
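A rough sketch of such a plugin, with an in-memory dict standing in for the real storage (signatures and point format are simplified; BigGraphite's actual plugin talks to Cassandra):

import time
from fnmatch import fnmatch

class InMemoryDatabase(object):
    """Illustrative backend exposing the carbon and graphite-web plugin hooks."""
    def __init__(self):
        self._points = {}   # metric name -> [(timestamp, value), ...]

    # carbon side (carbon.py)
    def create(self, metric):
        self._points.setdefault(metric, [])

    def exists(self, metric):
        return metric in self._points

    def update(self, metric, points):
        self._points.setdefault(metric, []).extend(points)

    # graphite-web side (graphite.py)
    def find(self, glob):
        return [name for name in self._points if fnmatch(name, glob)]

    def fetch(self, metric, start, stop):
        return [(ts, value) for ts, value in self._points.get(metric, ())
                if start <= ts <= stop]

db = InMemoryDatabase()
db.create("uptime.nodeA")
db.update("uptime.nodeA", [(time.time(), 42)])
print(db.find("uptime.*"))                                      # ['uptime.nodeA']
print(db.fetch("uptime.nodeA", time.time() - 60, time.time()))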

16 of 37

storing time series in Cassandra

  • Store points…
    • (metric, timestamp, value)
    • Multiple resolutions and TTLs (60s:8d, 1h:30d, 1d:1y)
    • Write => points
    • Read => series of points (usually to display a graph)
  • …and metadata !
    • Metric hierarchy (directories and metrics, like a filesystem, same as whisper)
    • Metric, resolution, [owner, …]
    • Write => new metrics
    • Read => list of metrics from globs (my.metric.*.foo.*)
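Put differently, the plugin writes and reads two record shapes (a rough sketch; field names are illustrative):

from collections import namedtuple

# A data point: written constantly, read back as series to draw graphs.
Point = namedtuple("Point", "metric timestamp value")

# Metadata: written once per new metric, read when resolving globs.
Metric = namedtuple("Metric", "name resolution owner")

Point("criteo.cassandra.uptime", 1517651768, 42.0)
Metric("criteo.cassandra.uptime", "60s:8d, 1h:30d, 1d:1y", "observability-team")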

17 of 37

<Cassandra>

18 of 37

Storing data in Cassandra

  • Sparse matrix storage
    • Map< Row, Map< Column, Value > >

  • Row ⇔ Partition
    • Atomic storage unit (nodes hold full rows)
    • Data distributed according to Hash(rowKey)
  • Column Key
    • Atomic data unit inside a given partition
    • Unique per partition
    • Has a value
    • Stored in lexicographical order (supports range queries)

store(row, col, val):
    H = hash(row)
    Node = get_owner(H)
    send(Node, (row, col, val))

19 of 37

naïve schema

CREATE TABLE points (
  metric text,   -- Metric name
  time bigint,   -- Value timestamp
  value double,  -- Point value
  PRIMARY KEY ((metric), time)
) WITH CLUSTERING ORDER BY (time DESC);

  • Boom! Timeseries.
    • (Boom! Your cluster explodes when you have many points per metric.)
    • (Boom! You spend your time compacting data and evicting expired points.)

20 of 37

time sharding schema

CREATE TABLE IF NOT EXISTS %(table)s (
  metric uuid,            -- Metric UUID (actual name stored as metadata)
  time_start_ms bigint,   -- Lower time bound for this row
  offset smallint,        -- time_start_ms + offset * precision = timestamp
  value double,           -- Value for the point.
  count int,              -- If value is sum, divide by count to get the avg
  PRIMARY KEY ((metric, time_start_ms), offset)
) WITH CLUSTERING ORDER BY (offset DESC)
  AND default_time_to_live = %(default_time_to_live)d

  • table = datapoints_<resolution>
  • default_time_to_live = <resolution.duration>
  • (Number of points per partition limited to ~25K)
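A small sketch of how a raw timestamp maps onto (time_start_ms, offset) for the 60s stage, assuming partitions are aligned on multiples of the row duration (the real driver's bucketing may differ; see design.md):

# Assumed layout: 2880 points of 60s per partition, i.e. 2 days per row.
PRECISION_S = 60
POINTS_PER_ROW = 2880
ROW_SIZE_MS = POINTS_PER_ROW * PRECISION_S * 1000

def shard(timestamp_s):
    # Returns (time_start_ms, offset) such that
    # time_start_ms + offset * PRECISION_S * 1000 ~= timestamp.
    timestamp_ms = int(timestamp_s) * 1000
    time_start_ms = timestamp_ms - (timestamp_ms % ROW_SIZE_MS)
    offset = (timestamp_ms - time_start_ms) // (PRECISION_S * 1000)
    return time_start_ms, offset

print(shard(1517651768))   # (1517529600000, 2036) under this alignment assumption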

21 of 37

demo (sort of)

cqlsh> select * from biggraphite.datapoints_2880p_60s limit 5;

 metric                               | time_start_ms | offset | count | value
--------------------------------------+---------------+--------+-------+-------
 7dfa0696-2d52-5d35-9cc9-114f5dccc1e4 | 1475040000000 |   1999 |     1 |  2019
 7dfa0696-2d52-5d35-9cc9-114f5dccc1e4 | 1475040000000 |   1998 |     1 |  2035
 7dfa0696-2d52-5d35-9cc9-114f5dccc1e4 | 1475040000000 |   1997 |     1 |  2031
 7dfa0696-2d52-5d35-9cc9-114f5dccc1e4 | 1475040000000 |   1996 |     1 |  2028
 7dfa0696-2d52-5d35-9cc9-114f5dccc1e4 | 1475040000000 |   1995 |     1 |  2028

(5 rows)

Partition key: (metric, time_start_ms); clustering key: offset; values: count, value

22 of 37

Finding nemo

  • How do we find metrics matching fish.*.nemo*?
  • Most people use ElasticSearch, and that’s fine, but we wanted a self-contained solution

23 of 37

we’re feeling SASI

  • Cassandra >3.5 builds have secondary index facilities
    • SASI (SSTable-Attached Secondary Indexes)
  • Split metric path into components
    • criteo.cassandra.uptime ⇒ part0=criteo, part1=cassandra, part2=uptime, part3=$end$
    • criteo.* => part0=criteo,part2=$end$

  • Associate metric UUID to the metric name’s components
  • Add secondary indexes to path components
  • Retrieve metrics matching a pattern by querying the secondary indexes

  • See design.md and SASI Indexes for details

24 of 37

do you even query?

  • a.single.metric (equals)
    • Query: part0=a, part1=single, part2=metric, part3=$end$
    • Result: a.single.metric
  • a.few.metrics.* (wildcards)
    • Query: part0=a, part1=few, part2=metrics, part4=$end$
    • Result: a.few.metrics.a, a.few.metrics.b, a.few.metrics.c
  • match{ed_by,ing}.s[ao]me.regexp.?[0-9] (almost a regexp, needs post-filtering)
    • ^match(ed_by|ing)\.s[ao]me\.regexp\..[0-9]$
    • Query: part2=regexp, part4=$end$
    • Result: matched_by.same.regexp.b2, matched_by.some.regexp.a1, matching.same.regexp.w5, matching.some.regexp.z8
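A rough sketch of that translation, with a plain Python list standing in for the SASI-indexed metadata table (brace alternation and other regexp-like parts are left to the post-filtering step, as on the slide):

import fnmatch
import re

END = "$end$"

def components(path):
    # criteo.cassandra.uptime -> {0: 'criteo', 1: 'cassandra', 2: 'uptime', 3: '$end$'}
    return dict(enumerate(path.split(".") + [END]))

def exact_components(glob):
    # Components without wildcards become index predicates; the rest is post-filtered.
    return {i: part for i, part in components(glob).items()
            if not any(c in part for c in "*?[{")}

def find(metric_names, glob):
    predicates = exact_components(glob)
    candidates = [name for name in metric_names
                  if all(components(name).get(i) == part
                         for i, part in predicates.items())]
    pattern = re.compile(fnmatch.translate(glob))   # post-filter with the full glob
    return [name for name in candidates if pattern.match(name)]

print(exact_components("a.few.metrics.*"))   # {0: 'a', 1: 'few', 2: 'metrics', 4: '$end$'}
print(find(["a.few.metrics.a", "a.few.other.b"], "a.few.metrics.*"))   # ['a.few.metrics.a']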

25 of 37

</Cassandra>

26 of 37

And then ?

27 of 37

BIGGEST GRAPHITE CLUSTER IN THE MULTIVERSE! (or not)

  • 800 TiB of capacity, 20 TiB in use currently (R=2)
  • Writes: 1M QPS
  • Reads: 10K QPS
  • 24 bytes per point, 16 bytes with compression
    • But probably even better with double-delta encoding!
  • 20 Cassandra nodes
  • 6 Graphite Web, 10 Carbon Relays, 10 Carbon Cache
  • ×3! Two replicated datacenters, plus one isolated to canary changes.

28 of 37

How to use it?

$ pip install biggraphite
$ bgutil syncdb                                    # create tables
$ bg-import-whisper /opt/graphite/storage/whisper  # import data

STORAGE_FINDERS = ['biggraphite.plugins.graphite.Finder']  # graphite-web

BG_DRIVER = cassandra  # carbon

Voilà! And you can use *both* whisper and biggraphite during the migration.

29 of 37

links of (potential) interest

30 of 37

Roadmap?

  • Add Prometheus Read/Write support (BigGraphite as long term storage for Prometheus).
  • Optimize writes: the bottleneck on Cassandra is CPU; we could divide CPU usage by ~10 with proper batching
  • Optimize reads: better parallelization, better long-term caching (points in the past usually don’t change)

31 of 37

More Slides

32 of 37

33 of 37

Monitoring your monitoring!

  • Graphite-Web availability (% of time up, over a moving window)
  • Graphite-Web performance (seconds to sum 500 metrics, at the 95th percentile)
  • Point loss: send 100 points per DC per second and check how many arrive within 2 minutes
  • Point delay: set the timestamp as the value and diff it with now (~2-3 minutes)
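The delay probe is simple enough to sketch: the value written is the emission timestamp, and the delay is the difference with the wall clock once the point comes back through the read path (the metric name and the send/fetch helpers are placeholders):

import time

PROBE_METRIC = "monitoring.graphite.delay.dc1"   # placeholder metric name

def emit_probe(send_point):
    # Write side: the value *is* the emission timestamp.
    now = int(time.time())
    send_point(PROBE_METRIC, value=now, timestamp=now)

def probe_delay(fetch_last_point):
    # Read side: delay = wall clock - value read back through the full pipeline.
    _timestamp, value = fetch_last_point(PROBE_METRIC)
    return time.time() - value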

34 of 37

Aggregation
Downsampling/Rollups/Resolutions/Retentions/...

35 of 37

roll what? hmmm?

  • Retention policy: 60s:8d,1h:30d,1d:1y (combination of stages)
  • Stage: 60s:8d (resolution 60 seconds for 8d)
  • Aggregator functions: sum, min, max, avg
  • stage[N] = aggregator(stage[N-1])

[Diagram: 60s:8d (stage0) → sum(points) → 1h:30d → sum(points) → 1d:1y]

36 of 37

  • Obviously this doesn’t work for average
    • avg(avg(1), avg(2, 3)) = 1.75 ≠ avg(1, 2, 3) = 2 (averaging averages breaks when buckets hold different numbers of points)
    • We have to store both ‘value’ and ‘count’!
    • (See downsampling.py for details)

  • Cheaper / simpler to do directly on the write path
    • Checkpointed every 5 minutes
    • But processes can restart!
    • So we use unique write ids (and replica ids when using replicated clusters)
    • (See design.md)
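A minimal sketch of why storing (sum, count) keeps averages correct across stages (illustrative only; downsampling.py holds the real logic):

def downsample(points, precision):
    # Aggregate raw (timestamp, value) points into (bucket, sum, count) triples.
    buckets = {}
    for timestamp, value in points:
        bucket = timestamp - (timestamp % precision)
        total, count = buckets.get(bucket, (0.0, 0))
        buckets[bucket] = (total + value, count + 1)
    return [(bucket, total, count)
            for bucket, (total, count) in sorted(buckets.items())]

stage1 = downsample([(0, 1), (30, 2), (60, 3), (90, 4)], 60)   # [(0, 3.0, 2), (60, 7.0, 2)]
print([(bucket, total / count) for bucket, total, count in stage1])   # [(0, 1.5), (60, 3.5)]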

37 of 37

What about aggregation?

  • We aggregate in the carbon plugins, before writing the points
  • Points for different levels of resolution go to different tables, for efficient compactions and TTL expiration
  • Reads go to a specific table depending on the time window
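For example, picking the stage (and therefore the table) for a read could look like this sketch; the real stage selection also has to account for retention boundaries and the configured policies:

import time

# Stages as (precision_seconds, retention_seconds), mirroring 60s:8d, 1h:30d, 1d:1y.
STAGES = [(60, 8 * 86400), (3600, 30 * 86400), (86400, 365 * 86400)]

def stage_for_read(time_start, now=None):
    # Pick the finest stage whose retention still covers the start of the window.
    now = now or time.time()
    for precision, retention in STAGES:
        if now - time_start <= retention:
            return "datapoints_%ds" % precision    # illustrative table naming
    return "datapoints_%ds" % STAGES[-1][0]

now = time.time()
print(stage_for_read(now - 3600, now))         # datapoints_60s: last hour, 60s stage
print(stage_for_read(now - 20 * 86400, now))   # datapoints_3600s: 20 days back, 1h stage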