kafka-go
FetchMessage returns io.EOF when reader not closed
The doc comment on FetchMessage states that when io.EOF is returned, the reader has been closed.
However, I ran into a scenario where FetchMessage returned io.EOF even though the reader had not been closed.
The scenario is as follows:
I was testing a graceful shutdown of the Kafka server, which triggers a rebalance.
If I stop the consumer group leader, for example 127.0.0.1:9092, the coordinator function tries to call findCoordinator to locate the coordinator. I added logging to it:
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : could try to cache the coordinator to avoid the double connect
	// here. since consumer group balances happen infrequently and are
	// an expensive operation, we're not currently optimizing that case
	// in order to keep the code simpler.
	conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		fmt.Println("------coordinator conn----------------", conn, err)
		return nil, err
	}
	tt, _ := conn.(*timeoutCoordinator)
	defer conn.Close()
	fmt.Println("----------------------coordinator---------------------------------", tt.conn.conn.RemoteAddr().String(), tt.conn.conn.LocalAddr().String())
	out, err := conn.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	if err == nil && out.ErrorCode != 0 {
		err = Error(out.ErrorCode)
	}
	if err != nil {
		fmt.Println("------coordinator findCoordinator----------------", err)
		return nil, err
	}
	address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
	return cg.config.connect(cg.config.Dialer, address)
}
The output log is as follows:
1. first
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50589
------coordinator findCoordinator---------------- EOF
2. second
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50600
------coordinator findCoordinator---------------- EOF
3. third
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50612
------coordinator findCoordinator---------------- EOF
A connection was established three times, but because the Kafka server had already stopped by then, each connection was closed and findCoordinator returned an EOF error all three times.
Eventually the error was exposed to the business layer as io.EOF, even though the reader had not been closed. Could a different error or flag be used to indicate that the reader has been closed?