aggregated-consistent-hashing relay rule
We are currently running the original Python Graphite stack in our production environment. We ingest metrics from hundreds of servers, balanced over multiple relays, which in turn balance to multiple aggregators. The Graphite relays have a flag for aggregated-consistent-hashing, which came out of https://github.com/graphite-project/carbon/pull/32 . Is there a way to implement something like this using carbon-c-relay?

I have started mirroring my production data over to a new carbon-c-relay cluster, where I currently have a single relay passing data to multiple aggregators. It seems like the aggregation portion of carbon-c-relay is single-threaded, and I need to run more than one instance of it to process all of my data. I have tried both hashing methods, carbon_ch and fnv1a_ch, on the relay, but they send similar types of data to different aggregators; each aggregator then aggregates only its portion of the data and writes it to the backend. The data written to the backend isn't the full aggregation, since no single aggregator collected all of the data.
An example:
aggregate ^([^.]+)\.timers\.(.*pdx.*)\.([^_.]+)\.count_ps$ every 10 seconds expire after 30 seconds compute sum write to \1.timers.\2._totals._pdx.count_ps._sum send to whisper_cache_b0 stop ;
Is there a way for me to implement something similar to aggregated-consistent-hashing using carbon-c-relay? Is the aggregation portion of carbon-c-relay really only single threaded? Is there a way for me to make use of the additional cpu resources on the server?
Thank you for any assistance you could provide. -Erin Willingham
Hey Erin, here's my experience, for what it's worth:
1. The aggregation code was made multi-threaded about a year ago: issue #60. Poking through the `aggregator_putmetric` code, you can see a number of pthread mutex calls protecting access to the shared data structure that holds aggregated metrics: https://github.com/grobian/carbon-c-relay/blob/master/aggregator.c#L172
2. I ran into an issue with aggregator throughput while pushing around 1 million metrics every 10 seconds through carbon-c-relay. Aggregation was not able to keep up, so I would see large gaps in aggregated metrics. Further, via `netstat -an | grep 2003 | grep -v ' 0 0 '` I could see metrics data backing up in the relay host's TCP receive queues. And `top` never showed more than ~300% CPU usage, even though the host had 32 cores. I suspect that the mutex locking code is the bottleneck, but I can't say for sure. (This is with carbon-c-relay 1.7-1.)
3. As a workaround, I run multiple carbon-c-relay instances on a single host. One functions as a relay, and the rest function as aggregators. I hand-configured certain subsets of metrics to go to specific aggregators for processing. This works fine and gets around the throughput problem (#2), but it's a bit of a hack.
4. A functioning implementation of aggregated-consistent-hashing would alleviate my need to hand-configure the relaying of certain metrics to certain aggregators (#3).
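For reference, the hand-configured split in #3 can be sketched roughly like this (cluster names, ports, and metric prefixes here are invented for illustration; each aggregator cluster is a separate local carbon-c-relay process listening on its own port):

```
# front relay instance: hand-partition metric subsets across
# local aggregator instances (ports and prefixes hypothetical)
cluster aggregator_a
    forward
    127.0.0.1:2013
    ;
cluster aggregator_b
    forward
    127.0.0.1:2014
    ;
match ^stats\.timers\. send to aggregator_a stop;
match ^stats\.counters\. send to aggregator_b stop;
```

The obvious downside, and why it's a hack: the partitioning is static, so adding a new metric subtree means editing the relay config by hand.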
I think that a focus on aggregation throughput, and the addition of aggregated-consistent-hashing, would help make carbon-c-relay more scalable.
Indeed, the aggregator is severely hampered by shared data structures. I had plans to build separate sections for the worker threads so they can do their stuff in parallel, etc., but never got to it so far.
The real problem is that all threads need to go through a shared piece which determines if a certain aggregate already exists or not – and if not, create it.
To implement aggregated-consistent-hashing, I would need to know what exactly it splits on, and how it tries to do that. Currently, using something like fnv1a_ch would relay all input metrics for the same name to the same aggregator, but this breaks down if your inputs are complex matches that span multiple name spaces. That said, it seems like what is achieved here is a match rule that is identical to the aggregation rule, with a single target being the aggregator.
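Concretely, such a "match rule identical to the aggregation rule" might look something like the sketch below, reusing the regex from Erin's aggregate rule (the cluster name and host are hypothetical). Because the match regex is the same as the aggregate regex, every metric feeding that aggregate gets pinned to a single aggregator, so no partial sums are produced:

```
# hypothetical single-target cluster for one aggregation
cluster pdx_aggregator
    forward
    aggregator-1:2003
    ;
# same regex as the aggregate rule, so all contributing
# metrics land on one aggregator
match ^([^.]+)\.timers\.(.*pdx.*)\.([^_.]+)\.count_ps$
    send to pdx_aggregator
    stop;
```

This only spreads load if you have many aggregation rules to distribute; with a single aggregation rule, all its input still funnels through one process.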
So I understand your scalability problem. I myself think it is a waste that c-relay can't take advantage of multiple cores in this case, and would like to solve that first. The hashing technique seems like a workaround which will help, but it cannot resolve the problem (for instance, when you only have one aggregation).
Not sure if this helps, but here's what works for me for dealing with aggregations. We handle about 25M metrics per minute at this point, and almost all of them get sum'd to some degree. Our metrics are in the format of `path.to.metric.name.HOSTNAME = NUMBER`, and we aggregate them to `path.to.metric.name.sum_all.hosts`.
# Front end host
cluster cache
fnv1a_ch
backend-a-host:port=a
backend-b-host:port=b
...
;
aggregate
^(.+)\.sum\.([^.]+)$
every 60 seconds
expire after 90 seconds
timestamp at end of bucket
compute sum write to agg.\1.sum_all.hosts
;
match ^agg\. send to cache stop;
match * send to cache;
# Back end host
cluster cache
fnv1a_ch
127.0.0.1:2101=2101
127.0.0.1:2103=2103
127.0.0.1:2105=2105
127.0.0.1:2107=2107
;
aggregate
^agg\.(.+)$
every 60 seconds
expire after 120 seconds
timestamp at start of bucket
compute sum write to \1
;
match ^agg\. send to blackhole stop;
match * send to cache stop;
This lets me have a front-end cluster of several nodes catch and pre-aggregate all the metrics, hash them, and send the intermediately aggregated metrics to the back-end storage host, which re-sums the now much smaller number of things to aggregate and passes them on to the carbon caches. Obviously, this only works for some of the aggregation functions, but we mostly use sum, so it works for me. And it lets me fan out the aggregations.
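To make the two-tier flow above concrete, here is a hypothetical trace of a single metric through both configs (the metric name and value are invented for illustration):

```
# front end receives:       path.to.metric.name.sum.host42 = 5
# front-end aggregate rule  ^(.+)\.sum\.([^.]+)$ produces a partial sum:
#                           agg.path.to.metric.name.sum_all.hosts
# fnv1a_ch hashes the agg.* name, so every front-end node's partial
# sum for that name lands on the same back-end host
# back-end aggregate rule   ^agg\.(.+)$ re-sums the partials and writes:
#                           path.to.metric.name.sum_all.hosts
# the agg.* intermediates are then dropped via the blackhole match
```

The key property is that the intermediate `agg.` prefix gives the hash ring a stable name to route on, independent of which host originally emitted the metric.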