opentelemetry-collector
Applying memory_limiter extension
Problem
As part of https://github.com/open-telemetry/opentelemetry-collector/issues/8632, we want receivers to be able to reject incoming requests when the collector is running out of memory, instead of accepting the data, converting it to OTLP, and then having it rejected by the memory_limiter processor. Scraping receivers would skip the scrape in that situation.
We introduced a memory_limiter extension in https://github.com/open-telemetry/opentelemetry-collector/pull/8964, but it has not been applied yet.
https://github.com/open-telemetry/opentelemetry-collector/pull/9397 proposes a way for receivers to reference a memory_limiter extension explicitly via confighttp. Extending only this approach to other receivers would complicate the user configuration interface, since users would have to explicitly connect every receiver to a memory_limiter extension.
Proposed solution
We can introduce an option to configure a memory_limiter extension so that it is applied to every receiver. Individual receivers can override the default memory limiter and connect to a different one.
Configuration example:
```yaml
extensions:
  memory_limiter:
    limit_mib: 800
    apply_to_all_receivers: true
  memory_limiter/internal_metrics:
    limit_mib: 1000

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317
      http:
        endpoint: localhost:4318
  prometheus:
    memory_limiter: memory_limiter/internal_metrics
    config:
      scrape_configs:
        - job_name: 'otel-collector'
          scrape_interval: 10s
          static_configs:
            - targets: ["localhost:8888"]

processors:
  batch:

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [memory_limiter, memory_limiter/internal_metrics]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    metrics:
      receivers: [otlp, prometheus]
      processors: [batch]
      exporters: [debug]
```
In this configuration, the default memory_limiter extension is implicitly applied to the otlp receiver, and the additional memory_limiter/internal_metrics extension is applied explicitly to the prometheus receiver.

The user cannot define more than one memory_limiter extension with apply_to_all_receivers: true; the config validation will fail otherwise.
If a memory_limiter extension with apply_to_all_receivers: true is defined, all receivers used in the pipelines must support the memory_limiter extension. Otherwise, the collector won't start.
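A rough sketch of how the first validation rule could look (the config type and field names here are hypothetical, not taken from the proof-of-concept PR):

```go
package example

import (
	"fmt"

	"go.opentelemetry.io/collector/component"
)

// memLimiterCfg is a hypothetical stand-in for the memory_limiter extension config.
type memLimiterCfg struct {
	LimitMiB            uint32
	ApplyToAllReceivers bool // assumed field behind apply_to_all_receivers
}

// validateMemoryLimiterExtensions fails if more than one memory_limiter
// extension sets apply_to_all_receivers: true.
func validateMemoryLimiterExtensions(cfgs map[component.ID]memLimiterCfg) error {
	var found *component.ID
	for id, cfg := range cfgs {
		if !cfg.ApplyToAllReceivers {
			continue
		}
		if found != nil {
			return fmt.Errorf("only one memory_limiter extension can set apply_to_all_receivers: true; found %q and %q",
				found.String(), id.String())
		}
		id := id
		found = &id
	}
	return nil
}
```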
Proof of concept: https://github.com/open-telemetry/opentelemetry-collector/pull/9590
@dmitryax should this functionality exist in the receiverhelper, similar to how batching is being added to the exporterhelper?
@TylerHelmuth, yes, we can introduce a receiver helper; it doesn't exist at this point. It may be a cleaner implementation for the memory_limiter. If we think it can help with other problems in the future, we can add it. Otherwise, it can also be an unnecessary user interface complication. I'll look into that.
First, I would like us to agree on the user interface and how the memory_limiter extension can be enabled for all receivers.
Instead of needing apply_to_all_receivers: true, could the extension component named memory_limiter be applied to all receivers unless the component specifies a different ID?

Pros:
- More backwards compatible
- Users wouldn't need to remember to set the boolean to get the feature
- Less config

Cons:
- Config is less self-describing
> More backwards compatible

How is it more backward compatible? The extension is a no-op at this point.

How do you know which one is applied to all receivers if there are, say, two memory_limiter extensions defined? By looking at each receiver and filtering out the one that is not referenced by any of them? This seems like too much magic.
I have a pending proposal for supporting interceptors (https://github.com/open-telemetry/opentelemetry-collector/issues/7441) but I've failed to dedicate engineering time for a PoC. While I'm not too tied to that specific proposal, I think that this here could be split into two parts: one more generic being part of the Collector first, and one with the memory limiter being one implementation.
> How is it more backward compatible? The extension is a no-op at this point

It is closer to what users do now with the processor, so changing it to an extension would be a little easier, but not significantly.
> This seems like too much magic.

Ya, probably.
> one more generic being part of the Collector first

@jpkrohling what do you mean by this?
The middleware proposal makes sense to me, but it's not applicable to all receivers, e.g., scraping receivers. The memory_limiter is supposed to be supported by all of them. It's also important to apply the memory limiter before any other middleware for push-based receivers. So I think it's ok to keep them separate.
> one more generic being part of the Collector first

Sorry, I meant a general mechanism (interceptors/middleware/...), plus an implementation of it. I agree about the scraping one, though.
> but it's not applicable to all receivers, e.g., scraping receivers

Stupid question, but isn't a memory limiter being used in a scraping receiver similar in usage to an exporter, in the sense that they both could intercept an outgoing HTTP/gRPC request? Perhaps there could be two interceptor interfaces, one for servers (typical receivers) and one for clients (scraping receivers and typical exporters)?
> Stupid question, but isn't a memory limiter being used in a scraping receiver similar in usage to an exporter, in the sense that they both could intercept an outgoing HTTP/gRPC request? Perhaps there could be two interceptor interfaces, one for servers (typical receivers) and one for clients (scraping receivers and typical exporters)?

Scraping doesn't always involve making HTTP/gRPC requests to an external endpoint. Scrapers can read logs or metrics from local files. They can also use third-party libraries that talk to an external API, so it would be hard to apply interceptors there.

I think scraping receivers need to skip scrapes at the application level once the memory limit is reached and avoid even attempting any external calls.
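A minimal sketch of what application-level skipping could look like in a scrape loop, assuming the MustRefuse() method discussed later in this thread (names here are illustrative, not actual collector code):

```go
package example

import (
	"context"
	"time"
)

// memoryLimiter is the subset of the extension API assumed here.
type memoryLimiter interface {
	MustRefuse() bool
}

// scrapeLoop skips a scrape entirely when the limiter reports memory
// pressure, so no external call or allocation is attempted for that interval.
func scrapeLoop(ctx context.Context, ml memoryLimiter, interval time.Duration, scrape func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if ml != nil && ml.MustRefuse() {
				// Memory limit reached: skip this scrape instead of
				// producing data that would be dropped downstream.
				continue
			}
			_ = scrape(ctx) // error handling omitted for brevity
		}
	}
}
```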
Hey @dmitryax @timannguyen, thanks for all your work to create this memorylimiterextension and to make it usable by receivers. I just wanted to check in and see if there's anything I can do to help make this memorylimiterextension usable.

Our team is struggling with service reliability because of memory/latency issues caused by large requests being accepted by the receiver (despite the in-flight-bytes memory limiter we apply in a custom processor). We want to use the core collector memorylimiterextension or create a custom extension to apply our limiter at the start of our pipeline. We use an otelarrow receiver, which sets up a gRPC server, so it seems we could make use of configgrpc and interceptors to apply the memorylimiterextension.

- Is there anything blocking https://github.com/open-telemetry/opentelemetry-collector/pull/9397 from being merged?
- May I implement something similar for configgrpc so our gRPC servers can leverage memory_limiter extensions?
Hi @moh-osman3, thank you for the interest. I'll be happy to work on this together.

https://github.com/open-telemetry/opentelemetry-collector/pull/9673 and https://github.com/open-telemetry/opentelemetry-collector/pull/9397 add a configuration option directly in confighttp and configgrpc. That's a good option if users want to enable the memory_limiter explicitly. However, I believe it's important to provide a way to apply a memory_limiter extension to all the receivers used in the collector pipelines, as proposed in this issue.

I tried to prototype this approach in https://github.com/open-telemetry/opentelemetry-collector/pull/9590, and it seems like it doesn't work with having the memory_limiter option right in the confighttp and configgrpc configs. Likely, every receiver would have to have this option added separately in its config. That's why I'm hesitant to merge these PRs: to avoid breaking them down the road. If you can find a way to marry the confighttp and configgrpc config with the approach proposed in this PR, I'll be happy to move it forward.
> I tried to prototype this approach in https://github.com/open-telemetry/opentelemetry-collector/pull/9590, and it seems like it doesn't work with having the memory_limiter option right in the confighttp and configgrpc configs.

@dmitryax Hmm, I've checked out your branch and run this code. It seems to me that things are working with configgrpc/confighttp, i.e., the interceptor is correctly applying the overriding memorylimiterextension configured in the otlpreceiver, but the issue is that the global memory limiter added in

```yaml
service:
  extensions: [memory_limiter, memory_limiter/internal_metrics]
```

will cause the global memorylimiterextension to start up and apply garbage collection when the global limit is exceeded, regardless of what is going on in the interceptors for the otlpreceiver. This will happen whenever the global memorylimiterextension has a stricter memory limit than the overriding extension, i.e. memorylimiter.Start() -> memorylimiter.CheckMemLimits() -> ml.doGCAndReadMemStats().

This seems to be an issue with using the memorylimiter processor logic directly. Is garbage collection even necessary in the extension itself? I.e., we just want to monitor the current memory usage and reject/retry based on whether the next request would push us over the limit. I think each receiver would then have to check if there is a global memorylimiterextension and apply it. Otherwise, I'm not sure it's possible to have multiple memorylimiter extensions.
I'm working on implementing a custom memorylimiterextension that uses a weighted semaphore of size limit_mib to control admission into the receiver. For this custom memorylimiterextension, MustRefuse() will be based on whether sem.TryAcquire() succeeds given the size of the request. I think this removes the need for the garbage collection in the original memorylimiter processor logic and will allow us to successfully override the global memorylimiterextension.
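A minimal sketch of that semaphore-based admission idea (assuming golang.org/x/sync/semaphore; the type and method names are illustrative, not the actual extension API):

```go
package example

import "golang.org/x/sync/semaphore"

// semLimiter admits requests only while the total in-flight bytes stay under the limit.
type semLimiter struct {
	sem *semaphore.Weighted
}

func newSemLimiter(limitMiB int64) *semLimiter {
	return &semLimiter{sem: semaphore.NewWeighted(limitMiB << 20)}
}

// mustRefuse reports whether a request of reqBytes should be refused.
// On admission, the caller must call release(reqBytes) once the request is done.
func (l *semLimiter) mustRefuse(reqBytes int64) bool {
	return !l.sem.TryAcquire(reqBytes)
}

func (l *semLimiter) release(reqBytes int64) {
	l.sem.Release(reqBytes)
}
```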
Any thoughts on this?
@moh-osman3, thanks for your interest.

I don't see a lot of value in the GC capability of the extension; it was just copied from the processor as is. I think users should be fine just using GOMEMLIMIT instead. We can update the extension to provide only MustRefuse() and remove the GC capability, while keeping it in the processor to preserve the existing behavior of a popular component. That would simplify further implementation of the extension and can be a good first step. It'd be great if you could help with a PR.

Another option is to apply GC only if apply_to_all_receivers: true, but I'm not sure that is important enough.
> I think users should be fine just using GOMEMLIMIT instead

@dmitryax Can you expand on what you mean here? Are you saying that GOMEMLIMIT should replace the global memorylimiterextension? So it would be set as an overall limit for the entire collector binary, while the memorylimiterextension would be a more local memory limit explicitly configured in the component itself?

Happy to help with a PR to remove the garbage collection in the extension. Related to this: something that seems troubling about the memorylimiter is that it seems possible for memory usage to exceed the configured limit_mib, which is why garbage collection was triggered in the processor. Is it possible to change the function signature of MustRefuse to also take the request size into consideration when refusing/accepting requests? I.e., change from

```go
func (ml *memorylimiterextension) MustRefuse() bool
```

to

```go
func (ml *memorylimiterextension) MustRefuse(req any) bool
```

?
@dmitryax I'm still wondering if we can have multiple modes of memory limiting:

- The existing one, based on polling the runtime statistics to see if we are above the configured memory limits.
- One based on per-item memory limiting that sizes each request and rejects it if the next request would put us over the limit.

Relying on method (1) seems like it might be costly: we continuously call runtime.ReadMemStats(), and we can still accept large or frequent requests that OOM/stall the application if the poll interval is not sufficiently small. A smaller poll interval makes the memorylimiterextension more expensive due to the additional runtime.ReadMemStats() calls. Method (2) allows the server to know the size of every request before admitting it into the receiver, and it can even block until there is enough available memory to process the request (though the number of waiters should probably be limited). This seems like it would make the server more resilient and give users more flexibility to configure a memory_limit_strategy that suits their needs. Users can still set GOMEMLIMIT to limit their collectors globally with both strategies.
I need some help here to figure out an interface that would allow for the use of multiple strategies. It seems like the current interface of
```go
type memorylimiterextension interface {
	Start(ctx context.Context, host component.Host) error
	Shutdown(ctx context.Context) error
	MustRefuse() bool
}
```
won't work for strategy (2) because we need to supply more information from the interceptor to account for request size in our decision to accept/refuse the request, and we need to store a running count of memory usage (and decrement this count when the handler returns).
Maybe we can embed a more general interface for gRPC servers to supply their own interceptors based on the memory limit strategy:
```go
type grpcServerInterceptor interface {
	UnaryInterceptorGenerator() grpc.UnaryServerInterceptor
	StreamInterceptorGenerator() grpc.StreamServerInterceptor
}
```
This would simplify things in configgrpc:

```go
if gss.MemoryLimiter != nil {
	memoryLimiter, err := getMemoryLimiterExtension(gss.MemoryLimiter, host.GetExtensions())
	if err != nil {
		return nil, err
	}

	uInterceptors = append(uInterceptors, memoryLimiter.UnaryInterceptorGenerator())
	sInterceptors = append(sInterceptors, memoryLimiter.StreamInterceptorGenerator())
}
```
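For illustration, a hypothetical UnaryInterceptorGenerator for a MustRefuse()-style limiter might look roughly like this (a sketch only; the limiter interface, struct, and error message are assumptions, not actual collector code):

```go
package example

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limiter is the minimal behavior assumed from a memory limiter extension.
type limiter interface {
	MustRefuse() bool
}

type memLimiterInterceptor struct {
	ml limiter
}

// UnaryInterceptorGenerator returns a gRPC unary interceptor that refuses
// requests while the limiter reports memory pressure.
func (m *memLimiterInterceptor) UnaryInterceptorGenerator() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		if m.ml.MustRefuse() {
			return nil, status.Error(codes.ResourceExhausted, "collector memory limit reached")
		}
		return handler(ctx, req)
	}
}
```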
My goal here is not to overcomplicate things but rather to provide more flexibility for solving memory-related incidents we've experienced in production. I would love any suggestions about how we can extend this memorylimiterextension to support strategy (2) in the future, or whether this would need to be a separate component altogether.
Wanting to keep this thread alive: I've caught up with all the discussion and feel we are heading in the right direction.
I definitely support @moh-osman3's point that the memory limiter should have access to something transport-specific in order to intercept the data as it performs the limiting function. I'm not sure that a (data any) formal parameter is the right way, however. Perhaps we can find a way to bind transport-specific memory limiter functions, to align the existing work on HTTP and gRPC interceptors with other transports, protocols, and sources of telemetry data. For example, if we are limiting a component that reads local files, an interceptor is likely to look like an io.Reader, so the memory limiter will want to intercept Read([]byte). We would have memory limiter interfaces based on the type of transport, and each memory limiter extension would implement all the interfaces it can.
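As one possible shape for a file-oriented transport, a reader-based limiter could look roughly like this (illustrative only; the interface and wrapper names are assumptions, not an agreed design):

```go
package example

import (
	"errors"
	"io"
)

// readLimiter is a hypothetical transport-specific limiter interface for
// components that pull data through an io.Reader.
type readLimiter interface {
	// WrapReader returns a reader that stops yielding data when the
	// memory limit is reached.
	WrapReader(r io.Reader) io.Reader
}

var errMemoryLimitReached = errors.New("memory limit reached")

// refusingReader consults mustRefuse before each Read and fails fast
// instead of pulling more data into memory.
type refusingReader struct {
	r          io.Reader
	mustRefuse func() bool
}

func (rr *refusingReader) Read(p []byte) (int, error) {
	if rr.mustRefuse() {
		return 0, errMemoryLimitReached
	}
	return rr.r.Read(p)
}
```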
@dmitryax Regarding the apply_to_all_receivers: true suggestion, I wonder if the top-level service configuration ought to contain a global memory_limiter configuration, as the PRs linked above do for configgrpc and confighttp? That is, the configuration logic that applies memory limiters would first consult the relevant component-level memory_limiter (which may or may not be embedded in a transport-common config struct, such as configgrpc or confighttp), and if no component-level limiter is configured, it would consult the top-level service::memory_limiter.
Thanks, all!
@moh-osman3 I understand the problem of the existing implementation you're trying to solve. Checking the current memory utilization periodically is not ideal: it's pretty hard to configure a spike_limit_mib that always covers the maximum possible memory jump between measurements. I'm up for finding a solution for that. I don't think we need different modes/extensions; we have the same goal. I like @jmacd's suggestion of tracking it closer to the wire. However, I have a couple of concerns about the memory_limiter tracking the memory utilization by itself:

- The measurements from the input will differ significantly from the actual memory utilization down the pipeline, especially for non-otlp/grpc receivers, where we have to create the pdata and keep it along with the request until it's done.
- It won't work with any components that handle data asynchronously, e.g., the tailsampling, groupbytrace, or deltatocumulative processors. Most importantly, it won't work with the queue on the exporter side. Users would have to pick a value for the memory limiter by subtracting all the configured queue sizes, which is not very convenient.

I think we can try to find some middle ground here. I'd like to keep the simplicity of the current approach while making it more reliable against spikes. Even if we take the request size as input for the refusal decision, we can track it only between the runtime measurements, which will still be used as the source of truth and will "reset" the tracker.
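A minimal sketch of that middle-ground idea, as I read it (an assumption about the wiring, not an agreed design): keep the periodic runtime measurement as the source of truth, and accumulate the sizes of admitted requests between measurements to catch spikes early.

```go
package example

import (
	"runtime"
	"sync/atomic"
	"time"
)

// hybridLimiter combines periodic runtime.ReadMemStats readings with a
// running count of bytes admitted since the last reading.
type hybridLimiter struct {
	limitBytes    uint64
	baselineAlloc atomic.Uint64 // heap usage at the last measurement
	admittedSince atomic.Uint64 // request bytes admitted since the last measurement
}

func newHybridLimiter(limitMiB uint64, checkInterval time.Duration) *hybridLimiter {
	l := &hybridLimiter{limitBytes: limitMiB << 20}
	l.measure()
	go func() {
		for range time.Tick(checkInterval) {
			l.measure()
		}
	}()
	return l
}

// measure refreshes the baseline from the runtime and resets the tracker,
// keeping the runtime as the source of truth.
func (l *hybridLimiter) measure() {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	l.baselineAlloc.Store(ms.HeapAlloc)
	l.admittedSince.Store(0)
}

// mustRefuse refuses a request when the last measured heap usage plus the
// bytes admitted since that measurement would exceed the limit; otherwise
// it records the request size and admits it.
func (l *hybridLimiter) mustRefuse(reqBytes uint64) bool {
	if l.baselineAlloc.Load()+l.admittedSince.Load()+reqBytes > l.limitBytes {
		return true
	}
	l.admittedSince.Add(reqBytes)
	return false
}
```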