AsyncProducer `Close` blocks indefinitely
Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
| Sarama | Kafka | Go |
|---|---|---|
| 1.27.2 | 3.0.0 | 1.7.5 |
Configuration
What configuration values are you using for Sarama and Kafka?
```go
config.Metadata.Retry.Max = 3
config.Metadata.Retry.Backoff = 250 * time.Millisecond
config.Metadata.Timeout = 1 * time.Minute
// Admin.Retry takes effect on `ClusterAdmin`-related operations;
// only `CreateTopic` is used for cdc now, so just use the default values.
config.Admin.Retry.Max = 5
config.Admin.Retry.Backoff = 100 * time.Millisecond
config.Admin.Timeout = 3 * time.Second
config.Producer.Retry.Max = 3
config.Producer.Retry.Backoff = 100 * time.Millisecond
config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll
```
Logs
We are testing Sarama in an extremely rare scenario:
- The producer can send requests to a single-machine Kafka cluster but cannot get a response. We simulate this with `kill -s STOP` on the Kafka broker process: the TCP connection stays open, but the broker never sends a response.
- We then close the producer by calling `asyncProducer.Close()`.
We do not have any logs from Sarama, but we have captured some goroutine stacks, like the following one. It looks blocked trying to receive a response from the broker.

We expect that closing the producer does not block for a long time; it should return as soon as possible.

In reality, as shown in the picture above, 33 messages were reported as failed to deliver only after 38 minutes, and only after the broker process was resumed with `kill -s CONT`.
Problem Description
Expected behavior: when closing the AsyncProducer, just drop all buffered messages and return immediately.
We are facing an issue along similar lines with 1.31.0, though we haven't seen it in any prior version.
| Sarama | Kafka | Go |
|---|---|---|
| 1.31.0 | 2.3.1 | 1.17.6 |
```go
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionSnappy
config.ChannelBufferSize = 32
```
We are not calling asyncProducer.Close() directly; the producer is closing itself as part of error handling, which leads to the indefinite block. I am working on getting more specifics and an example.
WIP details:
- `Close()`: https://github.com/Shopify/sarama/blob/v1.31.0/broker.go#L269, which holds `b.lock` while waiting on channel `b.done`.
- `Close()` is called when an error response is returned here: https://github.com/Shopify/sarama/blob/v1.31.0/async_producer.go#L1022-L1031
- `b.done` is closed when `responseReceiver` is no longer processing an event.
- At least one goroutine is waiting to handle a dead error rather than waiting on the `select` (I haven't pinpointed exactly what it's waiting for in relation to processing the dead error).
- Potentially a race condition between `Close()` and the handling of the dead error that initiated the close? TBD
97 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6df3f0 0x6df9a8 0x6dd394 0x6ea68c 0x6e7504 0x6d6fcc 0x5f9964 0x5f9864 0x6d6f0c 0x6d6ead 0x6d6198 0x720d34 0x7f2d4
# 0x7b5bf sync.runtime_SemacquireMutex+0x3f /usr/local/go/src/runtime/sema.go:71
# 0x88b43 sync.(*Mutex).lockSlow+0x193 /usr/local/go/src/sync/mutex.go:138
# 0x6df4e3 sync.(*Mutex).Lock+0xa3 /usr/local/go/src/sync/mutex.go:81
# 0x6df474 github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
# 0x6df3ef github.com/Shopify/sarama.(*Broker).send+0xdf /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:876
# 0x6df9a7 github.com/Shopify/sarama.(*Broker).sendAndReceive+0x77 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:934
# 0x6dd393 github.com/Shopify/sarama.(*Broker).GetMetadata+0x63 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:316
# 0x6ea68b github.com/Shopify/sarama.(*client).tryRefreshMetadata+0x46b /go/pkg/mod/github.com/!shopify/[email protected]/client.go:895
# 0x6e7503 github.com/Shopify/sarama.(*client).RefreshMetadata+0x103 /go/pkg/mod/github.com/!shopify/[email protected]/client.go:489
# 0x6d6fcb github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1+0x8b /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:657
# 0x5f9963 github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1+0x53 /go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:85
# 0x5f9863 github.com/eapache/go-resiliency/breaker.(*Breaker).doWork+0x33 /go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:86
# 0x6d6f0b github.com/eapache/go-resiliency/breaker.(*Breaker).Run+0x7b /go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:55
# 0x6d6eac github.com/Shopify/sarama.(*partitionProducer).updateLeader+0x1c /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:656
# 0x6d6197 github.com/Shopify/sarama.(*partitionProducer).dispatch+0x537 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:589
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
4 @ 0x4bf0c 0x17f94 0x17978 0x6d5df0 0x720d34 0x7f2d4
# 0x6d5def github.com/Shopify/sarama.(*partitionProducer).dispatch+0x18f /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:546
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
4 @ 0x4bf0c 0x17f94 0x17978 0x6d7538 0x720d34 0x7f2d4
# 0x6d7537 github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0x57 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:695
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
4 @ 0x4bf0c 0x5de98 0x6d798c 0x720d34 0x7f2d4
# 0x6d798b github.com/Shopify/sarama.(*brokerProducer).run+0x17b /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:765
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
2 @ 0x4bf0c 0x16fc4 0x16b10 0x6d63b8 0x720d34 0x7f2d4
# 0x6d63b7 github.com/Shopify/sarama.(*partitionProducer).dispatch+0x757 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:606
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
2 @ 0x4bf0c 0x17f94 0x17938 0x6da1bc 0x720d34 0x7f2d4
# 0x6da1bb github.com/Shopify/sarama.(*asyncProducer).retryHandler+0x14b /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1052
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
2 @ 0x4bf0c 0x17f94 0x17978 0x6d4d40 0x720d34 0x7f2d4
# 0x6d4d3f github.com/Shopify/sarama.(*asyncProducer).dispatcher+0x8f /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:331
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
2 @ 0x4bf0c 0x17f94 0x17978 0x6d5494 0x720d34 0x7f2d4
# 0x6d5493 github.com/Shopify/sarama.(*topicProducer).dispatch+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:413
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x16fc4 0x16b10 0x6d77ec 0x6dd84c 0x6db894 0x6e0078 0x720d34 0x7f2d4
# 0x6d77eb github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.2+0xcb /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:701
# 0x6dd84b github.com/Shopify/sarama.(*Broker).AsyncProduce.func1+0xfb /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:388
# 0x6db893 github.com/Shopify/sarama.(*responsePromise).handle+0xc3 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:132
# 0x6e0077 github.com/Shopify/sarama.(*Broker).responseReceiver+0xb7 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1020
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x17f94 0x17938 0x6dcfd8 0x6d9e44 0x6d8988 0x6d79c0 0x720d34 0x7f2d4
# 0x6dcfd7 github.com/Shopify/sarama.(*Broker).Close+0xe7 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:269
# 0x6d9e43 github.com/Shopify/sarama.(*brokerProducer).handleError+0x1a3 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1031
# 0x6d8987 github.com/Shopify/sarama.(*brokerProducer).handleResponse+0x47 /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:898
# 0x6d79bf github.com/Shopify/sarama.(*brokerProducer).run+0x1af /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:831
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x17f94 0x17978 0x6e0024 0x720d34 0x7f2d4
# 0x6e0023 github.com/Shopify/sarama.(*Broker).responseReceiver+0x63 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1015
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x5de98 0x6e9ff0 0x720d34 0x7f2d4
# 0x6e9fef github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0xff /go/pkg/mod/github.com/!shopify/[email protected]/client.go:824
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6dd724 0x6d75d0 0x720d34 0x7f2d4
# 0x7b5bf sync.runtime_SemacquireMutex+0x3f /usr/local/go/src/runtime/sema.go:71
# 0x88b43 sync.(*Mutex).lockSlow+0x193 /usr/local/go/src/sync/mutex.go:138
# 0x6df4e3 sync.(*Mutex).Lock+0xa3 /usr/local/go/src/sync/mutex.go:81
# 0x6df474 github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
# 0x6dd723 github.com/Shopify/sarama.(*Broker).AsyncProduce+0x133 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:405
# 0x6d75cf github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0xef /go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:712
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6df3f0 0x6df9a8 0x6dd394 0x6ea68c 0x6e7504 0x6ea19c 0x6ea00c 0x720d34 0x7f2d4
# 0x7b5bf sync.runtime_SemacquireMutex+0x3f /usr/local/go/src/runtime/sema.go:71
# 0x88b43 sync.(*Mutex).lockSlow+0x193 /usr/local/go/src/sync/mutex.go:138
# 0x6df4e3 sync.(*Mutex).Lock+0xa3 /usr/local/go/src/sync/mutex.go:81
# 0x6df474 github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
# 0x6df3ef github.com/Shopify/sarama.(*Broker).send+0xdf /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:876
# 0x6df9a7 github.com/Shopify/sarama.(*Broker).sendAndReceive+0x77 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:934
# 0x6dd393 github.com/Shopify/sarama.(*Broker).GetMetadata+0x63 /go/pkg/mod/github.com/!shopify/[email protected]/broker.go:316
# 0x6ea68b github.com/Shopify/sarama.(*client).tryRefreshMetadata+0x46b /go/pkg/mod/github.com/!shopify/[email protected]/client.go:895
# 0x6e7503 github.com/Shopify/sarama.(*client).RefreshMetadata+0x103 /go/pkg/mod/github.com/!shopify/[email protected]/client.go:489
# 0x6ea19b github.com/Shopify/sarama.(*client).refreshMetadata+0x7b /go/pkg/mod/github.com/!shopify/[email protected]/client.go:848
# 0x6ea00b github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0x11b /go/pkg/mod/github.com/!shopify/[email protected]/client.go:826
# 0x720d33 github.com/Shopify/sarama.withRecover+0x43 /go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x1400012c310)
github.com/Shopify/[email protected]/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x140019c8360)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
github.com/Shopify/[email protected]/async_producer.go:166 +0x270
goroutine 404 [chan receive, 2 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close(0x1400012c310)
github.com/Shopify/[email protected]/async_producer.go:307 +0x154
github.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).Close(0x14000e86dc0)
github.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:217 +0x11c
github.com/pingcap/tiflow/cdc/sink.(*mqSink).Close(0x140008a4600, {0x103c77048, 0x14005410d00})
github.com/pingcap/tiflow/cdc/sink/mq.go:309 +0x34
github.com/pingcap/tiflow/cdc/sink.(*Manager).Close(0x140027a7800, {0x103c77048, 0x14005410d00})
github.com/pingcap/tiflow/cdc/sink/manager.go:92 +0x278
github.com/pingcap/tiflow/cdc/processor.(*processor).Close(0x140023ce000)
github.com/pingcap/tiflow/cdc/processor/processor.go:1068 +0x774
github.com/pingcap/tiflow/cdc/processor.(*Manager).closeProcessor(0x14000f5e020, {0x14000fda2da, 0x24})
github.com/pingcap/tiflow/cdc/processor/manager.go:132 +0x68
github.com/pingcap/tiflow/cdc/processor.(*Manager).Tick(0x14000f5e020, {0x12ec19bb0, 0x14000ec0e80}, {0x103c31300, 0x14000a78c80})
github.com/pingcap/tiflow/cdc/processor/manager.go:112 +0x540
github.com/pingcap/tiflow/pkg/orchestrator.(*EtcdWorker).Run(0x140009fbc00, {0x12ec19bb0, 0x14000ec0e80}, 0x14000effcb0, 0x5f5e100, {0x14001b30020, 0xe}, {0x102db3ae6, 0x9})
github.com/pingcap/tiflow/pkg/orchestrator/etcd_worker.go:239 +0xa04
github.com/pingcap/tiflow/cdc/capture.(*Capture).runEtcdWorker(0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, {0x103c17400, 0x14000f5e020}, {0x103c31300, 0x14000a78c80}, 0x5f5e100, {0x102db3ae6, 0x9})
github.com/pingcap/tiflow/cdc/capture/capture.go:456 +0x120
github.com/pingcap/tiflow/cdc/capture.(*Capture).run.func3(0x14000994210, 0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, 0x140006e61c0)
github.com/pingcap/tiflow/cdc/capture/capture.go:316 +0x2d4
created by github.com/pingcap/tiflow/cdc/capture.(*Capture).run
github.com/pingcap/tiflow/cdc/capture/capture.go:296 +0x478
goroutine 1785 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close.func1()
github.com/Shopify/[email protected]/async_producer.go:300 +0x48
github.com/Shopify/sarama.withRecover(0x140073d4ab0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).Close
github.com/Shopify/[email protected]/async_producer.go:299 +0x9c
goroutine 929 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x1400012c310)
github.com/Shopify/[email protected]/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x140019c8370)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
github.com/Shopify/[email protected]/async_producer.go:167 +0x2d4
goroutine 670 [chan receive]:
github.com/rcrowley/go-metrics.(*meterArbiter).tick(0x10679ea20)
github.com/rcrowley/[email protected]/meter.go:239 +0x34
created by github.com/rcrowley/go-metrics.NewMeter
github.com/rcrowley/[email protected]/meter.go:46 +0xe4
goroutine 303 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a090)
github.com/Shopify/[email protected]/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x140019c8340)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
github.com/Shopify/[email protected]/client.go:180 +0x3f8
goroutine 1784 [semacquire, 12 minutes]:
sync.runtime_Semacquire(0x1400012c350)
runtime/sema.go:56 +0x38
sync.(*WaitGroup).Wait(0x1400012c348)
sync/waitgroup.go:130 +0xa4
github.com/Shopify/sarama.(*asyncProducer).shutdown(0x1400012c310)
github.com/Shopify/[email protected]/async_producer.go:1059 +0xd8
github.com/Shopify/sarama.withRecover(0x140073d4aa0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).AsyncClose
github.com/Shopify/[email protected]/async_producer.go:321 +0x80
goroutine 948 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleSuccesses(0x140026ae4c8)
github.com/Shopify/[email protected]/sync_producer.go:131 +0x94
github.com/Shopify/sarama.withRecover(0x14001b871c0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
github.com/Shopify/[email protected]/sync_producer.go:76 +0xd0
goroutine 947 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x14000555500)
github.com/Shopify/[email protected]/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x14001b871b0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
github.com/Shopify/[email protected]/async_producer.go:167 +0x2d4
goroutine 946 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x14000555500)
github.com/Shopify/[email protected]/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x14001b871a0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
github.com/Shopify/[email protected]/async_producer.go:166 +0x270
goroutine 945 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a2d0)
github.com/Shopify/[email protected]/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x14001b87180)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
github.com/Shopify/[email protected]/client.go:180 +0x3f8
goroutine 949 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleErrors(0x140026ae4c8)
github.com/Shopify/[email protected]/sync_producer.go:139 +0x98
github.com/Shopify/sarama.withRecover(0x14001b871d0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
github.com/Shopify/[email protected]/sync_producer.go:77 +0x134
goroutine 1125 [select, 2 minutes]:
github.com/Shopify/sarama.(*Broker).sendAndReceive(0x14000d5d180, {0x103c8c220, 0x140021afe30}, {0x103c8c268, 0x140059cbd10})
github.com/Shopify/[email protected]/broker.go:774 +0xf4
github.com/Shopify/sarama.(*Broker).GetMetadata(0x14000d5d180, 0x140021afe30)
github.com/Shopify/[email protected]/broker.go:283 +0x64
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x2, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
github.com/Shopify/[email protected]/client.go:880 +0x450
github.com/Shopify/sarama.(*client).tryRefreshMetadata.func2({0x103c15140, 0x1400087ece0})
github.com/Shopify/[email protected]/client.go:859 +0x1f0
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x3, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
github.com/Shopify/[email protected]/client.go:927 +0xac0
github.com/Shopify/sarama.(*client).RefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1})
github.com/Shopify/[email protected]/client.go:473 +0xfc
github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1()
github.com/Shopify/[email protected]/async_producer.go:657 +0x8c
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x14002c39e68, 0x14002c39eb8)
github.com/eapache/[email protected]/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x14002518050, 0x0, 0x14002c39eb8)
github.com/eapache/[email protected]/breaker/breaker.go:86 +0x34
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
github.com/eapache/[email protected]/breaker/breaker.go:55
github.com/Shopify/sarama.(*partitionProducer).updateLeader(0x1400408e060)
github.com/Shopify/[email protected]/async_producer.go:656 +0x74
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x1400408e060)
github.com/Shopify/[email protected]/async_producer.go:589 +0x55c
github.com/Shopify/sarama.withRecover(0x1400557c7c0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
github.com/Shopify/[email protected]/async_producer.go:513 +0x200
goroutine 3458 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea13d18, 0x72)
runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14003396c18, 0x72, 0x0)
internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140027d8c30, {0x140022ed000, 0x1000, 0x1000})
net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400387fa40, {0x14000b08128, 0x8, 0x8})
bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14005305d10, {0x14000b08128, 0x8, 0x8})
github.com/Shopify/[email protected]/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14005305d10}, {0x14000b08128, 0x8, 0x8}, 0x8)
io/io.go:328 +0xa0
io.ReadFull(...)
io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5d180, {0x14000b08128, 0x8, 0x8})
github.com/Shopify/[email protected]/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5d180)
github.com/Shopify/[email protected]/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x14000fac070)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
github.com/Shopify/[email protected]/broker.go:211 +0xbac
goroutine 3277 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea141a0, 0x72)
runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14000742018, 0x72, 0x0)
internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140506829f0, {0x1400248f000, 0x1000, 0x1000})
net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400408f1a0, {0x14002d0daf0, 0x8, 0x8})
bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14006ec6528, {0x14002d0daf0, 0x8, 0x8})
github.com/Shopify/[email protected]/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14006ec6528}, {0x14002d0daf0, 0x8, 0x8}, 0x8)
io/io.go:328 +0xa0
io.ReadFull(...)
io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5c000, {0x14002d0daf0, 0x8, 0x8})
github.com/Shopify/[email protected]/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5c000)
github.com/Shopify/[email protected]/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x140078cd9e0)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
github.com/Shopify/[email protected]/broker.go:211 +0xbac
goroutine 1278 [chan send, 2 minutes]:
github.com/Shopify/sarama.(*topicProducer).dispatch(0x14000d2ea80)
github.com/Shopify/[email protected]/async_producer.go:426 +0x154
github.com/Shopify/sarama.withRecover(0x14039381830)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
github.com/Shopify/[email protected]/async_producer.go:407 +0x1fc
goroutine 1279 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x140025bcd80)
github.com/Shopify/[email protected]/async_producer.go:546 +0x198
github.com/Shopify/sarama.withRecover(0x14039381840)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
github.com/Shopify/[email protected]/async_producer.go:513 +0x200
goroutine 1409 [select, 12 minutes]:
github.com/Shopify/sarama.(*brokerProducer).run(0x14003b530a0)
github.com/Shopify/[email protected]/async_producer.go:747 +0x17c
github.com/Shopify/sarama.withRecover(0x14039381860)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
github.com/Shopify/[email protected]/async_producer.go:691 +0x25c
goroutine 1410 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
github.com/Shopify/[email protected]/async_producer.go:695 +0x68
github.com/Shopify/sarama.withRecover(0x14000e72e00)
github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
github.com/Shopify/[email protected]/async_producer.go:694 +0x2e8
Our service needs `Return.Successes` and `Return.Errors` enabled, since we have to track flushed message offsets and errors.
I think I ran into the same issue with Sarama 1.31.0, Kafka 1.1.0, and Go 1.17.5. Below is my extracted goroutine stack:
goroutine profile: total 334
45 @ 0x4383b6 0x40640c 0x405e78 0xd38906 0xd906de 0x468c21
# 0xd38905 github.com/Shopify/sarama.(*partitionProducer).dispatch+0x1a5 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:546
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
12 @ 0x4383b6 0x448052 0xd3a331 0xd906de 0x468c21
# 0xd3a330 github.com/Shopify/sarama.(*brokerProducer).run+0x190 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:765
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
9 @ 0x4383b6 0x40640c 0x405e38 0xd8ecb0 0xd9f542 0xf0f2d1 0xf152a6 0xf152b4 0xf27a05 0x468c21
# 0xd8ecaf github.com/Shopify/sarama.(*syncProducer).SendMessage+0x8f /root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:96
# ..........my project code.......
# ..........my project code.......
# ..........my project code.......
9 @ 0x4383b6 0x40640c 0x405e78 0xd42a14 0xd906de 0x468c21
# 0xd42a13 github.com/Shopify/sarama.(*Broker).responseReceiver+0x73 /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1015
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
3 @ 0x4383b6 0x405565 0x40511d 0xd38e58 0xd906de 0x468c21
# 0xd38e57 github.com/Shopify/sarama.(*partitionProducer).dispatch+0x6f7 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:606
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
3 @ 0x4383b6 0x40640c 0x405e38 0xd3caff 0xd906de 0x468c21
# 0xd3cafe github.com/Shopify/sarama.(*asyncProducer).retryHandler+0x19e /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1052
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
3 @ 0x4383b6 0x40640c 0x405e78 0xd37852 0xd906de 0x468c21
# 0xd37851 github.com/Shopify/sarama.(*asyncProducer).dispatcher+0xd1 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:331
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
3 @ 0x4383b6 0x40640c 0x405e78 0xd8f13c 0xd906de 0x468c21
# 0xd8f13b github.com/Shopify/sarama.(*syncProducer).handleSuccesses+0x9b /root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:130
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
3 @ 0x4383b6 0x40640c 0x405e78 0xd8f2a5 0xd906de 0x468c21
# 0xd8f2a4 github.com/Shopify/sarama.(*syncProducer).handleErrors+0xa4 /root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:138
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4383b6 0x405565 0x40511d 0xd3a105 0xd402c3 0xd3e419 0xd42dae 0xd906de 0x468c21
# 0xd3a104 github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.2+0xc4 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:701
# 0xd402c2 github.com/Shopify/sarama.(*Broker).AsyncProduce.func1+0xc2 /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:388
# 0xd3e418 github.com/Shopify/sarama.(*responsePromise).handle+0x98 /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:132
# 0xd42dad github.com/Shopify/sarama.(*Broker).responseReceiver+0x40d /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1020
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4383b6 0x40640c 0x405e38 0xd3faab 0xd3c6c5 0xd3b20e 0xd3a369 0xd906de 0x468c21
# 0xd3faaa github.com/Shopify/sarama.(*Broker).Close+0xca /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:269
# 0xd3c6c4 github.com/Shopify/sarama.(*brokerProducer).handleError+0x184 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1031
# 0xd3b20d github.com/Shopify/sarama.(*brokerProducer).handleResponse+0x2d /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:898
# 0xd3a368 github.com/Shopify/sarama.(*brokerProducer).run+0x1c8 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:831
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4383b6 0x40640c 0x405e78 0xd37fd4 0xd906de 0x468c21
# 0xd37fd3 github.com/Shopify/sarama.(*topicProducer).dispatch+0x53 /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:413
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
1 @ 0x4383b6 0x44914c 0x449126 0x464cc5 0x472b05 0xd41d37 0xd41d12 0xd401cb 0xd39f1c 0xd906de 0x468c21
# 0x464cc4 sync.runtime_SemacquireMutex+0x24 /usr/local/go/src/runtime/sema.go:71
# 0x472b04 sync.(*Mutex).lockSlow+0x164 /usr/local/go/src/sync/mutex.go:138
# 0xd41d36 sync.(*Mutex).Lock+0x96 /usr/local/go/src/sync/mutex.go:81
# 0xd41d11 github.com/Shopify/sarama.(*Broker).sendWithPromise+0x71 /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
# 0xd401ca github.com/Shopify/sarama.(*Broker).AsyncProduce+0x10a /root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:405
# 0xd39f1b github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0xdb /root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:712
# 0xd906dd github.com/Shopify/sarama.withRecover+0x3d /root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43
Goroutine A, `(*Broker).Close+0xca`:
brokerProducer `run()` loop
-> `handleError`
-> `bp.broker.Close`
-> holds `bp.broker.lock`, closes the broker's `responses` channel (buffered), then blocks waiting on the broker's `done` channel. Meanwhile, nothing reads from the unbuffered `responses` channel created in `newBrokerProducer`, because the `run()` loop that normally reads it is the goroutine stuck here.
Goroutine B, `(*Broker).responseReceiver+0x40d`:
tries to push the broker's error into that unbuffered `responses` channel and blocks, so the broker's `done` channel is never closed and goroutine A is blocked forever.
As a result, other application goroutines (producers) stop working too, since `bp.broker`'s lock is never released.
How about the following fix?
```go
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse, 1) // would a buffered channel work?
	)
	//.......
	//.......
}
```
I believe there are two issues discussed here with similar outcomes.
Default configuration of the AsyncProducer can lead to long delays
By default, the AsyncProducer has retries and backoff enabled, which can lead to long delays before it fails.
Such long delays are often a combination of multiple records being buffered and the current in-flight request being "stuck" reading from a broker.
Sarama's default timeout for reading from a broker connection is 30 seconds (config.Net.ReadTimeout), so it might take up to 30 seconds to notice that the connection to a given broker is broken (assuming the connection was not properly closed on both ends).
Then you might see a 100ms backoff per record (if you have 600 pending records going to a given partition, you might be waiting up to 1 minute) before you can replay all those records.
Also, if the target broker is no longer reachable, you might hit another 30-second delay (config.Net.DialTimeout) before triggering another retry (up to 3 by default).
So depending on how the AsyncProducer is configured and the type of network error, it might look like the producer is stuck, but it is actually mostly idle because of the retry logic.
See #1359 for yet another example on how it can take up to 4 minutes to fail trying to connect to a 2 brokers cluster.
Expected feature: when trying to close the asyncProducer, just drop all buffered messages and respond immediately.
Unfortunately, because of how the pipeline logic works, I don't think the shutdown message (created when closing the AsyncProducer) is handled until all the queued records are processed.
That is, existing retries need to finish (which is often what takes the time), but "new" retries will be cancelled.
To fail faster, you could disable retries and handle them yourself:
config.Net.DialTimeout = <short-enough-for-your-need>
config.Net.ReadTimeout = <short-enough-for-your-need>
config.Metadata.Timeout = <short-enough-for-your-need>
config.Producer.Retry.Max = 0
config.Producer.Retry.Backoff = 0
Deadlock on retries (specific to Sarama 1.31.0)
The other issue is indeed a deadlock when a brokerProducer tries to call Close on its broker inside the callback of broker.AsyncProduce. This happens when a failed Produce response is received while another Produce request is being sent concurrently.
Such a callback is called from the responseReceiver goroutine, but the Close receiver blocks:
- if there is an extra Produce request that reaches the maximum number of in-flight requests (config.Net.MaxOpenRequests), by trying to acquire the b.lock;
- until the responseReceiver goroutine is done (by reading from b.done).
This is a regression from #2094 and I should have a fix with a simple unit test for that soon.
I believe #2129 describes that regression as well and I don't think it is specific to the SyncProducer.
I added some logs to a customized build of Sarama; here is some of the output:
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=1"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=2"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=3"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=4"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=5"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=6"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=7"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=8"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=9"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=10"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=11"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=12"] [name=sarama]
....
There are more than 100 messages like this; I think the number of these log lines is related to the number of messages.
Later, messages like this appear:
[2022/02/07 11:06:48.427 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.427 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.528 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.528 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.628 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.628 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
....
And finally
[2022/02/07 11:06:48.831 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.831 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:580] ["fin received, try flushRetryBuffers, highWatermark=1"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:632] ["producer/leader/fuck-sarama/0 state change to [flushing-1]"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:654] ["producer/leader/fuck-sarama/0 state change to [normal]"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1065] ["asyncProducer shutdown, inflight wait, elapsed=65.626644333"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1072] ["asyncProducer shutdown, client close elapsed = 4.58e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [ERROR] [kafka.go:236] ["close async client with error"] [error="kafka: Failed to deliver 802 messages."] [duration=1m15.637246541s] [changefeed=747ff99b-56d4-4c22-b20f-d3b515417baf] [role=processor]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [sync_producer.go:167] ["syncProducer close, elapsed = 7.92e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1058] ["Producer shutting down."] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1061] ["asyncProducer send `shutdown`, input size=0"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1065] ["asyncProducer shutdown, inflight wait, elapsed=9.17e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1072] ["asyncProducer shutdown, client close elapsed = 4.1e-08"] [name=sarama]
I think the steps are like this:
- A message is sent to the brokerProducer but fails, so the brokerProducer is closed.
- It tries to updateLeader, but because of the breaker, updateLeader will not succeed, since the broker is already dead. It tries this for each message in the retryState, which ends up costing a lot of time.
- It receives a fin message and is finally closed.
How about adding a timeout for Close, like https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close--
or a method like ForceClose()?
So depending on how the AsyncProducer is configured and the type of network error, it might look like the producer is stuck, but it is actually mostly idle because of the retry logic.
From my observation, it's trying to refreshMetadata from dead brokers.
We've seen this deadlock repeatedly on several Kafka clusters as soon as we picked up v1.31.x, and we haven't seen it since we rolled our vendored deps back to v1.30.1. So there is definitely a bug in the code; it's not a problem due to a dead broker (our brokers are fine).
FWIW, this is affecting us too: an upgrade to 1.31 caused our producers to lock up. Downgrading to 1.30.1 resolved the issue.
This seems like a dupe of https://github.com/Shopify/sarama/issues/2129, which is fixed in the latest main: https://github.com/Shopify/sarama/pull/2133
It still exists in 1.32 (which includes fix #2133). Downgrading to 1.30.1 resolved the issue. Thanks, @tsuna
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
We believe this issue is fixed in recent versions of Sarama. Feel free to re-open if you're still seeing a problem on the latest version.
@dnwe, we faced the same issue with version 1.37. Should I update to the latest Sarama version? (My Kafka broker version is 3.6.1.)
I also reviewed the comments by @slaunay:
Unfortunately because of how the pipeline logic works, I don't think the shutdown message (created when closing the AsyncProducer) is handled till all the queued records are processed. That is existing retries need to finish (which is often what is taking time) but "new" retries will be cancelled. To fail faster, you could disable retries and handle them yourself:
config.Net.DialTimeout = <short-enough-for-your-need>
config.Net.ReadTimeout = <short-enough-for-your-need>
config.Metadata.Timeout = <short-enough-for-your-need>
config.Producer.Retry.Max = 0
config.Producer.Retry.Backoff = 0
If I set the configuration as above, will the issue be solved?
Regarding another setting, Metadata.Timeout: if I set it to a small value such as 1 second, I'm not sure whether it will cause other issues.
Could you give me some advice on this?