librdkafka
Allow re-committing offsets
librdkafka currently ignores (application) commit requests if the offsets match those of the last known commit. For the reasons stated here (the commit expiry is shorter than the message interval), it is desirable for librdkafka to skip this check and allow the commit to pass through to the broker.
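For illustration, a minimal C sketch of the kind of application-driven "keep-alive" commit this request is about, assuming an already-running high-level consumer `rk`. Per the description above, librdkafka currently drops the broker request whenever these offsets match the last known commit, which is exactly the check the issue asks to make optional:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Re-commit the consumer's current positions, even if nothing new was consumed. */
static void recommit_current_offsets(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *offsets = NULL;
        rd_kafka_resp_err_t err;

        /* Fetch the current assignment... */
        err = rd_kafka_assignment(rk, &offsets);
        if (err || !offsets)
                return;

        /* ...fill it in with the consumer's current positions... */
        if (!(err = rd_kafka_position(rk, offsets)))
                /* ...and commit them synchronously (async = 0). */
                err = rd_kafka_commit(rk, offsets, 0);

        if (err)
                fprintf(stderr, "re-commit failed: %s\n", rd_kafka_err2str(err));

        rd_kafka_topic_partition_list_destroy(offsets);
}
```

Calling something like this on a timer would keep the offsets' retention clock ticking, but only if the commit actually reaches the broker.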
@AlexeyRaga Should this affect auto commit behaviour as well?
I never use auto commit to be honest, so I may miss some edge cases, but I personally would expect it to recommit.
From @coberty in #1415:
I am not sure if it is wrong; I would rather say unexpected.
We have such a case: our C++ client had not received any messages for 2 days, and after it was restarted it started to read everything from the very beginning (offset reset to earliest), which is something we want to avoid.
This wouldn't have happened with the Java client, since it retains offsets while it is running with auto commit.
This has led to some head scratching for us as well for somewhat bursty topic data, thanks for the work on this.
To make it simpler, maybe instead of having a config option, it would be easier to implement a new function, like commitAlways or something like that?
We want this behaviour to be used for auto commits too, so I think a config option is the easiest approach for users.
I think this feature is particularly necessary. We have hundreds of topics (no replicas) in a cluster; if a broker breaks, we need to restart all the clients :(
@DavidLiuXh it is even more critical because a restart doesn't seem to help: the negative lag doesn't go away. So you have to choose between two options, both bad (the two settings are sketched after this comment):
- Configure consumers to `earliest` offsets: it will work, but the "broken" partitions will be consumed from the beginning each time you restart the job (or the job dies).
- Configure consumers to `latest` offsets: each time your job restarts or dies, the broken partitions will continue from the high watermarks and you may lose (skip) messages.
I am in this crappy situation right now, sitting in position #1 and praying that jobs don't restart until we figure out the solution :(
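For reference, a minimal sketch of the two consumer configurations weighed above, using the librdkafka C API; the `make_conf()` helper is only illustrative:

```c
#include <librdkafka/rdkafka.h>

/* Build a consumer config with one of the two reset policies discussed above.
 * The policy only applies when a committed offset is missing or has expired. */
static rd_kafka_conf_t *make_conf(const char *reset_policy /* "earliest" or "latest" */) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* "earliest": broken partitions restart from the beginning on reset.
         * "latest":   broken partitions jump to the high watermark, skipping
         *             anything produced while the offset was missing. */
        if (rd_kafka_conf_set(conf, "auto.offset.reset", reset_policy,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
        return conf;
}
```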
There are two workarounds:
- Shut down the consumers, reset the offsets (`--to-offset ..`) using the offset tool, and restart the consumers. They should resume at the reset offset position (no application logic needed).
- Set `auto.offset.reset=error` and handle the consumer error by seeking/assigning to a specific offset (sketched below).
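A minimal sketch of that second workaround, assuming a subscribed high-level consumer configured with `auto.offset.reset=error` and assuming the error message carries the affected topic and partition; `resolve_offset()` is a hypothetical lookup into wherever the application keeps a known-safe offset:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical: look up a safe resume offset in the application's own store. */
extern int64_t resolve_offset(const char *topic, int32_t partition);

static void poll_once(rd_kafka_t *rk) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
        if (!msg)
                return;

        if (msg->err == RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET) {
                /* No usable committed offset for this partition: decide where
                 * to resume instead of silently jumping to earliest/latest. */
                int64_t offset = resolve_offset(rd_kafka_topic_name(msg->rkt),
                                                msg->partition);
                rd_kafka_resp_err_t err =
                        rd_kafka_seek(msg->rkt, msg->partition, offset, 5000 /* ms */);
                if (err)
                        fprintf(stderr, "seek failed: %s\n", rd_kafka_err2str(err));
        } else if (!msg->err) {
                /* Normal message processing would go here. */
        }

        rd_kafka_message_destroy(msg);
}
```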
Oh, thanks, something to think about! The offset reset tool doesn't work for me. It errors out with something like "protocol v2 vs. protocol v1", probably because we are still using Kafka 0.10.x. But the 2nd one can be an option.
@AlexeyRaga I wrote an offset reset tool myself using librdkafka, for Kafka 0.9.0.1; with it there is basically no need to restart a large number of clients.
@DavidLiuXh Can you share that tool?
@edenhill I need a little time
@edenhill I shared the tool: https://github.com/DavidLiuXh/KafkaOffsetTools
@edenhill has anything changed about this issue? This just hit us really hard here: a couple of partitions that were getting data infrequently suddenly lost their offsets...
@AlexeyRaga This is still on the backburner, let's look into it after the v1.0.0 release.
@edenhill any update on this?
This is too big of a change (risk-wise) to go into v1.5; will address it after that release.
@edenhill One more ping regarding this issue.
We also have this annoying case with topics that data is rarely written to. The consumer runs 24/7, but after it restarts it sometimes begins to process these topics from the beginning (due to the offsets.retention.minutes setting).
Re-committing current offsets for consumers even when no new messages arrive would be a perfect solution for us.