confluent-kafka-go
How do you cancel the poll when SIGINT or SIGTERM is sent and the poll timeout is -1?
The following code hangs indefinitely when I press CTRL+C. Also, won't this cause the consumer to get kicked out of the group, since the interval between poll calls will exceed max.poll.interval.ms if no messages are being produced?
// ctx is just a context.Background()
for {
	select {
	case <-ctx.Done():
		log.Printf("[%s]: received context done", consumer.String())
		return messages, ctx.Err()
	default:
		// With a timeout of -1, Poll blocks until an event arrives.
		e := consumer.Poll(-1)
		switch event := e.(type) {
		case kafka.Error:
			if event.IsFatal() {
				panic(event)
			}
			// log err
		case *kafka.Message:
			// launch goroutines to process the messages
		default:
			// track offsets of the goroutines
		}
	}
}
CTRL+C doesn't seem to exit the Poll method, as the poll is waiting for a message to arrive. However, it works fine with values other than -1, which is understandable as the poll times out. Can the Poll method take a context so that it can cancel itself even with a timeout of -1? One of the ways I thought of was to listen to the context.Done() channel in another goroutine and invoke consumer.Close(), like so:
go func() {
<-ctx.Done()
consumer.Close()
}()
> CTRL+C doesn't seem to exit the Poll method as poll is waiting for some message to arrive
That's a known issue. I believe it's not entirely straightforward to fix in librdkafka. In the .NET client, this is implemented like so: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Consumer.cs#L945 - you could do something similar.
> Also won't this cause the consumer to get kicked out? As the interval between poll calls will be more than max.poll.interval.ms if there are no messages being produced
No, see: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_int.h#L984
I understand, but I cannot implement something like the .NET client's approach in my application logic. What are your thoughts on closing the consumer in another goroutine upon receiving the cancellation signal? I know the consumer shouldn't be shared across goroutines, but this seems like an easier way: the consumer thread is blocked indefinitely, so there can't be any data races. Correct me if I am wrong.
Unfortunately, you can't call Close while Poll is blocking.
You'll need to implement a loop that calls Poll with a short timeout and breaks out of the loop on CTRL+C.
Yes, we usually use 100-500 ms as the timeout, but we had one use case where we wanted to block forever. So how do we gracefully shut down the application when the Poll timeout is -1? All I can think of is to trigger a panic and recover from it.
Repeatedly call Poll with a shorter timeout, for the same effect as using an infinite timeout.
Thank you very much!