kafka-go
[problem] conn.ReadMessage returns EOF error after 10 minutes with no messages for the consumer
Describe the bug
conn.ReadMessage returns an EOF error after about 10 minutes with no messages delivered to the consumer.
Kafka Version
KAFKA_VERSION: 2.7.0
kafka-go version: v0.4.15
To Reproduce
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	topic := "job"
	partition := 1
	ctx := context.TODO()
	address := "127.0.0.1:9092"

	// func DialLeader(
	//     ctx context.Context,
	//     network string,
	//     address string,
	//     topic string,
	//     partition int)
	//     (*Conn, error)
	conn, err := kafka.DialLeader(
		ctx, "tcp", address, topic, partition)
	if err != nil {
		log.Printf("conn err: %v\n", err)
		return
	}
	defer conn.Close()

	//_ = conn.SetReadDeadline(time.Time{}) // zero value
	//time.Sleep(3 * time.Second)

	first, err := conn.ReadFirstOffset()
	if err != nil {
		log.Printf("read first offset err: %v\n", err)
		return
	}
	log.Printf("first offset is %v\n", first)

	last, err := conn.ReadLastOffset()
	if err != nil {
		log.Printf("read last offset err: %v\n", err)
		return
	}
	log.Printf("last offset is %v\n", last)

	newOffset, err := conn.Seek(last, kafka.SeekAbsolute) // seek to lastOffset
	if err != nil {
		log.Printf("seek err: %v\n", err)
		return
	}
	log.Printf("newOffset is %v\n", newOffset)

	for {
		message, err := conn.ReadMessage(1e6) // blocked
		if err != nil {
			log.Printf("read message err: %v\n", err)
			return
		}
		fmt.Printf("message Value: %s, Offset: %d, Time: %v\n", message.Value, message.Offset, message.Time)
		time.Sleep(1 * time.Second)
	}
}
After 10 minutes with no messages for the consumer, conn.ReadMessage returns an unexpected EOF error, see below.
Expected behavior
Additional context
I do not know what is happening. Can you help me? Thanks for your reply.
I ran into the same problem. How did you solve it in the end?
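The 10-minute interval matches the Kafka broker default for `connections.max.idle.ms` (600000 ms), so one plausible but unconfirmed explanation is that the broker closes the connection as idle and the client sees that as EOF. Assuming that is the cause, a possible workaround is to treat `io.EOF` as a signal to redial and resume from the next unread offset. The sketch below shows only that retry logic, using standard-library Go. The `session` interface, `dialFunc`, `readLoop`, `fakeSession`, and `demo` are hypothetical names for illustration and are not part of kafka-go.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// session is a hypothetical stand-in for an open partition connection.
// It is NOT a kafka-go type; it only models "read one message" and "close".
type session interface {
	Read() (string, error)
	Close() error
}

// dialFunc (hypothetical) opens a session positioned at the given offset.
type dialFunc func(offset int64) (session, error)

// readLoop passes each message to handle until handle returns false.
// On io.EOF it closes the session and redials at the next unread offset,
// giving up after maxRedials consecutive redials without a message.
// It returns the next unread offset.
func readLoop(dial dialFunc, start int64, maxRedials int, handle func(string) bool) (int64, error) {
	offset, redials := start, 0
	s, err := dial(offset)
	if err != nil {
		return offset, err
	}
	for {
		msg, err := s.Read()
		switch {
		case errors.Is(err, io.EOF):
			s.Close()
			if redials == maxRedials {
				return offset, fmt.Errorf("giving up after %d redials: %w", redials, err)
			}
			redials++
			if s, err = dial(offset); err != nil {
				return offset, err
			}
			continue
		case err != nil:
			s.Close()
			return offset, err
		}
		offset++
		redials = 0 // a successful read resets the consecutive-redial count
		if !handle(msg) {
			s.Close()
			return offset, nil
		}
	}
}

// fakeSession simulates a connection that is dropped after one message.
type fakeSession struct {
	log    []string
	pos    int64
	served bool
}

func (f *fakeSession) Read() (string, error) {
	if f.served || f.pos >= int64(len(f.log)) {
		return "", io.EOF
	}
	f.served = true
	return f.log[f.pos], nil
}

func (f *fakeSession) Close() error { return nil }

// demo runs readLoop against the fake and reports what happened.
func demo() (got []string, dials int, next int64, err error) {
	log := []string{"a", "b", "c"}
	dial := func(offset int64) (session, error) {
		dials++
		return &fakeSession{log: log, pos: offset}, nil
	}
	next, err = readLoop(dial, 0, 3, func(m string) bool {
		got = append(got, m)
		return len(got) < len(log)
	})
	return got, dials, next, err
}

func main() {
	got, dials, next, err := demo()
	fmt.Println(got, dials, next, err)
}
```

With kafka-go, the `dialFunc` would presumably wrap `kafka.DialLeader` followed by `conn.Seek(offset, kafka.SeekAbsolute)`, and `Read` would wrap `conn.ReadMessage`. The higher-level `kafka.Reader` handles reconnection on its own and may be a simpler choice if the low-level `Conn` API is not required.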