confluent-kafka-go
How to re-read an event when it should be retried?
Description
Hi,
I have a topic where I need to keep the message order, and it's not an option to have a "retry topic" or something like that.
It's consumed by a consumer group, and for now I have 1 consumer for multiple partitions. When an event cannot be processed, either due to a programmatic mistake or because a downstream partner is down, I don't commit the offset (so in case of a restart this message would be reconsumed).
My concerns about replaying the message without a restart:

- (A) I could loop over the processing of this event (below the `consumer.Poll()`), but the risk, if it takes too long, is `max.poll.interval.ms` being reached and my consumer being unassigned (rebalanced elsewhere if there are multiple consumers).
- (B) Let's say that, since I don't commit the message, I call `.Poll()` again to redo a "normal process": surprisingly it fetches the next message (`n+1`), because each time a `.Poll()` is done its internal offset is incremented even if the message was not committed.
- (C) Before retrying I may want to `Sleep(...)` a bit so as not to flood the remote services, if any. That's a good idea, but since my consumer potentially manages multiple partitions, I don't want this to block the other partitions. It could also reach `max.poll.interval.ms` and get the consumer unassigned. So I guess to manage this I should use `consumer.Pause(eventFailedPartition)` to simulate the `Sleep(...)`, and when ready use `consumer.Resume(eventFailedPartition)`. That way the consumer can still consume the other partitions in the meantime (see the sketch after this list)... but still...
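Concretely, for (C) I imagine something like this (just a sketch, assuming the usual `time`/`log` imports plus `github.com/confluentinc/confluent-kafka-go/kafka`; `retryDelay` is a placeholder of mine):

```go
// Pause only the failed partition so the Poll loop keeps serving the other
// assigned partitions, and resume it after a back-off instead of sleeping
// in the loop (which would risk hitting max.poll.interval.ms).
func backOffPartition(consumer *kafka.Consumer, msg *kafka.Message, retryDelay time.Duration) {
	failedTP := msg.TopicPartition // topic/partition/offset of the failed message

	if err := consumer.Pause([]kafka.TopicPartition{failedTP}); err != nil {
		log.Printf("pause failed: %v", err)
		return
	}

	// Resume later; the Poll loop itself never blocks.
	time.AfterFunc(retryDelay, func() {
		if err := consumer.Resume([]kafka.TopicPartition{failedTP}); err != nil {
			log.Printf("resume failed: %v", err)
		}
	})
}
```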
I think I can manage my issue by combining a consumer group, committing each event, and using Pause/Resume to delay the partition that needs to be retried later without blocking the others.
But the question remains: when "resuming", how do I make `.Poll()` retry the exact same offset for the failed partition? What's the correct way to set it after the initialisation of the consumer?
Otherwise a solution could be:

- Once a consumption fails, I pause the consumer, store the failed Kafka message and run a goroutine that waits for the next retry, processes the message again until it passes, and if so: it commits and resumes the consumer (in the meantime the `.Poll()` loop is still serving the other partitions), as sketched below. But I'm a bit afraid of using goroutines... since it's already a pain to manage mutexes across sensitive Kafka operations, I have a doubt it won't blow up haha
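Roughly this (sketch only; `process` stands for my business logic, and as far as I understand the librdkafka calls are thread-safe, so calling Pause/Commit/Resume from the goroutine should be OK, correct me if not):

```go
// Pause the failed partition, retry the message in the background, and only
// commit + resume once it finally passes. The main Poll loop keeps serving
// the other partitions in the meantime.
func retryInBackground(consumer *kafka.Consumer, msg *kafka.Message, retryDelay time.Duration) error {
	tp := msg.TopicPartition

	if err := consumer.Pause([]kafka.TopicPartition{tp}); err != nil {
		return err
	}

	go func() {
		for {
			time.Sleep(retryDelay)
			if err := process(msg); err != nil {
				continue // still failing, wait and try again
			}
			if _, err := consumer.CommitMessage(msg); err != nil {
				log.Printf("commit failed: %v", err)
			}
			if err := consumer.Resume([]kafka.TopicPartition{tp}); err != nil {
				log.Printf("resume failed: %v", err)
			}
			return
		}
	}()
	return nil
}
```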
I don't claim to have a valid solution, just sharing to see if any of you do?
Thank you,
cc @edenhill
EDIT: I should try `.Seek()` tomorrow, according to https://github.com/confluentinc/confluent-kafka-go/issues/134#issuecomment-366293717, but I'm still interested in your thoughts on whether what I'm saying makes sense or not.
EDIT2: It seems a `nack(...)` function would be the best way, so the next `.Poll()` would fetch the same message, but it seems it's not implemented in the Go library? https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/Acknowledgment.html
I implemented the last solution I mentioned, but I noticed:

- When doing `Poll / Seek(sameOffset) / Pause / wait a few / Resume / Poll` sequentially, there is only a random chance that the next offset is the exact same offset. So even though I try to reconsume the same message to retry the processing, I may get the following message instead, which could succeed and be committed... marking the "failed message" as validated whereas it's not.
- But when doing `Poll / Pause / Seek(sameOffset) / wait a few / Resume / Poll` it works every time: I get the same message (same offset), as in the sketch below.
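For reference, the variant that works looks roughly like this (sketch only; `retryDelay` is a placeholder of mine):

```go
// Pause BEFORE Seek, then resume later: the next Poll() on this partition
// reliably returns the same (failed) offset again.
func pauseSeekResume(consumer *kafka.Consumer, msg *kafka.Message, retryDelay time.Duration) {
	failedTP := msg.TopicPartition // still carries the failed message's offset

	if err := consumer.Pause([]kafka.TopicPartition{failedTP}); err != nil {
		log.Printf("pause failed: %v", err)
		return
	}
	// Rewind the partition to the failed offset (second argument is the seek timeout in ms).
	if err := consumer.Seek(failedTP, 100); err != nil {
		log.Printf("seek failed: %v", err)
	}
	time.AfterFunc(retryDelay, func() {
		if err := consumer.Resume([]kafka.TopicPartition{failedTP}); err != nil {
			log.Printf("resume failed: %v", err)
		}
	})
}
```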
It's really disturbing, since nothing explicit comes to mind to explain this behavior when swapping `Seek` <-> `Pause`. Any idea @edenhill?
@sneko Have you tried using `Seek` without `Pause`/`Resume`?
There is no mention in the documentation that using `Seek` requires pausing the consumer: https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html#Consumer.Seek