kafka-go
kafka-go copied to clipboard
Refetch the last un comitted message
Here is a sample code on what i am doing. My consumers are bound by GroupIDs.
I fetch the message and do some business logic on top of it. It it succeeds then i commit the message. Now there can be cases where business logic can fail to due to network issue or DB being down, in those cases i do not commit the message. Its highly likely that if i retry after 2 or 3 mins this business logic can succeed since network or DB glitches would have resolved.
I noticed that if i do not commit the message, the FetchMessage
is blocked indefinitely.
How do i fetch the last un-comitted message again or is there a setting like RedeliveryTimeout
after which last un comitted message is read again.
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
msg, err := reader.FetchMessage(ctx)
if err != nil {
// Reader has been closed
if err == io.EOF {
logger.Error("reader has been closed")
return
}
logger.WithError(err).Error("failed to fetch message")
continue
}
if err := bussinessLogicOnMessage(msg); err != nil {
logger.Error(err)
continue
}
reader.CommitMessages(ctx, msg)
}
hi @achille-roussel ,
Any update on this?
Thanks, Stas
Hello @dineshgowda24 @skynet2, thanks for opening this issue, and apologies for the delayed response!
I noticed that if i do not commit the message, the FetchMessage is blocked indefinitely.
This seems unexpected, reading messages and committing offsets are two independent processes in Kafka. Are messages continuously produced to the topic(s)?
How do i fetch the last un-comitted message again or is there a setting like RedeliveryTimeout after which last un comitted message is read again.
kafka-go doesn't offer this kind of functionality, I believe the intent would be for the application to manage this type of logic, by retaining the last message retrieved from calling kafka.(*Reader).FetchMessage
.
If you get an error and then close the reader, and then at the start of our loop, create a reader, this will do what you want. As it is, it is definitely behaving as excepted, as far as I can tell.
for {
func() {
reader = newReader()
defer func() {
if err := reader.Close(); err != nil {
panic(err)
}
}()
msg, err := reader.FetchMessage(ctx)
if err != nil {
panic(err)
}
if err := business(msg); err != nil {
return
}
reader.CommitMessages(ctx, msg)
}
}