kafka-go
FetchMessage indefinitely blocking.
kafka-go is a great repository, and its API is very concise and easy to understand. 👍🏻
Describe the bug
I use kafka.Reader to fetch messages from a consumer group in a loop using the FetchMessage API.
My Config:
client := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{addr},
	GroupID:  "my-group",
	Topic:    "my-topic",
	MinBytes: 10e+5,
	MaxWait:  5 * time.Second,
})
However, I'm experiencing a strange issue. I have deployed Kafka and my application (kafka-go v0.4.40) using Helm in my CI/CD pipeline. The FetchMessage API doesn't return any data or errors, even though I'm certain that there is data in Kafka. It appears to be indefinitely blocked, and I have waited for over 10 minutes.
Oddly, when I kill the pod and the Kubernetes deployment automatically restarts it, FetchMessage then reads the messages successfully.
It seems that there is some kind of bug triggered when both Kafka server and kafka-go are started simultaneously, causing kafka-go to be unable to fetch messages.
Kafka Version
- What version(s) of Kafka are you testing against? image: bitnami/kafka:3.3
- What version of kafka-go are you using? kafka-go@v0.4.40
To Reproduce
Resources to reproduce the behavior:
I'm unable to easily reproduce this issue; it seems to occur only in certain scenarios.
I'm not familiar with the workings of kafka-go. What methods can I use to pinpoint the specific location of the issue? For example, breakpoints, logs, or printing output at certain locations.
Thank you very much~
It seems your application may have failed to establish a connection to Kafka. That's just my guess.
I believe that without the kafka-server being fully started, the client indeed cannot connect to the kafka-server. However, it should not be blocked at the FetchMessage stage (at least, from the client's perspective, it should not appear as if it is blocked at the FetchMessage API).
After the kafka-server has completed its startup process, I have confirmed that it is possible to connect to the kafka-server using the cli tool. However, the kafka-go library is still blocked at the FetchMessage stage. I believe the client should be able to connect to the kafka-server correctly.
Oh, I understand. You mean that the Kafka client should establish a connection to the Kafka server and return an error if it can't, right? I agree with that.
Hello - as you've noted that it blocks indefinitely in the scenario you describe, I assume you are not using a timeout on your context. Could using a context with a timeout and implementing a retry provide a workaround for the situation?