sarama icon indicating copy to clipboard operation
sarama copied to clipboard

Wait for broker connections to close during shutdown

Open gebn opened this issue 3 years ago • 4 comments

As of the latest release v1.31.1, Client.Close() closes broker connections asynchronously, returning without waiting for this to complete. What is the best way to close the client cleanly?

gebn avatar Feb 23 '22 19:02 gebn

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Aug 18 '23 08:08 github-actions[bot]

I agree that the goroutine lifecycle management in Sarama could do with an overhaul. As you highlight here, these async close goroutines shouldn't really be fire-and-forget and the parent Close should provide a mechanism for waiting on their completion

dnwe avatar Aug 18 '23 09:08 dnwe

Had similar issue and had to find a workaround to fix flaky tests that were failing because of the async close. Here is my current solution:

client, err := sarama.NewClient(...)
if err != nil {
	return nil, fmt.Errorf("could not build kafka producer client: %w", err)
}

producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
	return nil, fmt.Errorf("could not build kafka producer: %w", err)
}

// ... LOGIC THAT USES THE PRODUCER

// Producer is closing its underlying client in the async way that may lead to a situation when the app/test
// is already finished, but the network connections are not really closed yet. In order to ensure we
// really closed everything and avoid situations like "panic: Log in goroutine after TestXXX has completed"
// let's take care of the closing all the layers in the manual way.

// interface description tells: You must call this before calling Close on the underlying client.
if err := producer.Close(); err != nil {
	return fmt.Errorf("could not properly close kafka producer: %w", err)
}

// client is closing its brokers in the async way as well, so let's handle this manually and synchronously as well
for _, b := range client.Brokers() {
	// this is fine if the broker is already closed - it means producer's client already closed it
	if err := b.Close(); err != nil && !errors.Is(err, sarama.ErrNotConnected) {
		return fmt.Errorf("could not properly close kafka producer underlying broker: %w", err)
	}
}

if err := client.Close(); err != nil && !errors.Is(err, sarama.ErrClosedClient) {
	// this is fine if the client is already closed - it means producer already closed it
	return fmt.Errorf("could not properly close kafka producer underlying client: %w", err)
}

UPD it did not really help 😞

vgarvardt avatar Oct 27 '23 16:10 vgarvardt

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Feb 01 '24 18:02 github-actions[bot]