ReadMessage in loop getting 'read message error'

Open • Scrubi19 opened this issue 1 year ago • 3 comments

Hi, I am using kafka-go v0.4.39. How do I fix this error: "Rebalance In Progress: the coordinator has begun rebalancing the group, the client should rejoin the group"? I call ReadMessage in a loop. When the error occurs I catch it and keep reading, but I never receive messages again, only errors.

reader: kafka.NewReader(kafka.ReaderConfig{
	Brokers: cfg.Brokers,
	GroupID: test1,
	Partition: 0,
	MinBytes: 0,
	MaxBytes: 1024 * 1024,
	ReadBatchTimeout: 100 * time.Millisecond,
	MaxWait: 100 * time.Millisecond,
}),
func (s *RequestResponseServer) run(ctx context.Context) {
	for {
		rawMessage, err := s.reader.ReadMessage(ctx)
		if err != nil {
			s.loggerEntry.Errorf("read message error: %s", err)
			time.Sleep(1 * time.Second)
			continue
		}
		request := &requestMessage{}
		err = json.Unmarshal(rawMessage, request)
		if err != nil {
			s.loggerEntry.Errorf("request message unmarshal error: %s", err)
			continue
		}
		handlingResult, handleError := s.requestHandler(request.Body)
		var handleErrorMsg string
		if handleError != nil {
			s.loggerEntry.Errorf("request handle error: %s", handleError)
			handleErrorMsg = handleError.Error()
		}
		responseWriter, err := s.writersProvider.getOrCreate(request.ReturnAddress)
		if err != nil {
			s.loggerEntry.Errorf("response writer get error: %s", err)
			continue
		}
		response := &responseMessage{
			Id:    request.Id,
			Body:  handlingResult,
			Error: handleErrorMsg,
		}
		r, err := json.Marshal(response)
		if err != nil {
			s.loggerEntry.Errorf("response message marshal error: %s", err)
			continue
		}
		err = responseWriter.Write(r)
		if err != nil {
			s.loggerEntry.Errorf("response write error: %s", err)
			continue
		}
	}
}

Scrubi19, Jun 26 '23 11:06

Hi @Scrubi19, please provide a minimal reproducible code example. Please remove all of the unrelated code related to the web server you are running and provide something we can reproduce.

Additionally, it looks like you aren't providing a topic to your ReaderConfig. This code would throw this error and never join the consumer group.

petedannemann, Jul 17 '23 15:07

I ran into the same problem. It showed up when I created a topic in a new Kafka environment and joined the group to consume from it. After I restarted the service and consumed again, the problem went away. As a result, I have to restart the service manually whenever I deploy to a new customer.

shixiaofeia, Nov 10 '23 07:11

I hit the same problem on version v0.4.47.

zhangyinhao1234, Apr 07 '24 02:04