# Hang on PartitionOffsetManager.Close sequence with AutoCommit.Enable = false
Calls to `PartitionOffsetManager.Close` seem to consistently hang for me when `Consumer.Offsets.AutoCommit.Enable == false`. They do not hang when the default value of `true` is used.
##### Versions

| Sarama | Kafka | Go   |
|--------|-------|------|
| 1.35.0 | 3.2.0 | 1.17 |
##### Configuration

```go
// Repro snippet; brokers and topic are assumed to be defined elsewhere,
// e.g. brokers := []string{"<cluster_domain>:9092"} and topic := "<topic>".
config := sarama.NewConfig()
config.ClientID = "sarama-close-repro"
config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.User = os.Getenv("KAFKA_USER")
config.Net.SASL.Password = os.Getenv("KAFKA_PASSWORD")
config.Consumer.Offsets.AutoCommit.Enable = false // <-- this is critical to reproduce the issue

client, err := sarama.NewClient(brokers, config)
if err != nil {
	panic(err)
}

om, err := sarama.NewOffsetManagerFromClient("sarama-close-repro", client)
if err != nil {
	panic(err)
}

pom, err := om.ManagePartition(topic, 0)
if err != nil {
	panic(err)
}

pom.Close() // <-- hangs here when AutoCommit.Enable = false
om.Close()
client.Close()
```
##### Logs
```
[sarama] 2022/07/28 18:44:33 Initializing new client
[sarama] 2022/07/28 18:44:33 client/metadata fetching metadata for all topics from broker <cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 Completed pre-auth SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]
[sarama] 2022/07/28 18:44:34 SASL authentication successful with broker <cluster_domain>:9092:4 - [0 0 0 0]
[sarama] 2022/07/28 18:44:34 Connected to broker at <cluster_domain>:9092 (unregistered)
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #0 at b0-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #5 at b5-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #1 at b1-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #4 at b4-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #2 at b2-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #3 at b3-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 Successfully initialized new client
[sarama] 2022/07/28 18:44:34 client/coordinator requesting coordinator for consumergroup sarama-close-repro from <cluster_domain>:9092
[sarama] 2022/07/28 18:44:35 client/coordinator coordinator for consumergroup sarama-close-repro is #3 (b3-<cluster_domain>:9092)
[sarama] 2022/07/28 18:44:35 Completed pre-auth SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]
[sarama] 2022/07/28 18:44:35 SASL authentication successful with broker b3-<cluster_domain>:9092:4 - [0 0 0 0]
[sarama] 2022/07/28 18:44:35 Connected to broker at b3-<cluster_domain>:9092 (registered as #3)
```
##### Problem Description
I'm not sure if this is a problem with the sequencing of calls I'm making / my usage of the API, or a bug in Sarama, but I don't seem to be able to successfully close a `PartitionOffsetManager` that was created with `AutoCommit.Enable == false`.
If this is not a known issue or a misuse of the API on my part, I'd be happy to investigate further and try to write a patch.
Note that swapping the order of the `PartitionOffsetManager.Close` and `OffsetManager.Close` calls avoids the hang; however, the docs for `OffsetManager.Close` say:

> You must call this after all the PartitionOffsetManagers are closed.
Is the documentation just incorrect, or does the correct call ordering depend on the value of `Offsets.AutoCommit.Enable`?
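
For concreteness, here's a sketch of the swapped ordering that avoids the hang for me (error handling elided, as in the repro above):

```go
// Swapped teardown order: close the OffsetManager first. Its Close()
// appears to force-release all of its POMs (closing their errors
// channels), so the subsequent pom.Close() drains an already-closed
// channel and returns immediately instead of hanging.
om.Close()
pom.Close()
client.Close()
```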
Tracing this down a bit more in the Sarama code, it looks like the hang happens here in `PartitionOffsetManager.Close`:
```go
func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors { // <---- hang happens here
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
```
In the case where `AutoCommit.Enable == true`, this hang doesn't happen because the POM is released via periodic background calls to `Commit()`, causing the `errors` channel to be closed in `partitionOffsetManager.release()`.
FWIW, I've found that the following sequence works when closing a POM/OM, regardless of the `AutoCommit.Enable` state:
```go
pom.AsyncClose()

err = om.Close()
// ... handle err ...

err = pom.Close()
// ... handle err ...
```
It's debatable whether this runs afoul of the documentation for `OffsetManager.Close`, which says:

> You must call this after all the PartitionOffsetManagers are closed.
Specifically, does 'have had `AsyncClose` called against them' meet the criteria of 'closed' in this statement? (And further, what is the concrete reason that the POMs need to be closed first?)
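
If the analysis above is right, another workaround that stays closer to the documented teardown order would be to drive the commit/release cycle by hand via `OffsetManager.Commit()` (which is exported on the interface) after `AsyncClose`. A hedged sketch; I haven't exhaustively tested this, and it presumably still hangs if the final flush fails and the POM stays dirty:

```go
pom.AsyncClose() // mark the POM as closing
om.Commit()      // flush offsets and release closing POMs, closing pom.errors

if err := pom.Close(); err != nil { // now drains a closed channel and returns
	// ... handle err ...
}
if err := om.Close(); err != nil {
	// ... handle err ...
}
```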