Can the Watermill Kafka client support sarama.NewAsyncProducer()?

Open Jaylenwa opened this issue 8 months ago • 3 comments

Feature request

Description

From the source code, the Kafka Publisher is created with sarama.NewSyncProducer and does not support asynchronous publishing, which can become a performance bottleneck.

Source code:

// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
	config.setDefaults()

	if err := config.Validate(); err != nil {
		return nil, err
	}

	if logger == nil {
		logger = watermill.NopLogger{}
	}

	producer, err := sarama.NewSyncProducer(config.Brokers, config.OverwriteSaramaConfig)
	if err != nil {
		return nil, errors.Wrap(err, "cannot create Kafka producer")
	}

	if config.OTELEnabled && config.Tracer == nil {
		config.Tracer = NewOTELSaramaTracer()
	}

	if config.Tracer != nil {
		producer = config.Tracer.WrapSyncProducer(config.OverwriteSaramaConfig, producer)
	}

	return &Publisher{
		config:   config,
		producer: producer,
		logger:   logger,
	}, nil
}

Jaylenwa, Jul 04 '25 09:07

This could be introduced behind a config flag.

m110, Aug 20 '25 11:08

Hi @m110, can I take this issue? I'm busy at the moment, but I'd like to give it a try and would aim to finish it within 2 weeks.

manikantanynala97, Aug 27 '25 01:08

@manikantanynala97 Makes sense. Please take a look at https://github.com/ThreeDotsLabs/watermill-kafka/pull/19/files in case it's relevant.

m110, Aug 27 '25 12:08