
Errors in dataHandler cause offset to jump - messages can get skipped

Open eastlondoner opened this issue 8 years ago • 9 comments

In base_consumer, around line 70, the offset for the partition is set to the offset of the last message in the messageSet even if the dataHandler returned an error (because the assignment is in a finally clause).

The consumer then fetches messages starting from the offset of the last message in the messageSet, even if the failure occurred only part way through processing the set, which can lead to messages being skipped.

I think it would generally be much more desirable to fall back to the last committed offset when the dataHandler fails, or to have a way for the dataHandler to pass back the offset that should be used.
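A minimal sketch of the pattern being described (illustrative code only, not the actual no-kafka source; all names here are made up):

```javascript
// Simulate a consume loop that advances the fetch offset in a `finally`
// clause, so the offset jumps to the end of the messageSet even when the
// handler throws part way through.
function consumeMessageSet(messageSet, dataHandler, state) {
  try {
    for (const message of messageSet) {
      dataHandler(message); // may throw part way through the set
    }
  } catch (err) {
    state.lastError = err;
  } finally {
    // The fetch offset jumps to (last message + 1) no matter where the error
    // occurred, so the unprocessed tail of the set is never fetched again.
    state.fetchOffset = messageSet[messageSet.length - 1].offset + 1;
  }
}

const state = { fetchOffset: 0, lastError: null };
const messageSet = [
  { offset: 0, value: 'a' },
  { offset: 1, value: 'b' }, // the handler fails here
  { offset: 2, value: 'c' }, // never processed, but skipped anyway
];
consumeMessageSet(messageSet, (m) => {
  if (m.value === 'b') throw new Error('handler failed');
}, state);
// state.fetchOffset is now 3: offsets 1 and 2 are silently skipped
```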

eastlondoner avatar Oct 24 '16 22:10 eastlondoner

This is done to avoid looping forever on the same message that caused the error. The best approach is to catch errors in your data handler and do whatever is best for your use case: skip the message, try again, or shut down the application.
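One way to follow that advice could look like the sketch below: decide per error whether to skip just the failing message or let the error surface. Everything here is hypothetical application code, not no-kafka API:

```javascript
// Process a messageSet, skipping only the messages the app classifies as
// "poison" and rethrowing anything unexpected so the app can shut down.
function handleMessageSet(messageSet, processMessage, isPoison) {
  const skippedOffsets = [];
  for (const message of messageSet) {
    try {
      processMessage(message);
    } catch (err) {
      if (isPoison(err)) {
        skippedOffsets.push(message.offset); // skip only this message
        continue;
      }
      throw err; // unexpected error: let it surface
    }
  }
  return skippedOffsets;
}

const skipped = handleMessageSet(
  [{ offset: 7, value: 'ok' }, { offset: 8, value: 'bad' }, { offset: 9, value: 'ok' }],
  (m) => { if (m.value === 'bad') throw new Error('poison'); },
  (err) => err.message === 'poison'
);
// skipped is [8]; offsets 7 and 9 were processed normally
```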

oleksiyk avatar Oct 25 '16 09:10 oleksiyk

This can skip many more messages than just the one that caused the error.

This can arise when a call to commitOffset fails because of "rebalanceInProgress". What should I do in my data handler in that case?

I'm happy to make a PR, but I am trying to work out the best strategy. I think the current one, where the consumer (fairly silently) skips potentially large chunks of messages, is quite bad.

eastlondoner avatar Oct 25 '16 10:10 eastlondoner

What should I do in my data handler in that case?

Retry several times, with a delay between attempts.
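That could be sketched with a generic retry helper like the one below (hypothetical code, not no-kafka API; attempt counts and delays are illustrative):

```javascript
// Retry an async operation (e.g. a commit call) up to `attempts` times,
// waiting `delayMs` between attempts, and rethrow the last error on failure.
async function retryWithDelay(op, { attempts = 5, delayMs = 1000 } = {}) {
  let lastError;
  for (let i = 0; i < attempts; i++) {
    try {
      return await op();
    } catch (err) {
      lastError = err;
      if (i < attempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Usage sketch: inside the data handler, retry a commit that fails while a
// rebalance is in progress.
let calls = 0;
const flakyCommit = async () => {
  calls += 1;
  if (calls < 3) throw new Error('rebalanceInProgress');
  return 'committed';
};
retryWithDelay(flakyCommit, { attempts: 5, delayMs: 10 }).then((result) => {
  console.log(result, 'after', calls, 'attempts'); // committed after 3 attempts
});
```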

oleksiyk avatar Oct 25 '16 10:10 oleksiyk

Retrying with a delay doesn't seem very safe. I am pretty sure there are intermittent network scenarios where that could still result in skipped messages.

Having an option to always proceed from LATEST_OFFSET rather than _.last(messageSet).offset would be much preferable to me.

I don't think the library should try to take responsibility for the case where the user poisons the queue with a bad message - I think it's fine to leave that one to the user.

I don't think that throwing away an arbitrary number of messages is a good trade-off for being able to automatically handle poisoned queues.

If I make a PR that adds a consumer option to use LATEST_OFFSET, would you approve it in principle?

eastlondoner avatar Oct 25 '16 17:10 eastlondoner

LATEST_OFFSET is the end of the message queue; it's not the latest committed offset. That way you would skip even more messages.

I don't think I'm willing to change the default behaviour.

oleksiyk avatar Oct 25 '16 19:10 oleksiyk

Hi! Can I offer a PR for this? A few options come to mind:

  • Let me flag in options I don't want you to commit offsets
  • Let my error handler return a boolean to NOT commit anything
  • Not commit when the errors are clearly Kafka errors / when Kafka is in a bad state

I use Kafka partly because replayability lets me be confident in the data quality of my downstream systems. Replayability is most valuable precisely when my consumer breaks for a while. If the library layer makes it hard to replay from where the consumer broke, I need to add more wrapper layers around the Kafka library.

davidsketchdeck avatar Oct 25 '16 23:10 davidsketchdeck

I don't want you to commit offsets

to NOT commit anything

Not commit when

Can you please clarify what you mean? no-kafka doesn't commit any offsets on its own. It only advances fetch offset.

oleksiyk avatar Oct 26 '16 07:10 oleksiyk

Apologies for my misuse of the terminology around LATEST_OFFSET. What I would like is for the consumer to either fail catastrophically or fetch from the last committed offset when there is an error in the data handler function.

The current behaviour fetches from a new offset, and if my application is able to process the message at that new offset, it is 'tricked' into committing the new offset, having skipped some messages.

eastlondoner avatar Oct 27 '16 10:10 eastlondoner

I understand your position. But adding such an option would be a big change to the code. Fetching is done in BaseConsumer, which is the base class for SimpleConsumer and GroupConsumer. The two child classes have different offset management semantics, so fetching from the committed offset can't easily be added to their base class. It would be a big update, which I would like to avoid in a quite stable library. On top of that, I don't have enough time to work on this, or even to dig into someone's PR.

So the best solution is still to catch and process errors in your application code (in your data handler). I understand that you'll have to deal with possible Rebalance or Network errors, but there you can make the tradeoffs that suit your use case. If we added this to the library, it would almost certainly generate more requests to customise the new behaviour: not everyone wants to keep retrying the failed offset; someone processing millions of stats messages may prefer to just move on to the next message. For this reason I simply catch all errors in my data handler.
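One way an application could "make its own tradeoffs" while still catching the silent-skip problem this thread describes: keep track of the next offset you expect and fail loudly when a messageSet arrives past it. All names below are hypothetical application code, not no-kafka API:

```javascript
// Returns a per-partition guard that throws if a messageSet starts past the
// offset we expected next, i.e. if messages were skipped.
function makeOffsetGapGuard() {
  let expectedNext = null; // offset we expect the next messageSet to start at
  return function assertNoGap(messageSet) {
    const first = messageSet[0].offset;
    if (expectedNext !== null && first > expectedNext) {
      // The consumer jumped past unprocessed messages: fail loudly instead
      // of silently committing past the gap.
      throw new Error(`offset gap: expected ${expectedNext}, got ${first}`);
    }
    expectedNext = messageSet[messageSet.length - 1].offset + 1;
  };
}

const assertNoGap = makeOffsetGapGuard();
assertNoGap([{ offset: 0 }, { offset: 1 }]); // ok, expect offset 2 next
assertNoGap([{ offset: 2 }]);                // ok, contiguous
let gapSeen = false;
try {
  assertNoGap([{ offset: 5 }]); // offsets 3-4 were skipped
} catch (err) {
  gapSeen = true;
}
// gapSeen is true: the skip was detected instead of silently committed past
```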

oleksiyk avatar Oct 27 '16 11:10 oleksiyk