kafka-go
kafka-go copied to clipboard
ReadMessage in loop getting 'read message error'
Hi, I am using kafka-go version v0.4.39 How fix error: "Rebalance In Progress: the coordinator has begun rebalancing the group, the client should rejoin the group"? I have ReadMessage in loop. I catch error and continue read messages, but i can't get messages anymore (only errors)
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
GroupID: test1,
Partition: 0,
MinBytes: 0,
MaxBytes: 1024 * 1024,
ReadBatchTimeout: 100 * time.Millisecond,
MaxWait: 100 * time.Millisecond,
}),
func (s *RequestResponseServer) run(ctx context.Context) {
for {
rawMessage, err := s.reader.ReadMessage(ctx)
if err != nil {
s.loggerEntry.Errorf("read message error: %s", err)
time.Sleep(1 * time.Second)
continue
}
request := &requestMessage{}
err = json.Unmarshal(rawMessage, request)
if err != nil {
s.loggerEntry.Errorf("request message unmarshal error: %s", err)
continue
}
handlingResult, handleError := s.requestHandler(request.Body)
var handleErrorMsg string
if handleError != nil {
s.loggerEntry.Errorf("request handle error: %s", handleError)
handleErrorMsg = handleError.Error()
}
responseWriter, err := s.writersProvider.getOrCreate(request.ReturnAddress)
if err != nil {
s.loggerEntry.Errorf("response writer get error: %s", err)
continue
}
response := &responseMessage{
Id: request.Id,
Body: handlingResult,
Error: handleErrorMsg,
}
r, err := json.Marshal(response)
if err != nil {
s.loggerEntry.Errorf("response message marshal error: %s", err)
continue
}
err = responseWriter.Write(r)
if err != nil {
s.loggerEntry.Errorf("response write error: %s", err)
continue
}
}
}
Hi @Scrubi19, please provide a minimal reproducible code example. Please remove all of the unrelated code related to the web server you are running and provide something we can reproduce.
Additionally, it looks like you aren't providing a topic to your ReaderConfig. This code would throw this error and never join the consumer group.
I also encountered the same problem. It recurred when I created a topic in a new Kafka environment and joined the group for consumption. However, when I restarted the service and consumed it again, the problem was resolved. I need to manually restart it when deploying services to new customers.
版本号 v0.4.47 遇到同样的问题