kafka-go
When I use SetReadDeadline(time.Time{}), the *Batch read loop seems to be blocked.
When I run the consumer demo from README.md with SetReadDeadline(time.Time{}), no messages are printed.
Kafka Version
I use this image: wurstmeister/kafka:2.12-2.1.1
What version(s) of Kafka are you testing against? Default config set.
What version of kafka-go are you using? v0.4.38
To Reproduce
Resources to reproduce the behavior:
docker-compose file
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - "2181:2181"
    container_name: zookeeper
  kafka:
    image: wurstmeister/kafka:2.12-2.1.1
    restart: always
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_MESSAGE_MAX_BYTES: 200000000
      KAFKA_LOG_DIRS: "/kafka/kafka-logs-1"
    container_name: kafka
    volumes:
      - /usr/local/kafka/logs:/kafka/kafka-logs-1
...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder values (not given in the original report); adjust to your setup.
	addr, topic, partition := "localhost:9092", "my-topic", 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	// This is the call in question: a zero time.Time as the read deadline.
	if err = conn.SetReadDeadline(time.Time{}); err != nil {
		//if err = conn.SetReadDeadline(time.Now().Add(10*time.Second)); err != nil {
		panic(err)
	}
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	b := make([]byte, 10e3)            // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			fmt.Println("read error", err)
			break
		}
		fmt.Println(string(b[:n]))
	}
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}
Expected Behavior The loop will not be interrupted until an error occurs.
Observed Behavior The loop seems to be blocked; no messages are printed.
macOS Monterey 12.4, go version go1.17.13 darwin/arm64
Thanks for filing this report! Using an explicitly empty time.Time struct is not something we would recommend, but I would also not expect it to deadlock a program. We will gladly review a PR that includes a test case and a fix.
Has the problem been solved? My demo also had this problem on v0.4.47.