watermill
Can the Watermill Kafka client support sarama.NewAsyncProducer()?
Feature request
Description
From the source code, the Kafka Publisher is created with sarama.NewSyncProducer and has no asynchronous mode, which can become a performance bottleneck.
Source code
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
	config.setDefaults()

	if err := config.Validate(); err != nil {
		return nil, err
	}

	if logger == nil {
		logger = watermill.NopLogger{}
	}

	producer, err := sarama.NewSyncProducer(config.Brokers, config.OverwriteSaramaConfig)
	if err != nil {
		return nil, errors.Wrap(err, "cannot create Kafka producer")
	}

	if config.OTELEnabled && config.Tracer == nil {
		config.Tracer = NewOTELSaramaTracer()
	}

	if config.Tracer != nil {
		producer = config.Tracer.WrapSyncProducer(config.OverwriteSaramaConfig, producer)
	}

	return &Publisher{
		config:   config,
		producer: producer,
		logger:   logger,
	}, nil
}
This could be introduced behind a config flag.
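To make the idea concrete, here is a rough, stdlib-only sketch of how a config flag could switch between a blocking and a queued delivery path. It does not use sarama or the real watermill-kafka types: `Message`, `PublisherConfig.Async`, `Buffer`, `sender`, `newSender`, and `fakeDeliver` are all hypothetical names made up for illustration. A real change would presumably build on `sarama.NewAsyncProducer` and read delivery failures from the producer's `Errors()` channel; the `onError` callback below only stands in for that.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for sarama.ProducerMessage (hypothetical).
type Message struct {
	Topic, Value string
}

// PublisherConfig is hypothetical; Async is the proposed opt-in flag.
type PublisherConfig struct {
	Async  bool
	Buffer int // queue size, used only in async mode
}

// sender hides the delivery mode from the rest of the publisher.
type sender interface {
	Send(msg Message) error
	Close() error
}

// syncSender returns only after the delivery attempt has finished.
type syncSender struct {
	deliver func(Message) error
}

func (s *syncSender) Send(msg Message) error { return s.deliver(msg) }
func (s *syncSender) Close() error           { return nil }

// asyncSender queues messages and delivers them from a background goroutine.
// Failures go to onError because Send has already returned by then.
type asyncSender struct {
	input chan Message
	done  chan struct{}
}

func newAsyncSender(deliver func(Message) error, onError func(Message, error), buffer int) *asyncSender {
	a := &asyncSender{input: make(chan Message, buffer), done: make(chan struct{})}
	go func() {
		defer close(a.done)
		for msg := range a.input {
			if err := deliver(msg); err != nil {
				onError(msg, err)
			}
		}
	}()
	return a
}

// Send only enqueues; it blocks while the queue is full and must not be
// called after Close.
func (a *asyncSender) Send(msg Message) error {
	a.input <- msg
	return nil
}

// Close stops accepting messages and waits until the queue has drained.
func (a *asyncSender) Close() error {
	close(a.input)
	<-a.done
	return nil
}

// newSender picks the delivery mode from the config flag.
func newSender(cfg PublisherConfig, deliver func(Message) error, onError func(Message, error)) sender {
	if cfg.Async {
		return newAsyncSender(deliver, onError, cfg.Buffer)
	}
	return &syncSender{deliver: deliver}
}

// fakeDeliver simulates a broker that rejects empty payloads.
func fakeDeliver(msg Message) error {
	if msg.Value == "" {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	onError := func(msg Message, err error) {
		fmt.Printf("async delivery to %q failed: %v\n", msg.Topic, err)
	}
	pub := newSender(PublisherConfig{Async: true, Buffer: 16}, fakeDeliver, onError)
	for _, v := range []string{"a", "", "b"} {
		_ = pub.Send(Message{Topic: "events", Value: v}) // returns once queued
	}
	_ = pub.Close() // blocks until every queued message has been attempted
}
```

The trade-off is that in async mode `Send` returning nil no longer means the message was delivered, so callers that rely on Publish returning delivery errors would see different behavior. That is one reason to keep the flag off by default.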
Hi @m110, can I take this issue? I'm busy right now, but I'd like to give it a try and aim to finish within 2 weeks.
@manikantanynala97 makes sense, please take a look at https://github.com/ThreeDotsLabs/watermill-kafka/pull/19/files if relevant.