New component: Enrichment Processor
The purpose and use-cases of the new component
This issue is a follow-up to a presentation on enhancing the enrichment capabilities of the Collector at the Collector SIG meeting (July 23rd). The feedback was to create an issue to get the discussion going, so here it is:
The OpenTelemetry Collector currently supports limited enrichment types, mostly focusing on self-contained parsing and contextual metadata. To improve the versatility of the Collector in comparison to other data collectors and transformation tools, we should expand its capabilities to include other enrichment types.
The original document “Enrichment in OTel Collector” introduces a taxonomy of enrichment types and their current support in the Collector:
- Type 1: Self-Contained Parsing & Derivation (supported)
- Type 2: Reference Data Lookup (Static or Semi-Static) (very limited support)
- Type 3: Dynamic External Enrichment (Live Lookups) (not supported)
- Type 4: Contextual Metadata Enrichment (supported)
- Type 5: Cross-Event Correlation & Aggregation (not supported)
- Type 6: Analytical & ML-Based Enrichment (not supported)
Of this list, looking at similar tools (a comparison can be found in the original document), Types 2 and 3 are the strongest candidates to include in the Collector to facilitate migrating workloads to the Collector from other tools.
From this problem statement, we could consider introducing a Lookup Processor aimed at handling both static reference data lookups (Type 2) and dynamic external enrichment (Type 3).
The processor would support:
- Local lookups: Using static or semi-static data sources such as CSV, JSON, or inline key/value pairs.
- Remote lookups: Dynamic enrichment from APIs, DNS, databases, or cache systems like Redis or Memcached.
Lookups are performed within the scope they're configured for, depending on where the attribute used for the lookup lives. The configuration below shows examples of resource-level and log-level lookups.
Example configuration for the component
```yaml
processors:
  # YAML source - map service.name to display name
  lookup/yaml:
    source:
      type: yaml
      path: ./mappings.yaml
    attributes:
      - key: service.display_name
        from_attribute: service.name
        default: "Unknown Service"
        action: upsert
        context: resource
  # HTTP source - lookup user name from API
  lookup/http:
    source:
      type: http
      url: "http://localhost:8080/users/{key}"
      method: GET
      timeout: 5s
      response_field: "name"
      cache:
        enabled: true
        size: 1000
        ttl: 5m
    attributes:
      - key: user.name
        from_attribute: user.id
        context: log
        action: upsert
  # DNS source - reverse DNS lookup for client IP
  lookup/dns:
    source:
      type: dns
      record_type: PTR
      timeout: 2s
      server: "8.8.8.8:53"
      cache:
        enabled: true
        size: 1000
        ttl: 5m
    attributes:
      - key: client.hostname
        from_attribute: client.ip
        default: "unknown"
        action: upsert
        context: log
```
Implementing such a processor requires significant considerations to abide by the long-term vision, namely:
- Progressive implementation, starting with basic local and then remote lookup capabilities.
- A modular structure facilitating easy addition of new lookup sources.
- Built-in caching and timeout mechanisms for performance optimization.
- Comprehensive and useful observability metrics (success/failure rates, latency percentiles).
This is not an exhaustive list of concerns and neither does it provide solutions, just an acknowledgment that these will have to be addressed.
Telemetry data types supported
Logs first, given that enrichment is more commonly a logging requirement. Support could then be extended to Metrics, Traces, and Profiles.
Code Owner(s)
@jsvd, @VihasMakwana
Sponsor (optional)
No response
Additional context
Similar ideas and alternative solutions have been suggested in the past:
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20888
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18526
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29627
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40526
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40936
- https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/34398
Trade Offs Considered
New component vs resourcedetectionprocessor
Conclusion: new component
While there are similarities to the resourcedetectionprocessor, performing lookups against a remote source is out of scope for resource detection: according to its README, that processor detects resource information from the host, whereas this processor would consult data that is not necessarily present on the host or about the host.
These remote lookups also introduce performance concerns, such as the need for configurable caching, that are better addressed in a separate implementation.
Extensibility method - extensions vs NewFactoryWithOptions
Conclusion: NewFactoryWithOptions
This processor should support third-party lookup sources. This could be implemented either via collector extensions or with an extensible factory using NewFactoryWithOptions (like filterprocessor and transformprocessor).
For now, the decision is to use NewFactoryWithOptions and not extensions, as extensions in the collector should be reusable by any component, and there are no current use cases requiring lookups during a component's execution (like the storageextension) that can't be replaced by running a lookupprocessor before or after said component.
Signal Support
Conclusion: Logs first, others after
The decision is to support Log signal first, the signal that most often needs such enrichment, and then extend to others as the processor matures.
I definitely support this concept, but do think the implementation details are important. I think in the interest of maximizing flexibility, what makes the most sense to me is to create a new extension type, and then a generic processor to utilize them.
- Allows for greatest flexibility in lookup logic. If a specific distribution needs their own lookup logic, they only need to implement an extension and include it in their distribution, it doesn't require a component fork or upstream contribution.
- Minimizes the configuration complexity required within the lookup processor.
- Allows for reuse of lookup logic. While the generic processor is a great base use case, there are certainly independent components that may want to utilize lookups with tighter guardrails or limit the kinds of lookups that can be used.
- Distributes codeownership of lookup logic, so it doesn't need to all be owned by the processor author(s).
Depending on how the conversation progresses I may be interested in sponsoring, and contributing.
The original idea of having the component + its sources in the same location was taken from the resourcedetectionprocessor, with a few arguments in favor:
- reduces the spread of elements needed for the processor to work across the collector code base. Lookup sources will require functionality (e.g. caching, error handling, metric instrumentation, circuit breakers) that should be shared and it'd be easier to access it from the single processor code base
- ownership of sources can be attributed to others that are not the maintainers of the processor, e.g. https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/7589c18/.github/CODEOWNERS#L205-L206
This said, the ability to customize the set of lookup sources in a collector distro is significant and the extension suggestion solves it perfectly. I am not sure what would be the use cases for reusing the lookup sources logic in other components outside the generic lookup processor but I could see it happening for the local file based lookups?
Lastly, thanks for the very quick feedback and taking interest in it, let's keep talking!
Expanding on the extension idea, I could foresee the implementation looking something like this:
Lookup processor:
Accept a single extension, perform one or more lookups, configure an error mode.
```go
type Config struct {
	Extension component.ID   `mapstructure:"extension"`
	Lookups   []LookupConfig `mapstructure:"lookups"`
	// define if non-missing-key errors should terminate the pipeline
	ErrorMode lookup.ErrorMode `mapstructure:"error_mode"`
}

type LookupConfig struct {
	Field       string `mapstructure:"field"`
	TargetField string `mapstructure:"target_field"`
	// define if missing keys should terminate the pipeline
	Optional bool `mapstructure:"optional"`
}
```
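For illustration, a processor configuration matching these mapstructure tags could look like the following; the `memcachedlookup` extension name is just a hypothetical example:

```yaml
processors:
  lookup:
    extension: memcachedlookup
    error_mode: ignore
    lookups:
      - field: user.id
        target_field: user.name
        optional: true
```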
Extensions:
Located under pkg/lookup, create the necessary interfaces for source extensions:
pkg/lookup/source.go:
```go
type Source interface {
	Lookup(ctx context.Context, key string) (value any, found bool, err error)
	Type() string
	Start(ctx context.Context, host component.Host) error
	Shutdown(ctx context.Context) error
}

type LookupExtension interface {
	Source
}
```
pkg/lookup/errors.go:
```go
type ErrorMode string

const (
	ErrorModeIgnore    ErrorMode = "ignore"
	ErrorModePropagate ErrorMode = "propagate"
)
```
We can also have orthogonal but optional behaviors such as caching:
pkg/lookup/cache.go:
```go
type cacheEntry[V any] struct {
	key        string
	value      V
	expiration time.Time
}

type Cache[V any] struct {
	mu      sync.RWMutex
	maxSize int
	ttl     time.Duration
	entries map[string]*list.Element
	lru     *list.List
}

func NewCache[V any](maxSize int, ttl time.Duration) *Cache[V] { /* ... */ }
func (c *Cache[V]) Get(key string) (V, bool)                   { /* ... */ }
func (c *Cache[V]) Set(key string, value V)                    { /* ... */ }
func (c *Cache[V]) Clear()                                     { /* ... */ }
```
Example extension (memcached)
In extension/lookupsource/memcachedlookupextension/extension.go:
```go
const TypeStr = "memcached"

type memcachedExtension struct {
	*lookup.BaseSource
	config *Config
	client memcachedClient
}

func NewMemcachedExtension(config *Config, settings component.TelemetrySettings) (*memcachedExtension, error) {
	// BaseSource with caching
	base, err := lookup.NewBaseSource(TypeStr, settings, &config.Cache)
	if err != nil {
		return nil, err
	}
	return &memcachedExtension{
		BaseSource: base,
		config:     config,
	}, nil
}
```
User experience
The configuration would look like:
```yaml
extensions:
  memcachedlookup:
    servers: ["localhost:11211"]
    cache:
      enabled: true
      size: 100
      ttl: 30s

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  lookup/memcached:
    source:
      extension: memcachedlookup
    attributes:
      - key: cached.user
        from_attribute: user_id
        mode: upsert
      - key: cached.data
        from_attribute: lookup_key
        mode: upsert
    error_mode: ignore

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [memcachedlookup]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [lookup/memcached]
      exporters: [debug]
```
@dehaansa 👋 hi, I'm back from vacations, just wondering if you still had thoughts on this initiative and interest in collaborating. Happy to chat over community slack too!
For anyone landing here with an interest in collaboration or sponsorship, I'm happy to push out a first skeleton PR with some interface definitions if it helps with better understanding the proposal.
@jsvd I've read the proposal and I understand the motivation to propose an extension interface for looking up mappings from various sources.
I began to work on a style guide for extension APIs, since I found from reviews (e.g. https://github.com/open-telemetry/opentelemetry-collector/pull/13265) that there were undocumented rules about this process.
Here's the draft RFC on this topic: https://github.com/open-telemetry/opentelemetry-collector/pull/13263. I will be glad to sponsor your work on the extensions in the core repo, where I think the new extension will be defined.
@jmacd this is very interesting, I mostly followed existing extensions but I definitely felt the need for a well defined process, guide and style. I will review this proposal and the chunks of code I wrote against the current status of the draft RFC.
@jmacd the initial design was for the lookup source extensions to live in contrib given their sole consumer would be the processor from this proposal. If you want a practical example of the Functional Interface proposal in action I can submit a draft skeleton PR to this repo, the relevant files being:
Lookup Source extension definition
- pkg/lookup/source.go: defines the stable lookup interfaces (Source, LookupExtension) and their functional constructors.
- pkg/lookup/base.go: convenience layer for extension authors; wraps fetch functions while exposing the same function-type surface (TypeFunc, WrapLookup*) expected by the constructors.
Sample extension
extension/lookup/nooplookupextension/extension.go: sample lookup extension implemented via lookup.NewLookupExtension.
Processor
- processor/lookupprocessor/factory.go: processor scaffold that calls the NewSource/NewLookupExtension helpers and delegates lookups to the configured extensions.
- processor/lookupprocessor/processor.go: processor skeleton with empty signal handlers.
@jmacd I created a draft PR https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/43120 in case you're interested in seeing an attempt at applying the functional interface RFC to a component (regardless of sponsorship and commitment to reviews).
@jsvd I will try to apply the functional interface to a component (e.g. DNS lookup). Is there any other active work going on here? This thread seems to have been inactive for more than a month.
> I will try to apply the functional interface to a component (e.g. DNS lookup)
Not sure I understand. The application in this PR was a way to collaborate with the creator of the functional interface proposal, but I'm not getting a response.
> Is there any other active work going on here?
From the feedback of the last SIG, I'm currently adapting the skeleton PR to use the entity model for enrichments. Otherwise I'm still actively looking for sponsorship; this remains a highly demanded feature, so the proposal is still, IMO, valid.
I'm also interested. Happy to help with anything here.
The proposal has been updated according to the latest discussions, including a section at the bottom for Trade-Offs Considered. Also, @VihasMakwana has been added as a second code owner.
We are still looking for a third code owner/sponsor. As per the new component contribution rules the sponsor is not required to become a code owner, but in that case we still need a third code owner.
If you are interested in the feature itself please confirm that it meets your needs, happy to discuss it here or over at the CNCF slack!
I haven't fully re-reviewed the updates after latest discussion, but I'm interested in being a sponsor & codeowner. Will review later today for any outstanding feedback!
@dehaansa I've gone ahead and added you as sponsor & code owner in the description. Please report back when you're happy to proceed.
@kyo-ke thank you for your interest as well! can you confirm that the proposal works for your needs? what sources for lookups would you be most interested in?
@VihasMakwana @dehaansa to facilitate discussion I added a first skeleton PR (draft mode) that shows what the overall experience would look like, including necessary public APIs to build new internal and external lookup sources.
Reviewed the skeleton and the rest of the issue context, 👍 to move forward.
@dehaansa thanks for the quick feedback!! how do you prefer development to be done from now on? I can either make the existing PR ready to merge as-is and once it's merged open another PR on top of main, or push a second commit to this PR with the processor implementation.
@jsvd Happy to see this approved! I'm interested in contributing to the DNS lookup source.
@jagan2221 sweet, thanks for the interest! As I was waiting for a sponsor I ended up writing a lot of code, including DNS lookups as a source (the only other source I implemented besides the YAML source and a testing noop source). I will push the code up soon for review and would love your feedback on the sorts of DNS queries you need; for now my implementation only does PTR:
```yaml
# DNS lookup: client.ip -> host.name (log-level)
lookup/dns:
  source:
    type: dns
    record_type: PTR
    timeout: 5s
    server: "8.8.8.8:53"  # optional
    cache:
      enabled: true
      size: 1000
      ttl: 5m
      negative_ttl: 1m
  attributes:
    - key: host.name
      from_attribute: client.ip
      default: unknown
      action: upsert
      context: log
```
I'm guessing A and AAAA queries will also be common use cases, and we'll need to support multiple responses. PTR can also return more than one result, but that's much less common.
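As a strawman for that discussion, a forward-lookup configuration might look like the following; the `A` record_type value and the attribute names here are assumptions, not settled design, and how multiple results map onto a single attribute is an open question:

```yaml
# Hypothetical forward DNS lookup: server.address -> resolved IP(s)
lookup/dns-forward:
  source:
    type: dns
    record_type: A  # or AAAA; multi-result handling TBD
    timeout: 5s
  attributes:
    - key: server.ip
      from_attribute: server.address
      action: upsert
      context: log
```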
That's great! Yeah, we would need A and AAAA resolutions too. I'll take a look once you push it for review and will try to add the A and AAAA lookups. Also, please let me know anything I can collaborate on to speed up the implementation.