Wait for broker connections to close during shutdown
As of the latest release v1.31.1, Client.Close() closes broker connections asynchronously, returning without waiting for this to complete. What is the best way to close the client cleanly?
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
I agree that the goroutine lifecycle management in Sarama could do with an overhaul. As you highlight here, these async close goroutines shouldn't really be fire-and-forget and the parent Close should provide a mechanism for waiting on their completion
Had similar issue and had to find a workaround to fix flaky tests that were failing because of the async close. Here is my current solution:
client, err := sarama.NewClient(...)
if err != nil {
return nil, fmt.Errorf("could not build kafka producer client: %w", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return nil, fmt.Errorf("could not build kafka producer: %w", err)
}
// ... LOGIC THAT USES THE PRODUCER
// Producer is closing its underlying client in the async way that may lead to a situation when the app/test
// is already finished, but the network connections are not really closed yet. In order to ensure we
// really closed everything and avoid situations like "panic: Log in goroutine after TestXXX has completed"
// let's take care of the closing all the layers in the manual way.
// interface description tells: You must call this before calling Close on the underlying client.
if err := producer.Close(); err != nil {
return fmt.Errorf("could not properly close kafka producer: %w", err)
}
// client is closing its brokers in the async way as well, so let's handle this manually and synchronously as well
for _, b := range client.Brokers() {
// this is fine if the broker is already closed - it means producer's client already closed it
if err := b.Close(); err != nil && !errors.Is(err, sarama.ErrNotConnected) {
return fmt.Errorf("could not properly close kafka producer underlying broker: %w", err)
}
}
if err := client.Close(); err != nil && !errors.Is(err, sarama.ErrClosedClient) {
// this is fine if the client is already closed - it means producer already closed it
return fmt.Errorf("could not properly close kafka producer underlying client: %w", err)
}
UPD it did not really help 😞
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.