kafka-go

Refetch the last uncommitted message

Open dineshgowda24 opened this issue 3 years ago • 3 comments

Here is sample code showing what I am doing. My consumers are bound by GroupIDs.

I fetch the message and run some business logic on it. If it succeeds, I commit the message. The business logic can fail because of a network issue or the DB being down; in those cases I do not commit the message. It is highly likely that the business logic will succeed if I retry after 2 or 3 minutes, since the network or DB glitch would have resolved by then.

I noticed that if I do not commit the message, FetchMessage is blocked indefinitely.

How do I fetch the last uncommitted message again? Or is there a setting like RedeliveryTimeout, after which the last uncommitted message is read again?

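for {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	msg, err := reader.FetchMessage(ctx)
	if err != nil {
		// Release the context here; a defer inside the loop would not run until the function returns.
		cancel()
		// Reader has been closed
		if err == io.EOF {
			logger.Error("reader has been closed")
			return
		}
		logger.WithError(err).Error("failed to fetch message")
		continue
	}
	if err := bussinessLogicOnMessage(msg); err != nil {
		cancel()
		// The message is intentionally left uncommitted.
		logger.Error(err)
		continue
	}
	if err := reader.CommitMessages(ctx, msg); err != nil {
		logger.WithError(err).Error("failed to commit message")
	}
	cancel()
}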

dineshgowda24, May 09 '21 07:05

Hi @achille-roussel,

Any update on this?

Thanks, Stas

skynet2, Nov 26 '21 02:11

Hello @dineshgowda24 @skynet2, thanks for opening this issue, and apologies for the delayed response!

> I noticed that if I do not commit the message, FetchMessage is blocked indefinitely.

This seems unexpected: reading messages and committing offsets are two independent processes in Kafka. Are messages continuously produced to the topic(s)?

> How do I fetch the last uncommitted message again? Or is there a setting like RedeliveryTimeout, after which the last uncommitted message is read again?

kafka-go doesn't offer this kind of functionality. I believe the intent would be for the application to manage this type of logic, by retaining the last message retrieved from calling kafka.(*Reader).FetchMessage.

achille-roussel, Feb 15 '22 17:02

If you close the reader when you get an error, and then create a new reader at the start of your loop, this will do what you want. As it is, it is definitely behaving as expected, as far as I can tell.

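for {
	func() {
		// A new reader in the same consumer group resumes from the last committed offset.
		reader := newReader()
		defer func() {
			if err := reader.Close(); err != nil {
				panic(err)
			}
		}()
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			panic(err)
		}
		if err := business(msg); err != nil {
			// Return without committing; the next iteration fetches msg again.
			return
		}
		if err := reader.CommitMessages(ctx, msg); err != nil {
			panic(err)
		}
	}() // invoke the closure so the deferred Close runs on every iteration
}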

wwade, Apr 29 '22 18:04