syncProducer hangs forever when Idempotent is enabled
##### Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
| Sarama | Kafka | Go |
|---|---|---|
| 1.27.2 | kafka_2.12-2.5.1.jar | 1.14 |
##### Configuration
What configuration values are you using for Sarama and Kafka?
```go
func newSaramaConfig() *sarama.Config {
	saramaConfig := sarama.NewConfig()
	// We need to specify the Kafka version, otherwise Sarama defaults to the
	// oldest compatible one (0.8.2) and some Kafka APIs are not called with
	// the correct version, e.g. GetOffset only returns an approximate value.
	saramaConfig.Version = sarama.V2_3_0_0
	saramaConfig.ChannelBufferSize = CHANNEL_BUFFER_SIZE
	// We tell the producer that we are going to partition ourselves.
	saramaConfig.Producer.Partitioner = sarama.NewManualPartitioner
	// SyncProducer requires both of these return configs to be enabled.
	saramaConfig.Producer.Return.Successes = true
	saramaConfig.Producer.Return.Errors = true
	// Wait for all in-sync replicas to commit before responding.
	saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
	// Retry up to 10 times in case of network issues.
	saramaConfig.Producer.Retry.Max = 10
	saramaConfig.Producer.Idempotent = true
	// Producer.MaxMessageBytes must be smaller than MaxRequestSize.
	saramaConfig.Producer.MaxMessageBytes = max_msg_byte - 1
	saramaConfig.Metadata.Retry.Max = sarama_metadata_retrymax
	saramaConfig.Metadata.RefreshFrequency = sarama_metadata_refreshfrequency
	saramaConfig.Metadata.Retry.Backoff = sarama_metadata_refreshinterval
	return saramaConfig
}
```
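For context, here is a minimal sketch of how such a config might be wired into a SyncProducer. The broker address, topic, and the explicit `Net.MaxOpenRequests = 1` line are assumptions on my part (Sarama's config validation expects at most one in-flight request per connection when Idempotent is enabled), not taken from the report above; it also reuses the `newSaramaConfig()` from the previous block together with its placeholder constants.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := newSaramaConfig()
	// Assumption: the idempotent producer is only accepted with a single
	// in-flight request per broker connection.
	cfg.Net.MaxOpenRequests = 1

	// "localhost:9092" and "example-topic" are placeholders.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("failed to create sync producer: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic:     "example-topic",
		Partition: 0, // honoured as-is because of the manual partitioner
		Value:     sarama.StringEncoder("hello"),
	}
	// With Idempotent enabled, this is the call that can block forever when
	// the retryBatch bug described below is hit.
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalf("failed to send message: %v", err)
	}
	log.Printf("message stored at partition %d, offset %d", partition, offset)
}
```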
##### Logs
When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.
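For reference, the debug output mentioned above can be enabled with a single line (a sketch, not part of the original report):

```go
// Route Sarama's internal debug logging to stderr so producer/retry activity
// shows up. Assumes the standard library "log" and "os" packages are imported.
sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)
```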
##### Problem Description
When using the SyncProducer to produce events with Idempotent enabled, the client can hang in the produce call forever. After some investigation with dlv, I found that some goroutines were hanging in ProduceMessages in Sarama. Eventually I found an obvious bug in async_producer.go, in retryBatch():
```go
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}
```
The problem is in this first loop:

```go
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}
```
The loop returns immediately as soon as one message hits the condition `msg.retries >= p.conf.Producer.Retry.Max`, so only that message gets `returnError` called on it; the other messages in the pSet never do. The SyncProducer, however, waits on the expectation channel of every message, so it hangs there forever.
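For illustration, one possible shape of a fix is sketched below: decide the fate of the whole batch first, and if any message has exhausted its retries, fail every message in the set so no expectation channel is left waiting. This is only a sketch against the internal fields shown above, not necessarily the change that was eventually merged.

```go
	// Sketch only: check the whole batch before touching any message.
	exhausted := false
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			exhausted = true
			break
		}
	}
	if exhausted {
		// Fail every message in the set so each expectation channel is
		// signalled and SyncProducer callers are unblocked.
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	for _, msg := range pSet.msgs {
		msg.retries++
	}
```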
Please check.
Any updates on this issue? Thanks!
Any updates? Thanks.
Any updates? Thanks.
https://github.com/Shopify/sarama/pull/2378 seems to be the solution. Should this issue be closed?
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
Believed to be fixed by https://github.com/IBM/sarama/pull/2378