kafka-go
ReadMessage can not exit after calling Close
Describe the bug When consuming messages from a Kafka stream, I try to close the reader like this:
// reader := kafka.NewReader(*readConfig)
reader.Close()
I tried this about 30 times and it worked well. But one time I found that the ReadMessage method, which is called in another goroutine, did not exit. I call ReadMessage like this:
go func() {
...
m, err := reader.ReadMessage(ctx)
...
}()
ReadMessage only exits if I pass it a context with cancel and call the cancel function; it does not exit after reader.Close() alone.
Kafka Version 2.8.1
To Reproduce Steps to reproduce the behavior. Bonus points for a code sample.
Expected behavior ReadMessage should exit after Close is called.
Additional context I am using the consumer within a consumerGroup.
Maybe the problem is caused by the Close method:
// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	if r.done != nil {
		<-r.done
	}

	if !closed {
		close(r.msgs)
	}

	return nil
}
I suspect that it does not close the r.msgs channel in some cases.
Hello @ivloli, thanks for reporting the issue!
Would you be available to submit a fix for the problem you reported?
I have made a PR, https://github.com/segmentio/kafka-go/pull/888, but it failed some checks because of [no test files] errors.
👍 on the issue
Not sure this works either... We just tried using this and the readers still continue to run long after close. Nice to see that people other than just us are looking into this.
Hello @achille-roussel, is there any update on this issue? We are also facing it. We have noticed that new messages are read after the reader is closed.
We thought these might be queued messages, so we tried QueueCapacity=0, but the behavior is still the same.
Would one of you be able to provide a capture of stack traces when the issue occurs?
This was so long ago, and unfortunately the microservice we are running now runs until it has finished the work it needs to do, then exits. It restarts later to do more work, so the problem is worked around for us. It seems many systems we've encountered do not have in mind a continuously running service that shuts these systems down as part of a regular process. This leads to CPU waste, which affects us all.
I think the biggest thing missing in systems is a general shutdown function that reliably shuts down the processes. We'd like it to shut down so the process can properly sleep and wake up some time later.
Creating a repro is kind of hard these days as I have moved on to other things and we have a workaround that effectively shuts down the kafka readers...