Hang on PartitionOffsetManager.Close sequence with AutoCommit.Enable = false

Open benweint opened this issue 2 years ago • 2 comments

Calls to PartitionOffsetManager.Close seem to consistently hang for me when Consumer.Offsets.AutoCommit.Enable==false. They do not hang when the default value of true is used.

Versions
Sarama: 1.35.0
Kafka: 3.2.0
Go: 1.17
Configuration
config := sarama.NewConfig()
config.ClientID = "sarama-close-repro"
config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.User = os.Getenv("KAFKA_USER")
config.Net.SASL.Password = os.Getenv("KAFKA_PASSWORD")
config.Consumer.Offsets.AutoCommit.Enable = false // <-- this is critical to reproduce the issue

client, err := sarama.NewClient(brokers, config)
if err != nil {
	panic(err)
}

om, err := sarama.NewOffsetManagerFromClient("sarama-close-repro", client)
if err != nil {
	panic(err)
}

pom, err := om.ManagePartition(topic, 0)
if err != nil {
	panic(err)
}

pom.Close() // <-- hangs here when AutoCommit.Enable = false
om.Close()
client.Close()
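
To tell a real hang from a slow close when running a repro like this, one option is to wrap the blocking call in a timeout. The following is only a sketch: closeWithTimeout, errCloseTimedOut and shortTimeout are hypothetical names, not part of Sarama, and the "hanging" function merely stands in for a Close call that never returns.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// shortTimeout keeps the demo fast; a real check would likely wait longer.
const shortTimeout = 100 * time.Millisecond

// errCloseTimedOut is a hypothetical sentinel error, not part of Sarama.
var errCloseTimedOut = errors.New("close did not return before the timeout")

// closeWithTimeout (hypothetical helper) runs closeFn in its own goroutine
// and waits up to d for it to return.
func closeWithTimeout(closeFn func() error, d time.Duration) error {
	// Buffered so that a closeFn that returns late can still send and exit.
	done := make(chan error, 1)
	go func() { done <- closeFn() }()

	select {
	case err := <-done:
		return err
	case <-time.After(d):
		// closeFn is still blocked; its goroutine leaks until it returns.
		return errCloseTimedOut
	}
}

func main() {
	never := make(chan struct{}) // never closed, so receiving from it blocks
	hangs := func() error { <-never; return nil }
	returns := func() error { return nil }

	fmt.Println("hanging close:", closeWithTimeout(hangs, shortTimeout))
	fmt.Println("normal close:", closeWithTimeout(returns, shortTimeout))
}
```

In the repro above, the call would look like closeWithTimeout(pom.Close, 5*time.Second). This only detects the hang; it does not fix it, and the blocked goroutine stays around.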
Logs



[sarama] 2022/07/28 18:44:33 Initializing new client
[sarama] 2022/07/28 18:44:33 client/metadata fetching metadata for all topics from broker <cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 Completed pre-auth SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]
[sarama] 2022/07/28 18:44:34 SASL authentication successful with broker <cluster_domain>:9092:4 - [0 0 0 0]
[sarama] 2022/07/28 18:44:34 Connected to broker at <cluster_domain>:9092 (unregistered)
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #0 at b0-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #5 at b5-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #1 at b1-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #4 at b4-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #2 at b2-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 client/brokers registered new broker #3 at b3-<cluster_domain>:9092
[sarama] 2022/07/28 18:44:34 Successfully initialized new client
[sarama] 2022/07/28 18:44:34 client/coordinator requesting coordinator for consumergroup sarama-close-repro from <cluster_domain>:9092
[sarama] 2022/07/28 18:44:35 client/coordinator coordinator for consumergroup sarama-close-repro is #3 (b3-<cluster_domain>:9092)
[sarama] 2022/07/28 18:44:35 Completed pre-auth SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]
[sarama] 2022/07/28 18:44:35 SASL authentication successful with broker b3-<cluster_domain>:9092:4 - [0 0 0 0]
[sarama] 2022/07/28 18:44:35 Connected to broker at b3-<cluster_domain>:9092 (registered as #3)
Problem Description

I'm not sure if this is a problem with the sequencing of calls that I'm making / my usage of the API, or a bug in Sarama, but I don't seem to be able to successfully close a PartitionOffsetManager that was created with AutoCommit.Enable == false.

If this is not a known issue / issue with the API usage, I'd be happy to investigate further and try to write a patch.

Note that swapping the order of the PartitionOffsetManager.Close and OffsetManager.Close calls avoids the hang; however, the docs for OffsetManager say:

You must call this after all the PartitionOffsetManagers are closed.

Is the documentation just incorrect, or does the correct call ordering depend on the value of Offsets.AutoCommit.Enable?

benweint, Jul 29 '22 01:07

Tracing this down a bit more in the Sarama code, it looks like the hang happens here in PartitionOffsetManager.Close:

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors { // <---- hang happens here
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

In the case where AutoCommit.Enable == true, this hang doesn't happen because the POM is released by the periodic background calls to Commit(), which causes the errors channel to be closed.
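
The mechanism can be reproduced without Sarama at all. Below is a minimal sketch, assuming only what is described above: draining a channel with range blocks until something closes that channel. The fakePOM type, its release method, and the drainReturns helper are hypothetical stand-ins and do not mirror Sarama's internals.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout bounds how long the demo waits before declaring a hang.
const demoTimeout = 200 * time.Millisecond

// fakePOM is a hypothetical stand-in for a partition offset manager.
// It only mimics the errors channel; it is not Sarama's type.
type fakePOM struct {
	errors chan error
}

// release plays the role of the step that closes the errors channel.
func (p *fakePOM) release() { close(p.errors) }

// drain ranges over the errors channel, like the loop in Close above.
// It returns only once the channel has been closed.
func (p *fakePOM) drain() int {
	n := 0
	for range p.errors {
		n++
	}
	return n
}

// drainReturns reports whether drain finishes within the timeout.
// With autoRelease set, a background goroutine releases the POM,
// standing in for the periodic commit loop.
func drainReturns(autoRelease bool, timeout time.Duration) bool {
	p := &fakePOM{errors: make(chan error)}
	if autoRelease {
		go func() {
			time.Sleep(timeout / 10)
			p.release()
		}()
	}

	done := make(chan struct{})
	go func() {
		p.drain()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false // drain is still blocked on the open channel
	}
}

func main() {
	fmt.Println("with background release:", drainReturns(true, demoTimeout))       // prints true
	fmt.Println("without background release:", drainReturns(false, demoTimeout)) // prints false
}
```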

benweint, Aug 16 '22 15:08

FWIW, I've found that the following sequence works when closing a POM/OM, regardless of the AutoCommit.Enable state:

pom.AsyncClose()

err = om.Close()
... handle err ...

err = pom.Close()
... handle err ...
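
For more than one POM, the same ordering can be wrapped in a helper. This is a sketch under assumptions, not a recommended Sarama pattern: partitionCloser and managerCloser are hypothetical local interfaces that list only the AsyncClose and Close methods used above, shutdown is a made-up name, and the recording types exist only to show the call order.

```go
package main

import "fmt"

// partitionCloser and managerCloser are hypothetical local interfaces
// covering only the methods the sequence above calls.
type partitionCloser interface {
	AsyncClose()
	Close() error
}

type managerCloser interface {
	Close() error
}

// shutdown (hypothetical helper) applies the ordering from the sequence
// above: AsyncClose on every POM, then close the OM, then Close each POM.
// It returns the first error it sees.
func shutdown(om managerCloser, poms ...partitionCloser) error {
	for _, pom := range poms {
		pom.AsyncClose()
	}
	firstErr := om.Close()
	for _, pom := range poms {
		if err := pom.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// callLog and the recording types below are fakes used to show the order.
type callLog struct{ calls []string }

type recordingPOM struct {
	log  *callLog
	name string
}

func (p recordingPOM) AsyncClose() {
	p.log.calls = append(p.log.calls, p.name+".AsyncClose")
}

func (p recordingPOM) Close() error {
	p.log.calls = append(p.log.calls, p.name+".Close")
	return nil
}

type recordingOM struct{ log *callLog }

func (o recordingOM) Close() error {
	o.log.calls = append(o.log.calls, "om.Close")
	return nil
}

func main() {
	log := &callLog{}
	err := shutdown(recordingOM{log}, recordingPOM{log, "pom0"})
	fmt.Println(log.calls, err) // prints: [pom0.AsyncClose om.Close pom0.Close] <nil>
}
```

Whether this ordering is actually sanctioned by the OffsetManager.Close documentation is the open question below.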

It's debatable whether this runs afoul of the documentation for OffsetManager.Close, which says:

You must call this after all the PartitionOffsetManagers are closed.

Specifically, does 'have had AsyncClose called against them' meet the criteria of 'closed' in this statement? (And further, what is the concrete reason that the POMs need to be closed first?)

benweint, Aug 22 '22 14:08

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot], Nov 15 '23 14:11