confluent-kafka-go

How do I re-read an event when it needs to be retried?

Open sneko opened this issue 3 years ago • 2 comments

Description

Hi,

I have a topic where I need to keep the message order, so a "retry topic" (or anything similar) is not an option.

It's consumed by a consumer group, and for now I have one consumer handling multiple partitions. When an event cannot be consumed, either because of a programming mistake or because a partner service is down, I don't commit the offset (so after a restart the consumer would reconsume this message).

My concerns about replaying the message without a restart:

  • (A) I could loop over the processing of this event (right after consumer.Poll()), but if that takes too long, max.poll.interval.ms is exceeded and my consumer gets unassigned (its partitions are rebalanced elsewhere if there are multiple consumers)

  • (B) Since I don't commit the message, I could call .Poll() again to redo a "normal process", but surprisingly it fetches the next message (n+1): each .Poll() advances the consumer's internal position, even if the message was not committed

  • (C) Before retrying I may want to Sleep(..) a bit so I don't flood the remote services, if any. That's a good idea, but since my consumer potentially manages multiple partitions, I don't want this to block the other partitions. It could also exceed max.poll.interval.ms and get the consumer unassigned. So I guess the way to handle this is to use consumer.Pause(eventFailedPartition) to simulate the Sleep(...), and when ready, call consumer.Resume(eventFailedPartition). That way the consumer can keep consuming the other partitions in the meantime... but still...

I think I can solve my issue by combining a consumer group, committing each event, and using Pause/Resume to delay a partition that needs to be retried later without blocking the others.

But... the question remains: when "resuming", how do I make .Poll() retry the exact same offset for the failed partition? What's the correct way to set the offset after the consumer has been initialised?
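A toy model helps show why (B) happens and why moving the fetch position answers this question. The sketch below is NOT the confluent-kafka-go API: toyPartition and its methods are hypothetical and only model the fetch position (advanced by every poll) separately from the committed offset (advanced only by a commit). In the real client the rewind would be a Consumer.Seek on the failed partition and offset.

```go
package main

import "fmt"

// toyPartition is a hypothetical in-memory stand-in for one partition as seen
// by a consumer. It is NOT the confluent-kafka-go API: it only models the
// fetch position (moved by every poll) separately from the committed offset.
type toyPartition struct {
	messages  []string
	position  int64 // offset the next poll returns
	committed int64 // offset a restarted consumer would resume from
}

// poll returns the message at the current position and advances the
// position, whether or not that message is ever committed.
func (p *toyPartition) poll() (int64, string, bool) {
	if p.position >= int64(len(p.messages)) {
		return 0, "", false
	}
	off := p.position
	p.position++
	return off, p.messages[off], true
}

// commit stores offset+1, following Kafka's convention of committing the
// offset of the next message to read.
func (p *toyPartition) commit(offset int64) { p.committed = offset + 1 }

// seek moves only the fetch position; this is what a retry needs.
func (p *toyPartition) seek(offset int64) { p.position = offset }

func main() {
	p := &toyPartition{messages: []string{"m0", "m1", "m2"}}

	first, _, _ := p.poll() // m0 is processed successfully
	p.commit(first)

	failed, _, _ := p.poll() // m1 fails and is not committed
	next, _, _ := p.poll()   // polling again yields m2, as in (B)
	fmt.Println(failed, next, p.committed) // 1 2 1

	p.seek(failed) // rewind the fetch position to the failed offset
	again, msg, _ := p.poll()
	fmt.Println(again, msg) // 1 m1
}
```

The committed offset only matters after a restart or rebalance; within a running consumer, it is the fetch position that decides what the next poll returns.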

Otherwise a solution could be:

  • Once a consumption fails, I pause the consumer, store the failed Kafka message and run a goroutine that waits for the next retry and processes the message again until it passes; when it does, it commits and resumes the consumer (in the meantime the .Poll() loop keeps serving the other partitions). But I'm a bit afraid of using goroutines... It's already a pain to manage mutexes around sensitive Kafka operations, so I'm not sure it won't blow up haha

I don't claim to have a valid solution; I'm just sharing to find out whether any of you have one.

Thank you,

cc @edenhill

EDIT: I'll try .Seek() tomorrow, following https://github.com/confluentinc/confluent-kafka-go/issues/134#issuecomment-366293717, but I'm still interested in your thoughts on whether what I'm saying makes sense

EDIT2: It seems a nack(...) function would be the best way, so that the next .Poll() fetches the same message, but it doesn't seem to be implemented in the Go library? https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/Acknowledgment.html

sneko · Oct 06 '21 16:10

I implemented the last solution I mentioned, but I noticed:

  • When calling Poll / Seek(sameOffset) / Pause / wait a bit / Resume / Poll in sequence, the next Poll only sometimes returns the exact same offset. So even though I try to reconsume the same message to retry processing, I may get the following message instead, which could succeed and be committed... marking the "failed message" as processed when it isn't (committing a later offset implicitly covers the earlier one)
  • But when calling Poll / Pause / Seek(sameOffset) / wait a bit / Resume / Poll, it works every time: I get the same message (same offset)

It's really puzzling, since I can't think of any explicit reason why swapping the order of Seek and Pause would change the behavior. Any idea @edenhill ?

sneko · Oct 13 '21 09:10

@sneko Have you tried using Seek without Pause/Resume? The documentation doesn't say that Seek requires pausing the consumer: https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html#Consumer.Seek

sergolius · Jul 06 '23 22:07