confluent-kafka-go
Closing consumer takes 10 seconds
Description
I am simply opening a consumer and then closing it. Closing takes 10 seconds.
I found this in the consumer code:
    // Poll for rebalance events
    for {
        c.Poll(10 * 1000)
        if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
            break
        }
    }
Is there any parameter that forces the consumer to close quickly? Please advise.
How to reproduce
    package main

    import (
        "fmt"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        cfg := &kafka.ConfigMap{
            "bootstrap.servers":     "localhost:9092",
            "group.id":              "group",
            "broker.address.family": "v4",
        }
        consumer, err := kafka.NewConsumer(cfg)
        if err != nil {
            panic(err)
        }
        fmt.Println("consumer opened")

        // Measure only the time spent inside Close().
        startClose := time.Now()
        if err := consumer.Close(); err != nil {
            panic(err)
        }
        fmt.Printf("consumer closed. closing time: %v\n", time.Since(startClose))
    }
output: consumer closed. closing time: 10.005907343s
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version: 1.7.0
- [x] Apache Kafka broker version: 2.7.0
- [x] Client configuration: &kafka.ConfigMap{"bootstrap.servers": "localhost:9092", "group.id": "group", "broker.address.family": "v4"}
- [x] Operating system: macOS
- [ ] Provide client logs (with "debug": ".." as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
@jliunyu Can you take a look at this? Haven't checked the code, but make sure Close() isn't waiting for a rebalance event if application.rebalance is not enabled.
Hi @edenhill, after checking the code, I found that Close() always waits for a rebalance event in the following code: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L424
I will look into how to fix it and provide a fix soon.
same problem
I've seen this too. Closing the consumer always takes at least the value of x in Poll(x). I.e. if that is 10s (like the above example), then consumer.Close() takes a minimum of 10s. Reduce it to 5s, and Close() takes 5s.
Hello, this issue seems frozen.
I had to downgrade to v1.5.2 to overcome this issue until further update.
@edenhill, @jliunyu I have a question about the code that causes the delay: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L438 Why is processing the rebalance events (assign/revoke) important when closing a consumer? Is this a requirement of Incremental Cooperative rebalancing? What if processing of these events were skipped and the consumer were closed without this for loop (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L437-L442)?
Additionally, this issue seems related to another issue: https://github.com/confluentinc/confluent-kafka-go/issues/610 If no rebalance event is triggered, the poll performed during close (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L438) never receives the event it is waiting for, so it blocks until the timeout is reached.
confluent-kafka-go version: 1.7.0 Apache Kafka broker version: 2.7.0
I would be happy to help on the resolution of this issue.
@jliunyu sorry I didn't see this issue before posting https://github.com/confluentinc/confluent-kafka-go/issues/728
Is any fix/improvement planned?
Thank you,
Same issue here. Even with the poll timeout set to 100, it takes more than 5-10 seconds to close the consumer for me.
The fix for this is planned for the next release
@jliunyu any update on this? When's the next release due?
This is the PR for the fix: https://github.com/confluentinc/confluent-kafka-go/pull/757
Closing this as the PR above is merged.