opentelemetry-collector-contrib
new component: deltatocumulative processor
[!IMPORTANT] This component is in active development. See progress in https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705
The purpose and use-cases of the new component
In similar fashion to the existing cumulativetodeltaprocessor, this component aggregates delta samples into their respective cumulative counterparts.
An extensive design doc has been written to explore this idea.
Example configuration for the component
interval: 60s
stale: 5m
Telemetry data types supported
- metrics (sum, histogram, exp. histogram)
Code Owner(s)
@gouthamve @shorez
Sponsor (optional)
@jpkrohling
Additional context
Prior discussion:
- https://github.com/open-telemetry/opentelemetry-collector/issues/1422
- https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23790
- https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/9919
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/4968
will fix:
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30435
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/15281
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22806
I'm sponsoring this, as it's important for our Prometheus story.
This is a duplicate of #29300, which I've also offered to sponsor. I'm glad to see there are others interested in this as well. The main thing I think we need to do in order to combine these proposals is decide whether or not the component will be split into two parts, as described here. I would appreciate your thoughts on this @sh0rez and @jpkrohling.
@RichieSams, looks like there is already an implementation started so I would recommend pausing until we work out which approach we are going to use.
Also cc: @0x006EA1E5 who proposed the split design.
@djaglowski looking at the comment you referenced, it appears the proposal is the following:
1. deltatocumulative: converts a stream of delta samples [1,2,3] to [1,3,6]
2. time-aggregate: converts a stream of any samples [1,3,6] to [6] or similar, depending on aggregation
is that correct?
If so, that aligns perfectly with our work here, because https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705 implements exactly the behavior described in (1).
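As a minimal sketch of behavior (1), assuming a single stream of integer samples that arrive in order: the function name toCumulative is made up for illustration and is not taken from the linked PRs.

```go
package main

import "fmt"

// toCumulative is a hypothetical helper: it turns a series of delta
// samples into the running totals a cumulative stream would report.
func toCumulative(deltas []int64) []int64 {
	out := make([]int64, 0, len(deltas))
	var sum int64
	for _, d := range deltas {
		sum += d // each delta is added to everything seen so far
		out = append(out, sum)
	}
	return out
}

func main() {
	fmt.Println(toCumulative([]int64{1, 2, 3})) // [1 3 6]
}
```

A real processor would keep one such running sum per stream identity rather than per slice.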
/cc @richiesams @0x006EA1E5 @gouthamve
That's great @sh0rez, thanks for clarifying. I'm excited to see this moving forward.
Based on this already having a more detailed proposal and implementation which is quite far along, I'll close #29300 and try to help #29461 in parallel.
@sh0rez Would you like any assistance in the implementation of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705 ?
If not, I can put my effort towards https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29461
@richiesams I think I'm covered in terms of code writing, but a review of https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30706 and https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707 would be incredibly helpful!
Let me know if I can assist in https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29461 in any way
Hi, sorry just catching up with this. I'm very happy this is moving forward, and happy to help wherever I can...
I've been trying to think of use-cases, edge cases etc. Scenarios I can think of so far:
1. The count connector example, where the count connector is producing a series of deltas, with no start_time_unix_nano
   - Producer is in the same collector instance as this processor
   - We know that there is only a single producer, and that datapoints will be in-order.
2. "Stateless delta producers". I'm thinking here of IoT, Web Clients, FaaS, that are all just pinging off deltas to an OTLP endpoint. We don't care about the producer instance ID or even the timestamps, we are just interested in aggregating a global count of increments.
3. "Single well-behaved OTLP source". This is the case where we get a stream of delta data points from a "single-writer", and the data points each contain start_time_unix_nano
   - Producer is external to the collector instance in this example case, so could have gaps in the data due to network issues etc, right?
   - Can we assume datapoints are in-order?
4. "Poorly-behaved OTLP source". This is the case where we get a stream of delta data points, but they could be missing required attributes, may be out of order etc
I'm wondering, as long as this processor can handle case (1) above (well-formatted delta datapoints missing a start_time_unix_nano), are we able to fix up any other problems with the data with preceding "sanitisation" processors? Especially, are we able to set/clear start_time_unix_nano and time_unix_nano for the data_points in the DataPoint Context?
Put another way, what is the simplest possible implementation of this processor, assuming upstream "sanitisation"?
I am not sh0rez, I am shorez, hahaha. Welcome to Beijing, if you ever travel to China.
I think it makes sense for the configuration for this processor and #29461 to align with the cumulativetodeltaprocessor, specifically include and exclude.
All three processors would also need to have a similar "MetricIdentity" feature.
Would it make sense to factor out this common code?
There is also something somewhat similar in the prometheusexporter
As I understand it, we can use the metrics spec as a reference as to what is an "identifying" property when building the "Metric Identity" used to track a metric
I agree we should probably look at creating a shared set of Identity structs. I personally quite like the design @sh0rez came up with in his PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707/files#diff-1be26496a452fcb59494ba5ee5ec2d00ad6c9a34c78d1360ace4d5f055ac6cdaR10
Using hashes of the metric attributes, etc.
@0x006EA1E5 I think it makes sense for the configuration for this processor and https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29461 to align with the cumulativetodeltaprocessor, specifically include and exclude.
I was thinking about this during the implementation of this component and preliminarily decided against it.
I don't think it's common to have an exporter receive some metrics as delta and others as cumulative (happy to be proven wrong).
Even if that was required, the filterprocessor can be used to build such a pipeline. The filterprocessor is very sophisticated and we would essentially be implementing a "lite-version" here, that will likely always be worse.
@0x006EA1E5 As I understand it, we can use the metrics spec as a reference as to what is an "identifying" property when building the "Metric Identity" used to track a metric
Exactly! I wrote a bit about this in the design doc: https://docs.google.com/document/d/1Oqwl5rDLqB6-Qgd6Hy1PXYZBAH4pkcdudxNA7bRkrIc/edit#heading=h.a00fffk0v68v
@RichieSams I agree we should probably look at creating a shared set of Identity structs. I personally quite like the design @sh0rez came up within his PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707/files#diff-1be26496a452fcb59494ba5ee5ec2d00ad6c9a34c78d1360ace4d5f055ac6cdaR10
Yeah, I felt like this was generally useful while writing this, happy to factor this out to a common place! Use-cases I can think of right now are deltatocumulativeprocessor, cumulativetodeltaprocessor, intervalprocessor. The prometheusexporter currently does something like this as well for aggregating deltas, but it may as well stop doing this entirely once deltatocumulative is stable.
When I first looked at this, my first thought was to use some kind of non-crypto hash too, but then I looked at cumulativetodeltaprocessor and prometheusexporter and saw they both seemed to just be doing some kind of simple string concat.
It made me wonder, is the space saving worth the hashing cost (which has to be performed on every single input), vs the cost of using a string builder? How big do you make the hash to avoid collisions, etc?
I don't know the answer to that, my feeling would be that any gain may be marginal at best, so for my POC implementation I just went with concat + separator (basically a copy / paste from the prometheus exporter).
As I implied above, once I started to analyse this in more detail, I noticed that this is something the cumulativetodeltaprocessor already does (as well as the prometheusexporter). So I guess it is something that is likely to be needed in general? And if so it makes sense to implement it once, at the least so the behaviour is consistent.
In general, my heuristic approach would be, if in doubt, keep it simple, and optimise later when you know there is a concrete need.
That's not to say I'm against using a hash, just that I wasn't personally confident it was worth the cost of change etc, compared to simply factoring out and reusing the current cumulativetodeltaprocessor's implementation.
Indeed, if the consensus is that a hash is a better fit here, I'm keen to hear it! 😄
It made me wonder, is the space saving worth the hashing cost (which has to be performed on every single input), vs the cost of using a string builder? How big do you make the hash to avoid collisions, etc?
But they're feeding the string directly into golang's hash function (because they're using the string as the key lookup in a map). So it will be basically identical. We're just skipping the step of having to hold the big byte array in memory / use a pool of buffers.
Doesn't the sync.Map have to hash the key again though, even if the key is itself the result of a hash? (Does the golang map need keys to be well distributed?)
Is the hash used here not going to get occasional collisions? Collisions are a fair trade off in a map, as any collision will be resolved by the map also having the actual key to compare. But wouldn't you want a much bigger hash if it's intended as a UUID and you want to be sure there won't be any collisions?
What is the expected benefit of using a hash?
What are the risks/consequences of something going wrong?
Just kind of worries me, when it is not even clear that there is a significant problem here that needs to be solved in the first place 🤷
sync.Map() (and golang native map) call Hash() on the keys. So cumulativetodelta uses a string key. The runtime has a Hash() function for native strings, so it uses that. For @sh0rez's solution, we just directly implement Hash() ourselves for Ident. Meaning we don't need to do the string building.
Both solutions will do a hash. That's just how maps work.
Is the hash used here not going to get occasional collisions?
Yes, which is fine. The std-lib map implementation needs to deal with hash collisions anyway. So this isn't a problem. FNV has "good-enough" hash properties for our use-case.
What is the expected benefit of using a hash?
Just to clarify, we're comparing:
- cumulativetodelta approach: Create a big unique string, which is then hashed by the stdlib
- @sh0rez approach: Implement Hash() ourselves by using fnv to combine the unique data
@sh0rez's approach is cleaner / clearer about what is happening IMO. And we don't need to do large string builder manipulations.
What are the risks/consequences of something going wrong?
The only risk IMO is forgetting to include a "unique" bit of data in the hash combination. But that should be trivial to audit.
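To make the two options concrete, here is a rough sketch using only the Go standard library. The helper names stringKey and hashKey, the zero-byte separator, and the pre-sorted attribute slice are assumptions for illustration; neither function is copied from cumulativetodelta or from the PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// stringKey sketches the concat + separator style: all identifying
// data is written into one string, which a Go map then hashes itself.
// attrs is assumed to be sorted by key already.
func stringKey(name string, attrs [][2]string) string {
	var b strings.Builder
	b.WriteString(name)
	for _, kv := range attrs {
		b.WriteByte(0) // separator between fields
		b.WriteString(kv[0])
		b.WriteByte(0)
		b.WriteString(kv[1])
	}
	return b.String()
}

// hashKey sketches the other style: the same fields are fed into
// FNV-1a directly, so no intermediate string has to be built.
func hashKey(name string, attrs [][2]string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	for _, kv := range attrs {
		h.Write([]byte{0})
		h.Write([]byte(kv[0]))
		h.Write([]byte{0})
		h.Write([]byte(kv[1]))
	}
	return h.Sum64()
}

func main() {
	attrs := [][2]string{{"host", "a"}, {"region", "eu"}}
	fmt.Printf("%q\n", stringKey("http.requests", attrs))
	fmt.Printf("%016x\n", hashKey("http.requests", attrs))
}
```

Both helpers walk the same fields in the same order; the difference is whether the map key is the full string or a 64-bit digest of it.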
I was thinking about this during the implementation of this component and preliminarily decided against it. I don't think it's common to have an exporter receive some metrics as delta and others as cumulative (happy to be proven wrong). Even if that was required, the filterprocessor can be used to build such a pipeline. The filterprocessor is very sophisticated and we would essentially be implementing a "lite-version" here, that will likely always be worse.
@sh0rez I guess it would be nice to know why the cumulativetodeltaprocessor has this include/exclude config in the first place (PR: #8952). Perhaps it predates some of the other features that can do it better?
@RichieSams Sorry, I'm being slow today (I actually have mild COVID, so my brain is not really cooperating 😅)
Yes, which is fine. The std-lib map implementation needs to deal with hash collisions anyway. So this isn't a problem.
But how can it deal with a collision if the hash is done to create the actual key? 🤔
In a map, hash collisions are just a performance hit, as the hash is just telling the map which bucket to look in, so a collision means that more than one item is in the bucket. Worst case scenario, the map iterates over the bucket items until it finds an exact match for the key.
In the case of using a long string as the key, the map can always compare one long string to another if needed, and get the right item.
In the (admittedly very rare) case of a new metric producing the same hash as another, already tracked metric, you'll get the original metric's tracker returned by the map.
Are you then going to compare the tracked attributes etc against the incoming item, to check there hasn't been a collision? What do you do if there is a collision, track both items under the same key? You're then kind of implementing a map on top of a map.
That's obviously not viable, so you're going to end up just saying that collisions happen so infrequently that you'll pretend they don't happen.
So, for me, the consequence of something going wrong is that (very infrequently) you'll mangle the user's data by adding deltas from one metric to another's cumulative.
You're sacrificing a tiny bit of correctness, so that you don't need to build and store the string.
Actually, looking at this, the code here is actually storing all the metric's attributes in the map (am I reading that right?)
Something like this is required, as the emitted cumulative data points need to have all the same attributes as the source datapoint (I guess the alternative would be to copy from each incoming delta datapoint and not store it at all)
So, wouldn't it be possible to have the name, all the attributes etc, as the key, then the map's item could just be the cumulative sum and the timestamps etc? 🤔 This way there isn't even a memory overhead, as you've just moved the data from the map's value to the key.
Maybe that doesn't work for some reason?
@0x006EA1E5 @RichieSams let me try to provide some clarity on metric identity and hashing.
The OTel metrics spec defines the fields required to identify a metric, and also how to identify a single stream within a metric. I have written about that in my design doc: https://docs.google.com/document/d/1Oqwl5rDLqB6-Qgd6Hy1PXYZBAH4pkcdudxNA7bRkrIc/edit#heading=h.a00fffk0v68v. This is also implemented in streams.Identity in my code.
For each distinct stream S there will be one and only one identity I. Each identity points to only one stream. There cannot be collisions by design. The streams are either different, or the identifying attributes in the spec are incomplete, which is a bug.
Go has the concept of comparable data types. Those need to follow some data rules, but then can be directly used with == and as map keys. Go does this by auto-generating hidden hash functions for those.
My Ident data types also have a custom Hash() function which uses the fnv64a algorithm, but this is not used by the current implementation aside from String() printing, so don't confuse these.
Because map (and by extension pcommon.Map) is not comparable, it is the only field that is converted to uint64 by pdatautil.MapHash. The other identifying properties are int or string, which can be copied as-is.
Having those Ident types be structs which include all identifying information (as per spec) as members makes it a little easier to inspect / debug those. It comes at a slight memory cost, as they are bigger than plain hashes. If we ever find that is an issue, we can switch to calling Hash() on these instead, bringing the memory footprint down to uint64. I don't think this will be necessary though.
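A small sketch of the comparable-struct idea, using only the standard library. The ident type, its fields, and hashAttrs are hypothetical stand-ins and not the actual streams.Ident or pdatautil.MapHash; the attribute slice is assumed to be sorted already.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ident is a hypothetical identity: plain fields are stored as-is,
// only the (non-comparable) attribute set is reduced to a uint64.
type ident struct {
	name      string
	unit      string
	attrsHash uint64
}

// hashAttrs is an assumed helper that folds sorted key/value pairs
// into a single FNV-1a value.
func hashAttrs(sorted [][2]string) uint64 {
	h := fnv.New64a()
	for _, kv := range sorted {
		h.Write([]byte(kv[0]))
		h.Write([]byte{0})
		h.Write([]byte(kv[1]))
		h.Write([]byte{0})
	}
	return h.Sum64()
}

func newIdent(name, unit string, sortedAttrs [][2]string) ident {
	return ident{name: name, unit: unit, attrsHash: hashAttrs(sortedAttrs)}
}

func main() {
	totals := map[ident]float64{}
	a := newIdent("http.requests", "1", [][2]string{{"host", "a"}})
	b := newIdent("http.requests", "1", [][2]string{{"host", "a"}})
	totals[a] += 2
	totals[b] += 3 // same identity, so this lands in the same entry
	fmt.Println(a == b, len(totals), totals[a]) // true 1 5
}
```

Because every field is comparable, the struct works with == and as a map key without a hand-written hash. Note that in this sketch the attribute set is still compared through its 64-bit hash only.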
@0x006EA1E5 The data.Point[T] interface is not directly related to identities, as it's an abstraction over pmetric.NumberDataPoint et al. So they are the "other side" of the map[streams.Ident]data.Point.
The implementations of it are also not allocating, as they merely wrap existing pmetric.* pointer types.
The Ident types are very generally useful, the data.Point ones are a bit more delta-to-cumulative focused, e.g. as they have the Add() function.
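For readers unfamiliar with this pattern, the following is a loose sketch of how a point type with an Add() method can sit on the value side of such a map. The point interface, the number type, and accumulate are invented for illustration and are much simpler than the real data.Point[T]; in particular, number is a plain value type here and does not wrap a pmetric type.

```go
package main

import "fmt"

// point is a hypothetical, simplified interface: a value that knows
// how to fold another value of the same type into itself.
type point[Self any] interface {
	Add(Self) Self
}

// number stands in for a numeric data point.
type number struct{ value float64 }

func (n number) Add(delta number) number {
	return number{value: n.value + delta.value}
}

// accumulate adds delta to whatever is tracked under id and returns
// the new cumulative value. The first sample of a stream is stored as-is.
func accumulate[K comparable, P point[P]](state map[K]P, id K, delta P) P {
	if prev, ok := state[id]; ok {
		delta = prev.Add(delta)
	}
	state[id] = delta
	return delta
}

func main() {
	state := map[string]number{}
	for _, d := range []float64{1, 2, 3} {
		fmt.Println(accumulate(state, "stream-a", number{value: d}).value)
	}
}
```

The key type is left generic so that a string, a hash, or an identity struct could be used without changing the accumulation logic.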
@0x006EA1E5 I guess it would be nice to know why the cumulativetodeltaprocessor has this include/exclude config in the first place (PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/8952). Perhaps it predates some of the other feature that can do it better?
That's a good question. Maybe @jpkrohling @codeboten @Aneurysm9 can clarify?
It looks like @TylerHelmuth might be able to help here, here's the closest I could find about this: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/5877#issuecomment-1076915746
Today, I think I'd prefer to see a connector solution to deal with include/exclude though. I would prefer to not have this include/exclude in the new component unless we have concrete use-cases for it that cannot be done otherwise, or can only be done in a way that severely impacts performance.
There is https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/25161 to address filter interface for receivers.
Ultimately the cumulativetodelta allows specifying which metrics to compact because that gives users more control over their data. I tend to favor giving users absolute control, even if that means they can hurt themselves. The cumulativetodelta processor will convert everything if no include/exclude is supplied.
You can also safely choose not to allow include/exclude config at the start; it can always be added later without a breaking change.
It looks like @TylerHelmuth might be able to help here, here's the closest I could find about this: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/5877#issuecomment-1076915746
Today, I think I'd prefer to see a connector solution to deal with include/exclude though. I would prefer to not have this include/exclude in the new component unless we have concrete use-cases for it that cannot be done otherwise, or can only be done in a way that severely impacts performance.
I agree with this because the alternative IMO leads to a mess of overlapping functionality. Include/exclude is not any more related to this processor than any other type of transformation which a user may need to do. We're better off decomposing the functionality so that users can manage their data however they want.
Ultimately the cumulativetodelta allows specifying which metrics to compact because that gives users more control over their data. I tend to favor giving users absolute control, even if that means they can hurt themselves.
I don't think this actually gives users more control. It just pulls a specific type of control into the component. The problem is that you could make the same case for adding almost any type of processing to almost any component. The reason we have components is so that they can be composed as necessary.
Sorry if this is a silly question, but what would be the best way to address the use case where we want to selectively process some telemetry? 🤔
Would we use the routingconnector and the forwardconnector?
Something like:
receivers:
  otlp/in:
exporters:
  otlp/out:
processors:
  deltatocumulative:
connectors:
  routing:
    default_pipelines: [metrics/out]
    table:
      - statement: route() where <something something something>
        pipelines: [metrics/delta]
  forward:
service:
  pipelines:
    metrics/in:
      receivers: [otlp/in]
      exporters: [routing]
    metrics/delta:
      receivers: [routing]
      processors: [deltatocumulative]
      exporters: [forward]
    metrics/out:
      receivers: [routing, forward]
      exporters: [otlp/out]
?
Would we use the routingconnector and the forwardconnector?
Yes, I believe we should be relying on connectors for routing telemetry to the appropriate processors.
@sh0rez Can I ask what the plan is for the common functionality that we can expect to be shared with the implementation of #29461 (metric identity etc)?
It looks like you are doing everything within the deltatocumulative processor in the initial PR, and expecting that any subsequent processors would factor out needed common functionality, right?
Do we intend to also refactor the cumulativetodelta processor to share any common functionality we have here? Obviously, if so, we would need to be careful about any subtle changes in behaviour...
I'm wondering, have you considered simply refactoring cumulativetodelta to pull out anything shared with this processor first, as part of the initial implementation of the deltatocumulative processor?
This could mean either simply adopting the cumulativetodelta tracking implementation as part of the PR (and then re-implementing later as another issue, with the improvements you have shown), or going ahead and changing the to-be-shared common tracker now for both processors, with your improvements.
The value of this would be we don't diverge between cumulativetodelta and deltatocumulative, and it would also make it a bit easier to make a start on #29461 now, as @RichieSams could branch from your branch (Otherwise as noted elsewhere it becomes a bit difficult to review #30827).
It seems to me that we will surely have to factor out common code at some point soon in any case (as #29461 is required to make the deltatocumulative processor useful for the original use-case of the count connector -> prometheus remote write exporter). And it is possible that making the common code work for all three processors (cumulativetodelta, deltatocumulative, #29461) would inform the initial implementation here.
Is this a useful suggestion, or does it make things unnecessarily more difficult for you?
@0x006EA1E5
I'm wondering, have you considered simply refactoring cumulativetodelta to pull out anything shared with this processor first, as part of the initial implementation of the deltatocumulative processor?
yes I considered that, however I chose against doing so for two major reasons:
- it's internal to that processor and written in a rather specific, non-composable way, so some changes would be necessary. Adding that to the process would mean additional slow-down I was trying to avoid for this initial phase.
- the implementation is string-based, as in all identifying data is written to a string, which is briefly hashed and immediately garbage-collected. I think we can do better from a memory perspective and I think we succeeded here. I can run benchmarks to back this up if desired.
The value of this would be we don't diverge between cumulativetodelta and deltatocumulative, and it would also make it a bit easier to make a start on https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29461 now, as @RichieSams could branch from your branch (Otherwise, as noted in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30827#pullrequestreview-1854720156, it becomes a bit difficult to review https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30827).
I factored out generic tracking in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31017#issuecomment-1938851867 recently, which was then picked up by @RichieSams in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31089#event-11724571632. Looking at that PR and his other work on the interval processor, the proposed api seems to be as versatile and composable as I had hoped for.
Converting the cumulativetodelta processor at a later point should be rather straightforward too, bringing the performance benefits there as well.