
New component: Enrichment Processor

Open jsvd opened this issue 4 months ago • 14 comments

The purpose and use-cases of the new component

This issue is a follow-up to a presentation on enhancing the enrichment capabilities of the Collector at the Collector SIG meeting (July 23rd). The feedback was to create an issue to get the discussion going, so here it is:

The OpenTelemetry Collector currently supports limited enrichment types, mostly focusing on self-contained parsing and contextual metadata. To improve the versatility of the Collector in comparison to other data collectors and transformation tools, we should expand its capabilities to include other enrichment types.

The original document “Enrichment in OTel Collector” introduces a taxonomy of enrichment types and their support in the Collector:

  • Type 1: Self-Contained Parsing & Derivation (supported)
  • Type 2: Reference Data Lookup (Static or Semi-Static) (very limited support)
  • Type 3: Dynamic External Enrichment (Live Lookups) (not supported)
  • Type 4: Contextual Metadata Enrichment (supported)
  • Type 5: Cross-Event Correlation & Aggregation (not supported)
  • Type 6: Analytical & ML-Based Enrichment (not supported)

Of this list, and looking at similar tools (a comparison is available in the original document), types 2 and 3 are the strongest candidates to include in the Collector to facilitate migration of workloads from other tools.

From this problem statement we could consider introducing a Lookup Processor aimed at handling both static reference data lookups (Type 2) and dynamic external enrichments (Type 3).

The processor would support:

  • Local lookups: Using static or semi-static data sources such as CSV, JSON, or inline key/value pairs.
  • Remote lookups: Dynamic enrichment from APIs, DNS, databases, or cache systems like Redis or Memcached.

Lookups are performed at the scope they are configured for, determined by the location of the attribute used as the lookup key. The configuration below shows examples of resource-level and log-level lookups.

Example configuration for the component

processors:
  # YAML source - map service.name to display name
  lookup/yaml:
    source:
      type: yaml
      path: ./mappings.yaml
    attributes:
      - key: service.display_name
        from_attribute: service.name
        default: "Unknown Service"
        action: upsert
        context: resource

  # HTTP source - lookup user name from API
  lookup/http:
    source:
      type: http
      url: "http://localhost:8080/users/{key}"
      method: GET
      timeout: 5s
      response_field: "name"
      cache:
        enabled: true
        size: 1000
        ttl: 5m
    attributes:
      - key: user.name
        from_attribute: user.id
        context: log
        action: upsert

  # DNS source - reverse DNS lookup for client IP
  lookup/dns:
    source:
      type: dns
      record_type: PTR
      timeout: 2s
      server: "8.8.8.8:53"
      cache:
        enabled: true
        size: 1000
        ttl: 5m
    attributes:
      - key: client.hostname
        from_attribute: client.ip
        default: "unknown"
        action: upsert
        context: log
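As an aside on the HTTP source's url template: the {key} placeholder would presumably be substituted with the escaped lookup value before the request is issued. A minimal sketch of that substitution (the helper name is hypothetical, not part of the proposal):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildLookupURL replaces the "{key}" placeholder in a URL template with
// the path-escaped lookup key, so values containing '/' or spaces cannot
// alter the request path.
func buildLookupURL(template, key string) string {
	return strings.ReplaceAll(template, "{key}", url.PathEscape(key))
}

func main() {
	fmt.Println(buildLookupURL("http://localhost:8080/users/{key}", "user 42"))
}
```

Escaping the key rather than interpolating it verbatim is the kind of detail the HTTP source would need to pin down.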

Implementing such a processor requires significant considerations to abide by the long-term vision, namely:

  • Progressive implementation, starting with basic local and then remote lookup capabilities.
  • Modular structure facilitating easy addition of new lookup sources.
  • Built-in caching and timeout mechanisms for performance optimization.
  • Comprehensive and useful observability metrics (success/failure rates, latency percentiles).

This is not an exhaustive list of concerns, nor does it provide solutions; it is just an acknowledgment that these will have to be addressed.

Telemetry data types supported

Logs first, given that enrichment is most commonly a logging requirement. Support could then be extended to Metrics, Traces, and Profiles.

Code Owner(s)

@jsvd, @VihasMakwana

Sponsor (optional)

No response

Additional context

Similar ideas and alternative solutions have been suggested in the past:

  • https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20888
  • https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18526
  • https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29627
  • https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40526
  • https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40936
  • https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/34398

Trade Offs Considered

New component vs resourcedetectionprocessor

Conclusion: new component

While there are similarities to the resourcedetectionprocessor, performing lookups against a remote source is out of scope for resource detection, which, according to its README, detects resource information from the host. This processor, by contrast, would look up data that is not necessarily present on, or about, the host. Remote lookups also introduce enough performance concerns (such as the need for configurable caching) that they are better addressed in a separate implementation.

Extensibility method - extensions vs NewFactoryWithOptions

Conclusion: NewFactoryWithOptions

This processor should support third-party lookup sources. This could be implemented either with collector extensions or with an extensible factory using NewFactoryWithOptions (like filterprocessor and transformprocessor).

For now the decision is to use NewFactoryWithOptions rather than extensions, as extensions in the collector should be reusable by any component, and there are no current use cases requiring lookups during a component's execution (as there are for the storageextension) that can't be served instead by placing a lookupprocessor before or after said component.

Signal Support

Conclusion: Logs first, others after

The decision is to support Log signal first, the signal that most often needs such enrichment, and then extend to others as the processor matures.

Tip

React with 👍 to help prioritize this issue. Please use comments to provide useful context, avoiding +1 or me too, to help us triage it. Learn more here.

jsvd avatar Aug 06 '25 12:08 jsvd

I definitely support this concept, but do think the implementation details are important. I think in the interest of maximizing flexibility, what makes the most sense to me is to create a new extension type, and then a generic processor to utilize them.

  • Allows for greatest flexibility in lookup logic. If a specific distribution needs its own lookup logic, it only needs to implement an extension and include it in the distribution; it doesn't require a component fork or upstream contribution.
  • Minimizes the configuration complexity required within the lookup processor.
  • Allows for reuse of lookup logic. While the generic processor is a great base use case, there are certainly independent components that may want to utilize lookups with tighter guardrails or limit the kinds of lookups that can be used.
  • Distributes codeownership of lookup logic, so it doesn't need to all be owned by the processor author(s).

Depending on how the conversation progresses I may be interested in sponsoring, and contributing.

dehaansa avatar Aug 06 '25 14:08 dehaansa

The original idea of having the component + its sources in the same location was taken from the resourcedetectionprocessor, with a few arguments in favor:

  1. reduces the spread of elements the processor needs across the collector code base. Lookup sources will require functionality (e.g. caching, error handling, metric instrumentation, circuit breakers) that should be shared, and it would be easier to access from a single processor code base
  2. ownership of sources can be attributed to people who are not the maintainers of the processor, e.g. https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/7589c18/.github/CODEOWNERS#L205-L206

This said, the ability to customize the set of lookup sources in a collector distro is significant, and the extension suggestion solves it perfectly. I am not sure what the use cases would be for reusing the lookup source logic in other components outside the generic lookup processor, but I could see it happening for the local file-based lookups?

Lastly, thanks for the very quick feedback and taking interest in it, let's keep talking!

jsvd avatar Aug 06 '25 16:08 jsvd

Expanding on the extension idea, I could foresee the implementation looking something like this:

Lookup processor:

Accept a single extension, perform one or more lookups, configure an error mode.

type Config struct {
	Extension component.ID   `mapstructure:"extension"`
	Lookups   []LookupConfig `mapstructure:"lookups"`
	// defines whether errors other than a missing key should terminate the pipeline
	ErrorMode lookup.ErrorMode `mapstructure:"error_mode"`
}

type LookupConfig struct {
	Field       string `mapstructure:"field"`
	TargetField string `mapstructure:"target_field"`
	// when true, a missing key does not terminate the pipeline
	Optional bool `mapstructure:"optional"`
}

Extensions:

Located under pkg/lookup, create the necessary interfaces for source extensions:

pkg/lookup/source.go:

type Source interface {
	Lookup(ctx context.Context, key string) (value any, found bool, err error)
	Type() string
	Start(ctx context.Context, host component.Host) error
	Shutdown(ctx context.Context) error
}

type LookupExtension interface {
	Source
}

pkg/lookup/errors.go:

type ErrorMode string

const (
	ErrorModeIgnore ErrorMode = "ignore"
	ErrorModePropagate ErrorMode = "propagate"
)

We can also have orthogonal but optional behaviors such as caching:

pkg/lookup/cache.go:

type cacheEntry[V any] struct {
	key        string
	value      V
	expiration time.Time
}

type Cache[V any] struct {
	mu      sync.RWMutex
	maxSize int
	ttl     time.Duration
	entries map[string]*list.Element
	lru     *list.List
}

func NewCache[V any](maxSize int, ttl time.Duration) *Cache[V] { /* ... */ }
func (c *Cache[V]) Get(key string) (V, bool) { /* ... */ }
func (c *Cache[V]) Set(key string, value V) { /* ... */ }
func (c *Cache[V]) Clear() { /* ... */ }

Example extension (memcache)

In extension/lookupsource/memcachedlookupextension/extension.go:

const TypeStr = "memcached"

type memcachedExtension struct {
	*lookup.BaseSource
	config *Config
	client memcachedClient
}

func NewMemcachedExtension(config *Config, settings component.TelemetrySettings) (*memcachedExtension, error) {
	// BaseSource with caching
	base, err := lookup.NewBaseSource(TypeStr, settings, &config.Cache)
	if err != nil {
		return nil, err
	}

	return &memcachedExtension{
		BaseSource: base,
		config:     config,
	}, nil
}

User experience

The configuration would look like:

extensions:
  memcachedlookup:
    servers: ["localhost:11211"]
    cache:
      enabled: true
      size: 100
      ttl: 30s

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  lookup/memcached:
    source:
      extension: memcachedlookup
    attributes:
      - key: cached.user
        from_attribute: user_id
        mode: upsert
      - key: cached.data
        from_attribute: lookup_key
        mode: upsert
    error_mode: ignore

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [memcachedlookup]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [lookup/memcached]
      exporters: [debug]

jsvd avatar Aug 08 '25 13:08 jsvd

@dehaansa 👋 hi, I'm back from vacations, just wondering if you still had thoughts on this initiative and interest in collaborating. Happy to chat over community slack too!

jsvd avatar Sep 03 '25 10:09 jsvd

For anyone landing here with an interest in collaboration or sponsorship, I'm happy to push out a first skeleton PR with some interface definitions if it helps clarify the proposal.

jsvd avatar Sep 22 '25 10:09 jsvd

@jsvd I've read the proposal and I understand the motivation to propose an extension interface for looking up mappings from various sources.

I began to work on a style guide for extension APIs, since I found from reviews (e.g., https://github.com/open-telemetry/opentelemetry-collector/pull/13265) that there were undocumented rules about this process.

Here's the draft RFC on this topic: https://github.com/open-telemetry/opentelemetry-collector/pull/13263. I will be glad to sponsor your work on the extensions in the core repo, where I think the new extension will be defined.

jmacd avatar Sep 24 '25 16:09 jmacd

@jmacd this is very interesting, I mostly followed existing extensions but I definitely felt the need for a well-defined process, guide, and style. I will review this proposal and the chunks of code I wrote against the current status of the draft RFC.

jsvd avatar Sep 24 '25 16:09 jsvd

@jmacd the initial design was for the lookup source extensions to live in contrib given their sole consumer would be the processor from this proposal. If you want a practical example of the Functional Interface proposal in action I can submit a draft skeleton PR to this repo, the relevant files being:

Lookup Source extension definition

  • pkg/lookup/source.go: defines the stable lookup interfaces (Source, LookupExtension) and their functional constructors
  • pkg/lookup/base.go: convenience layer for extension authors, wraps fetch functions while exposing the same function-type surface (TypeFunc, WrapLookup*) expected by the constructors.

Sample extension

  • extension/lookup/nooplookupextension/extension.go: sample lookup extension implemented via lookup.NewLookupExtension.

Processor

  • processor/lookupprocessor/factory.go: processor scaffold that calls the NewSource/NewLookupExtension helpers and delegates lookups to the configured extensions
  • processor/lookupprocessor/processor.go: empty signal handlers

jsvd avatar Sep 29 '25 14:09 jsvd

@jmacd I created a draft PR https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/43120 in case you're interested in seeing an attempt at applying the functional interface RFC to a component (regardless of sponsorship and commitment to reviews).

jsvd avatar Oct 03 '25 10:10 jsvd

@jsvd I will try to apply the functional interface to a component (e.g. DNS lookup). Is there any other active work going on here? This thread seems to have been inactive for more than a month.

jagan2221 avatar Nov 11 '25 05:11 jagan2221

I will try to apply the functional interface to a component(ex. DNS lookup)

Not sure I understand. The application in this PR was a way to collaborate with the creator of the functional interface proposal, but I'm not getting a response.

Is there any other active work going on here?

From the feedback of the last SIG, I'm currently adapting the skeleton PR to use the entity model for enrichments. Otherwise I'm still actively looking for sponsorship; this remains a highly demanded feature, so the proposal is still, IMO, valid.

jsvd avatar Nov 11 '25 09:11 jsvd

I'm also interested. Happy to help if there is anything I can do for this.

kyo-ke avatar Dec 03 '25 02:12 kyo-ke

The proposal has been updated according to the latest discussions, including a section at the bottom for Trade-Offs Considered. Also, @VihasMakwana has been added as a second code owner.

We are still looking for a third code owner/sponsor. As per the new component contribution rules the sponsor is not required to become a code owner, but in that case we still need a third code owner.

If you are interested in the feature itself, please confirm that it meets your needs; happy to discuss it here or over at the CNCF slack!

jsvd avatar Dec 09 '25 11:12 jsvd

I haven't fully re-reviewed the updates after latest discussion, but I'm interested in being a sponsor & codeowner. Will review later today for any outstanding feedback!

dehaansa avatar Dec 09 '25 14:12 dehaansa

@dehaansa I've gone ahead and added you as sponsor & code owner in the description. Please report back when you're happy to proceed.

axw avatar Dec 10 '25 05:12 axw

@kyo-ke thank you for your interest as well! can you confirm that the proposal works for your needs? what sources for lookups would you be most interested in?

jsvd avatar Dec 10 '25 11:12 jsvd

@VihasMakwana @dehaansa to facilitate discussion I added a first skeleton PR (draft mode) that shows what the overall experience would look like, including necessary public APIs to build new internal and external lookup sources.

jsvd avatar Dec 10 '25 17:12 jsvd

Reviewed the skeleton and the rest of the issue context, 👍 to move forward.

dehaansa avatar Dec 10 '25 18:12 dehaansa

@dehaansa thanks for the quick feedback!! how do you prefer development to be done from now on? I can either make the existing PR ready to merge as-is and once it's merged open another PR on top of main, or push a second commit to this PR with the processor implementation.

jsvd avatar Dec 11 '25 13:12 jsvd

@jsvd Happy to see this approved! I'm interested in contributing to the DNS lookup source.

jagan2221 avatar Dec 12 '25 08:12 jagan2221

@jagan2221 sweet, thanks for the interest! As I was waiting for a sponsor I ended up writing a lot of code, including DNS lookups as a source (the only other source I implemented besides the YAML source and a testing noop source). I will push the code up soon for review and would love your feedback on the sorts of DNS queries you need; for now my implementation only does PTR:

# DNS lookup: client.ip -> host.name (log-level)
lookup/dns:
  source:
    type: dns
    record_type: PTR
    timeout: 5s
    server: "8.8.8.8:53" # optional
    cache:
      enabled: true
      size: 1000
      ttl: 5m
      negative_ttl: 1m
  attributes:
    - key: host.name
      from_attribute: client.ip
      default: unknown
      action: upsert
      context: log

I'm guessing A and AAAA queries will also be common use cases, and we'll need to support multiple responses. PTR can also return more than one result, though that's much less common.
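For context on what a PTR lookup resolves: the source would query the reverse name derived from the IP. A sketch of that derivation for IPv4 (the ptrName helper is illustrative, not from the PR; IPv6 uses nibble-reversed names under ip6.arpa and is omitted here):

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// ptrName builds the DNS query name a PTR lookup sends for an IPv4
// address: the octets reversed, under in-addr.arpa.
func ptrName(ip string) (string, error) {
	parsed := net.ParseIP(ip).To4()
	if parsed == nil {
		return "", fmt.Errorf("not an IPv4 address: %s", ip)
	}
	octets := make([]string, 4)
	for i, b := range parsed {
		octets[3-i] = fmt.Sprintf("%d", b)
	}
	return strings.Join(octets, ".") + ".in-addr.arpa.", nil
}

func main() {
	name, _ := ptrName("192.0.2.1")
	fmt.Println(name)
}
```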

jsvd avatar Dec 12 '25 10:12 jsvd

That's great! Yeah, we would need A and AAAA resolutions too. I'll take a look once you push it for review and will try to add A and AAAA lookups. Also, please let me know if there's anything I can do to help speed up the implementation.

jagan2221 avatar Dec 12 '25 11:12 jagan2221