Ling Jin

Results 249 comments of Ling Jin

Expected behavior: when closing the `asyncProducer`, drop all buffered messages and return immediately.

```
goroutine 304 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x140019c8360)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:166 +0x270

goroutine 404 [chan receive, 2 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:307 +0x154
github.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).Close(0x14000e86dc0)
	github.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:217 +0x11c
github.com/pingcap/tiflow/cdc/sink.(*mqSink).Close(0x140008a4600,...
```

Our service needs both `Success` and `Errors` enabled, since we have to track the flushed offset of each message as well as any errors.

I added some logging to a customized build of sarama. Here is part of the output:

```
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=1"] [name=sarama]
[2022/02/07...
```

How about adding a `timeout` to the close call, like https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close-- or adding a method like `ForceClose()`?