Allow wrapping Sarama producer/consumer for OpenTelemetry support

Open eko opened this issue 2 years ago • 5 comments

Hi @frairon,

I would like to implement OpenTelemetry support with Goka.

I've seen that some discussions have already started on issue https://github.com/lovoo/goka/issues/160 some time ago.

To do this, I would like to use the opentelemetry-go-contrib/Shopify/sarama instrumentation library, because Goka is built on Sarama 🙂

How it works

To implement it, I need to be able to wrap the Sarama producer / consumer, so I've added some ProcessorOptions which work like this and do not modify the existing public signatures:

opts := []goka.ProcessorOption{
	goka.WithSaramaProducerWrapper(func(producer sarama.AsyncProducer, config *sarama.Config) sarama.AsyncProducer {
		return otelsarama.WrapAsyncProducer(config, producer)
	}),

	goka.WithSaramaConsumerGroupHandlerWrapper(func(consumerGroupHandler sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler {
		return otelsarama.WrapConsumerGroupHandler(consumerGroupHandler)
	}),

	goka.WithSaramaConsumerWrapper(func(consumer sarama.Consumer) sarama.Consumer {
		return otelsarama.WrapConsumer(consumer)
	}),
}

p, err := goka.NewProcessor([]string{"localhost:9092"}, group, opts...)

Side notes

As the Sarama OpenTelemetry implementation uses the msg.Metadata field to store its tracing span information and Goka was using it too, I've updated Goka so that it no longer uses this field, because sharing it was causing some issues.

Instead of using the Metadata field in Goka to store the Promise object, I've created a sync.Map and the promise is retrieved using the message pointer as key.
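
As an illustration of the idea above, here is a minimal sketch of a promise registry keyed by message pointer. It is not the code from the PR: the message and promise types are stand-ins for sarama.ProducerMessage and goka's Promise, and the names promiseRegistry, track and resolve are made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// message stands in for sarama.ProducerMessage (assumption for this sketch).
// Only its pointer identity matters here.
type message struct {
	Topic    string
	Metadata interface{} // left untouched so that other libraries can use it
}

// promise stands in for goka's Promise type (assumption for this sketch).
type promise struct {
	done bool
	err  error
}

// promiseRegistry maps in-flight message pointers to their promises.
type promiseRegistry struct {
	m sync.Map // *message -> *promise
}

// track associates a promise with a message before the message is sent.
func (r *promiseRegistry) track(msg *message, p *promise) {
	r.m.Store(msg, p)
}

// resolve looks up and removes the promise for msg and completes it.
// It reports false if msg was never tracked or was already resolved.
func (r *promiseRegistry) resolve(msg *message, err error) bool {
	v, ok := r.m.LoadAndDelete(msg)
	if !ok {
		return false
	}
	p := v.(*promise)
	p.done, p.err = true, err
	return true
}

func main() {
	reg := &promiseRegistry{}
	msg := &message{Topic: "example"}
	p := &promise{}

	reg.track(msg, p)
	fmt.Println("resolved:", reg.resolve(msg, nil), "done:", p.done)
	fmt.Println("resolved again:", reg.resolve(msg, nil))
}
```

Removing the entry on resolve keeps the map from growing with every produced message; a real implementation would also need to clean up entries for messages that never come back.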

Feedback

I would really appreciate your feedback on these changes and hope to see OpenTelemetry support available in Goka very soon ✌️

Thank you!

eko avatar Oct 19 '21 16:10 eko

Hi @eko, first, thanks a lot for your work so far!! It feels good to see other people using goka and even trying to improve it.

Generally I'd say we're open to integrating new frameworks, and OpenTelemetry seems promising. However, I have the feeling we could make the wrapping of the builders less invasive. I think changing the builder API to add a wrapper argument has some downsides:

  • all existing custom builders need to be adapted
  • we add the concept of wrappers to all builders
  • we actually limit to one wrapper, wrapping multiple times is not supported.

So here are two alternatives:

Alternative 1: leave goka out of the wrapping-concept.

Here the idea is that the wrapping responsibility is shifted to the user. A processor, for instance, is created by adding a Builder as an option that does the wrapping internally. That way, we don't have to change any builders or any options.

Alternative 2: create wrapping builder

We could add custom builders that wrap the things created by the underlying builder. When applied, they replace the builder they find in the options.

// helper builder that takes a wrapper and a builder
type saramaConsumerWrappingBuilder struct {
	childWrapper SaramaConsumerBuilder
	wrapper ConsumerWrapper
}

// builder function that wraps the result of the child-wrapper
func (scw *saramaConsumerWrappingBuilder) build(brokers []string, clientID string) (sarama.Consumer, error) {
	cons, err := scw.childWrapper(brokers, clientID)

	if err != nil {
		return nil, err
	}

	return scw.wrapper(cons), nil
}


func WithSaramaConsumerWrapper(consumerWrapper ConsumerWrapper) ProcessorOption {
	return func(o *poptions, gg *GroupGraph) {

		if o.builders.consumerSarama == nil {
			panic("cannot wrap non existent builder")
		}
		// put the existing builder into the new builder
		wrappingBuilder := &saramaConsumerWrappingBuilder{
			childWrapper: o.builders.consumerSarama,
			wrapper: consumerWrapper,
		}

		// replace the current builder by the wrapper
		o.builders.consumerSarama = wrappingBuilder.build
	}
}

// use it like this:
goka.NewProcessor(...
	goka.WithSaramaConsumerWrapper(myWrapper()),
	// allows to wrap multiple times, because goka's not aware that anything is wrapped.
	goka.WithSaramaConsumerWrapper(anotherWrapper()),
)

That, however, requires that we change the option appliers in options.go to set the defaults first and then apply the custom options. It also means that any custom builders need to be added before the wrapper options, otherwise the wrappers get overwritten. However, that's a limitation we can probably live with.
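
To make the ordering requirement concrete, here is a small self-contained sketch. It does not use goka or Sarama: consumer, options, applyOptions and withConsumerWrapper are stand-ins invented for this example. It only shows why the defaults have to be in place before a wrapping option runs, and that several wrapping options then stack.

```go
package main

import "fmt"

// consumer stands in for sarama.Consumer (assumption for this sketch).
type consumer interface{ Name() string }

type baseConsumer struct{}

func (baseConsumer) Name() string { return "base" }

// tagged is a trivial wrapper that prefixes the wrapped consumer's name.
type tagged struct {
	tag   string
	inner consumer
}

func (t tagged) Name() string { return t.tag + "(" + t.inner.Name() + ")" }

type consumerBuilder func(brokers []string, clientID string) (consumer, error)

type options struct{ build consumerBuilder }

type option func(*options)

func defaultConsumerBuilder(brokers []string, clientID string) (consumer, error) {
	return baseConsumer{}, nil
}

// withConsumerWrapper replaces the current builder with one that wraps its result.
func withConsumerWrapper(wrap func(consumer) consumer) option {
	return func(o *options) {
		child := o.build // captured now, so a builder must already be set
		o.build = func(brokers []string, clientID string) (consumer, error) {
			c, err := child(brokers, clientID)
			if err != nil {
				return nil, err
			}
			return wrap(c), nil
		}
	}
}

// applyOptions sets the defaults first and then applies the user options in order.
func applyOptions(opts ...option) *options {
	o := &options{build: defaultConsumerBuilder}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// tagWith returns a wrapper function that tags a consumer with the given name.
func tagWith(name string) func(consumer) consumer {
	return func(c consumer) consumer { return tagged{tag: name, inner: c} }
}

func main() {
	o := applyOptions(withConsumerWrapper(tagWith("a")), withConsumerWrapper(tagWith("b")))
	c, err := o.build([]string{"localhost:9092"}, "client")
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Name())
}
```

In this sketch the option applied last ends up as the outermost wrapper, and an option that sets a plain custom builder after the wrappers would silently discard them, which is the limitation mentioned above.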


Although the second alternative looks more elegant, it still seems quite arbitrary to add such specific functionality to the very generic concept of those builders.

As a compromise, we could also go with Alternative 1 (not changing any options) while still providing a helper that allows wrapping Builders. Like this:


goka.NewProcessor(...,
	goka.WithSaramaConsumerBuilder(goka.WrappingBuilder(myWrapper(), goka.DefaultSaramaConsumerBuilder)),
	// or 
	goka.WithSaramaConsumerBuilder(goka.WrappingBuilder(myWrapper(), MyCustomBuilder())),
)
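
A helper like the WrappingBuilder used above could look roughly like the following sketch. This is only an assumption about its shape: it is written with Go generics (Go 1.18+) so that it works for any builder with the (brokers, clientID) signature, and the names builder and wrappingBuilder are hypothetical rather than part of goka.

```go
package main

import (
	"errors"
	"fmt"
)

// builder mirrors the shape of goka's SaramaConsumerBuilder,
// generalized over the built type (assumption for this sketch).
type builder[T any] func(brokers []string, clientID string) (T, error)

// wrappingBuilder returns a builder that calls child and passes
// a successful result through wrap. Errors are returned unchanged.
func wrappingBuilder[T any](wrap func(T) T, child builder[T]) builder[T] {
	return func(brokers []string, clientID string) (T, error) {
		v, err := child(brokers, clientID)
		if err != nil {
			var zero T
			return zero, err
		}
		return wrap(v), nil
	}
}

func main() {
	// Strings are used as the "built thing" to keep the example dependency-free.
	bracket := func(s string) string { return "[" + s + "]" }
	working := builder[string](func(brokers []string, clientID string) (string, error) {
		return clientID, nil
	})
	failing := builder[string](func([]string, string) (string, error) {
		return "", errors.New("no brokers")
	})

	v, err := wrappingBuilder(bracket, working)(nil, "client-1")
	fmt.Println(v, err)

	_, err = wrappingBuilder(bracket, failing)(nil, "client-1")
	fmt.Println(err)
}
```

Because the result is itself a builder, it can be passed to wrappingBuilder again, so wrapping multiple times needs no extra support.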

What's your opinion on that?

I haven't checked the promise-map implementation yet; I'll do that in the next few days :)

Thanks again!

frairon avatar Oct 23 '21 19:10 frairon

Thank you for this thorough answer, Franz! With your comments, I have no doubt we are on the right track to find a good way to allow this implementation :)

I understand your points and I think Alternative 2 you proposed could work and will also allow wrapping multiple times.

I would prefer to go with Alternative 1 and directly use the With*Builder() options that are already available. This mainly ensures that the signatures are not changed.

However, I have some things I would like to highlight. I have 3 items I would need to "wrap" for this implementation and some have some specific needs:

ConsumerBuilder

It will work well with the ConsumerBuilder because it returns a sarama.Consumer instance so I could already wrap it:

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

sarama.ConsumerGroupHandler (*Processor)

This item is the *Processor itself, but the otelsarama implementation returns a sarama.ConsumerGroupHandler (https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/consumer_group.go#L43), so something like this does not work:

processor, err := goka.NewProcessor(pg.bootstrapServers, g, opts...)

processor = func(processor *goka.Processor) *goka.Processor {
	return otelsarama.WrapConsumerGroupHandler(processor, tracerProvider).(*goka.Processor)
}(processor)

The WithSaramaConsumerGroupHandlerWrapper() option I added only stores a wrapper and does not modify any signature. The wrapping is applied right before the processor is handed to the sarama.Consume() method.
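
The reason the type assertion in the snippet above cannot work can be shown with stand-in types. In this sketch, groupHandler, processor and tracingHandler are simplified placeholders for sarama.ConsumerGroupHandler, *goka.Processor and the wrapper type that the instrumentation presumably returns. Once a value is wrapped, its concrete type is the wrapper's, so asserting back to the original type fails.

```go
package main

import "fmt"

// groupHandler stands in for sarama.ConsumerGroupHandler, reduced to one method.
type groupHandler interface{ Setup() error }

// processor stands in for *goka.Processor, which implements the handler interface itself.
type processor struct{}

func (*processor) Setup() error { return nil }

// tracingHandler stands in for the wrapper type an instrumentation library returns.
type tracingHandler struct{ inner groupHandler }

func (t *tracingHandler) Setup() error { return t.inner.Setup() }

// wrapHandler mimics the shape of a WrapConsumerGroupHandler function.
func wrapHandler(h groupHandler) groupHandler { return &tracingHandler{inner: h} }

// asProcessor reports whether h still has the concrete type *processor.
// The two-value form avoids the panic that a plain h.(*processor) would cause.
func asProcessor(h groupHandler) (*processor, bool) {
	p, ok := h.(*processor)
	return p, ok
}

func main() {
	p := &processor{}

	_, ok := asProcessor(p)
	fmt.Println("plain processor:", ok)

	_, ok = asProcessor(wrapHandler(p))
	fmt.Println("wrapped processor:", ok)
}
```

This is why the wrapping has to happen at the point where the handler is passed on as an interface value, rather than by replacing the processor variable.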

I don't have any other idea for this one at the moment.

ProducerBuilder

I have a small issue with this one because the ProducerBuilder actually returns a Producer interface:

type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32, producerWrapper ProducerWrapper) (Producer, error)

Do you think we could add another option system using variadics, such as NewProducer(brokers []string, config *sarama.Config, options ...ProducerOption) (Producer, error), so that people don't have to re-declare their own Producer implementation? Or do you prefer to let people re-declare it?
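
For reference, the variadic option pattern proposed here can be sketched as follows. The types are stand-ins (asyncProducer replaces sarama.AsyncProducer and the config argument is omitted), and newProducer, producerOption and withProducerWrapper are hypothetical names that only mirror the proposal.

```go
package main

import "fmt"

// asyncProducer stands in for sarama.AsyncProducer (assumption for this sketch).
type asyncProducer interface{ Send(topic string) string }

type rawProducer struct{}

func (rawProducer) Send(topic string) string { return "sent:" + topic }

// tracedProducer is an illustrative wrapper around another producer.
type tracedProducer struct{ inner asyncProducer }

func (t tracedProducer) Send(topic string) string {
	return "traced(" + t.inner.Send(topic) + ")"
}

// producerOptions collects everything the options can configure.
type producerOptions struct {
	wrap func(asyncProducer) asyncProducer
}

type producerOption func(*producerOptions)

// withProducerWrapper registers a function that wraps the underlying producer.
func withProducerWrapper(wrap func(asyncProducer) asyncProducer) producerOption {
	return func(o *producerOptions) { o.wrap = wrap }
}

// newProducer builds the underlying producer and applies the configured wrapper.
// Without options it behaves exactly as before, so existing callers are unaffected.
func newProducer(brokers []string, options ...producerOption) asyncProducer {
	opts := producerOptions{
		wrap: func(p asyncProducer) asyncProducer { return p }, // default: no wrapping
	}
	for _, opt := range options {
		opt(&opts)
	}
	return opts.wrap(rawProducer{})
}

func main() {
	plain := newProducer([]string{"localhost:9092"})
	fmt.Println(plain.Send("orders"))

	traced := newProducer([]string{"localhost:9092"},
		withProducerWrapper(func(p asyncProducer) asyncProducer {
			return tracedProducer{inner: p}
		}))
	fmt.Println(traced.Send("orders"))
}
```

The appeal of this pattern is that adding a variadic parameter keeps existing call sites compiling, whereas adding a positional argument to the builder signature would not.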

I will try to look into what we could do at the beginning of the week, but I would really like to have your advice on these things.

Thanks again!

eko avatar Oct 24 '21 14:10 eko

Here is a little update on current status:

  • I've removed the WithSaramaConsumerWrapper option (not needed, we can already wrap the Sarama Consumer with existing builder)
  • I've moved the WithSaramaProducerWrapper option to Producer options (described in the line below)
  • I've added a producer options system to NewProducer(brokers []string, config *sarama.Config, options ...ProducerOption) (Producer, error) to allow wrapping the Sarama Producer

Concerning multiple wrapping, users can already do it on their side with something like:

goka.WithSaramaConsumerGroupHandlerWrapper(func(consumerGroupHandler sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler {
	return something_else.WrapConsumerGroupHandler(
		any_other_wrapper.WrapConsumerGroupHandler(
			yet_another_one.WrapConsumerGroupHandler(
				consumerGroupHandler,
			),
		),
	)
}),

Anyway, the PR has been lightened thanks to your remarks but feel free to share with me other ideas to continue improving it.

Thanks

eko avatar Oct 28 '21 07:10 eko

@eko, sorry for the long delay. The other error-handling PR really kept me busy for some time. Also, very sorry for proposing yet another solution, but I have to :)

I thought about this whole wrapping solution and, again, it feels like such a specific thing solved in such a generic way that it just doesn't look right. If someone wants opentel support, they want to add it easily, not apply the opentel wrappers through goka wrappers. Someone will forget to use 3 wrappers in processors and 1 for emitters and views, get errors, and so on... you get the point, it is a bit complex. Further, building a generic wrapper infrastructure assumes that there are more integrations that work similarly, but we have no indication that this is or will be the case. And I'm a bit afraid that it won't work with others without a lot of hacking.

So what if we do not create any generic wrappers, but provide "tools" that can be used to tweak the components. I tried it in this goka-tools-branch, which would allow you to create your components like this:

// create options builder, optionally specifying different parent builders or sarama config,
// otherwise uses goka default.
otelbuilder := opentel.NewBuilder().WithConfig(...).WithConsumerBuilder(..)

goka.NewProcessor(...,
    otelbuilder.BuildProcOptions()...
)

goka.NewView(...,
    otelbuilder.BuildViewOptions()...
)

goka.NewEmitter(...,
    otelbuilder.BuildEmitterOptions()...
)

That way, goka would not be aware that it's being wrapped, so no major changes needed (except see notes). It's easy to apply, can be modified, and we hide the complex logic e.g. of wrapping the consumer-group-handler, which is actually the processor itself, behind a wrapper. I'd prefer that very much to having a random if g.opts.consumerGroupHandlerWrapper != nil {... in the middle of the rebalance-loop of the processor, which seems quite hacky and arbitrary.

Notes

If we do it that way, we will have to refactor the producer-builder to create the sarama-interface, and processors and emitters build their own wrapping Producer that handles the promises etc.

Since the opentel-Producer-Wrapper seems to back up the metadata, we should be able to continue using this approach and avoid the sync-map. Otherwise it's a bug in the opentel-support which we should fix.
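
The "back up the metadata" idea mentioned here can be illustrated with a small sketch. This is not the otelsarama code: message stands in for sarama.ProducerMessage, and spanContext, beforeSend and afterSend are hypothetical names. The point is only that a wrapper can borrow the Metadata field while a message is in flight and hand the caller's original value back afterwards.

```go
package main

import "fmt"

// message stands in for sarama.ProducerMessage (assumption for this sketch).
type message struct {
	Topic    string
	Metadata interface{}
}

// spanContext is what the wrapper stores in Metadata while the message is in flight.
type spanContext struct {
	spanName       string
	metadataBackup interface{} // the caller's original Metadata value
}

// beforeSend stashes the caller's metadata and attaches the wrapper's own state.
func beforeSend(msg *message) {
	msg.Metadata = spanContext{
		spanName:       "send " + msg.Topic,
		metadataBackup: msg.Metadata,
	}
}

// afterSend restores the caller's metadata and returns the span name.
// It reports false if the message does not carry the wrapper's state.
func afterSend(msg *message) (string, bool) {
	sc, ok := msg.Metadata.(spanContext)
	if !ok {
		return "", false
	}
	msg.Metadata = sc.metadataBackup
	return sc.spanName, true
}

func main() {
	msg := &message{Topic: "orders", Metadata: "caller-promise"}

	beforeSend(msg)
	fmt.Printf("in flight: %T\n", msg.Metadata)

	name, ok := afterSend(msg)
	fmt.Println(name, ok, msg.Metadata)
}
```

With this scheme the caller sees its own value again only after the wrapper has processed the message, so any code that reads Metadata between those two points would see the wrapper's state instead.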

I know I said earlier to avoid touching the builder-interfaces. But seeing the producer-builder like this seems super inconsistent. So we'll have to live with the API change I guess :)

What are your thoughts on that?

frairon avatar Nov 08 '21 21:11 frairon

Hi @frairon,

Thank you for working on this P.o.C.

I think you're right it could be better to provide a unified way to build the OpenTelemetry instrumentation for both producer and consumer / consumer group.

Users with special cases, such as multiple wrapping, can still write their own Builder implementation, so I think it's okay.

> Since the opentel-Producer-Wrapper seems to back up the metadata, we should be able to continue using this approach and avoid the sync-map. Otherwise it's a bug in the opentel-support which we should fix.

Concerning this metadata implementation, I wouldn't say it's a bug, but rather the wrong way to implement it ;-)

You (Goka) and the otelsarama instrumentation both use this field to maintain your state/promise, but I think neither should use it (where it can be avoided, and here it can), because this field should be left for the end user. So I think both implementations have to be fixed.

What are the next steps for you to have a first version available?

Thanks again for your time!

eko avatar Nov 14 '21 10:11 eko