Consumer carries on when ConsumerRebalanceListener fails
I'm using a ConsumerRebalanceListener to load the state needed to process a partition when it is assigned. However, when ConsumerRebalanceListener.on_partitions_assigned() fails for some reason (so the state isn't loaded), the error is logged and the consumer carries on as if nothing happened, happily consuming messages from the new partition. Because those messages are processed without their state, this will lead to data loss.
To avoid this, I have to call sys.exit() if the ConsumerRebalanceListener encounters an error, which isn't exactly desirable. It would be better if the error were propagated up so I could handle it normally. At the very least, the assignment should fail so that the consumer doesn't see any messages from the new partitions and the rebalance can be reattempted.
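To make the two behaviours concrete, here is a toy model (not kafka-python's actual coordinator) contrasting the current log-and-continue handling with the requested one, where the assignment fails and the error reaches the caller. `ToyCoordinator`, `FailingListener` and the `propagate` flag are hypothetical names for illustration; partitions are plain `(topic, partition)` tuples.

```python
import logging

log = logging.getLogger(__name__)


class ToyCoordinator:
    """Hypothetical, heavily simplified model of the rebalance step.

    This is NOT kafka-python's coordinator; it only contrasts two ways of
    handling an exception raised by a listener's on_partitions_assigned().
    """

    def __init__(self, listener, propagate):
        self.listener = listener
        self.propagate = propagate  # True: surface the error; False: log and carry on
        self.assignment = set()

    def complete_rebalance(self, new_partitions):
        try:
            self.listener.on_partitions_assigned(new_partitions)
        except Exception:
            if self.propagate:
                # Fail the assignment: nothing is fetched from the new partitions,
                # and the caller sees the error and can retry or shut down.
                self.assignment = set()
                raise
            # Behaviour described in the report: log the error and move on.
            log.exception("listener failed on partition assignment")
        self.assignment = set(new_partitions)

    def poll(self, new_partitions):
        """Runs a rebalance, then returns the partitions that would be fetched from."""
        self.complete_rebalance(new_partitions)
        return sorted(self.assignment)


class FailingListener:
    """Hypothetical listener whose state loading always fails."""

    def on_partitions_assigned(self, assigned):
        raise IOError("could not load partition state")


lenient = ToyCoordinator(FailingListener(), propagate=False)
print(lenient.poll([("events", 0)]))  # still fetches from ('events', 0)

strict = ToyCoordinator(FailingListener(), propagate=True)
try:
    strict.poll([("events", 0)])
except IOError as err:
    print("poll failed:", err, "| assigned:", sorted(strict.assignment))
```

With `propagate=False` the failing listener is logged and the new partition is consumed anyway; with `propagate=True` the assignment stays empty and the error is visible to whoever called `poll()`.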
Agreed, this seems strange. We're following the Java client design very closely here, and it uses the same pattern: exceptions are caught and logged. One alternative might be to catch Exception within your ConsumerRebalanceListener and re-raise it as a direct subclass of BaseException (not Exception), which the client's handler won't catch. But I do agree that raising exceptions here would be more Pythonic.
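A minimal sketch of the BaseException workaround, assuming (as described above) that the client invokes the listener inside an `except Exception:` block that logs and continues. `RebalanceFatalError`, `StateLoadingListener`, `load_state` and `invoke_like_client` are hypothetical names; `invoke_like_client` only mimics the catch-and-log pattern and is not part of kafka-python. The `sys.exit()` workaround from the report works for the same reason: `SystemExit` also derives from `BaseException`.

```python
import logging

try:
    from kafka import ConsumerRebalanceListener
except ImportError:
    # Stand-in base so the sketch runs without kafka-python installed.
    class ConsumerRebalanceListener(object):
        def on_partitions_revoked(self, revoked):
            pass

        def on_partitions_assigned(self, assigned):
            pass


class RebalanceFatalError(BaseException):
    """Hypothetical error type. Because it derives from BaseException rather
    than Exception, an `except Exception:` handler does not catch it."""


class StateLoadingListener(ConsumerRebalanceListener):
    def __init__(self, load_state):
        self.load_state = load_state  # hypothetical callable: partition -> state
        self.state = {}

    def on_partitions_revoked(self, revoked):
        # Drop state for partitions we no longer own.
        for tp in revoked:
            self.state.pop(tp, None)

    def on_partitions_assigned(self, assigned):
        try:
            for tp in assigned:
                self.state[tp] = self.load_state(tp)
        except Exception as exc:
            # Escalate so the error escapes a catch-and-log `except Exception:`.
            raise RebalanceFatalError("failed to load partition state") from exc


def invoke_like_client(listener, assigned):
    """Simplified stand-in for the client's callback handling: ordinary
    exceptions from the listener are caught and logged, not re-raised."""
    try:
        listener.on_partitions_assigned(assigned)
    except Exception:
        logging.exception("listener failed on partition assignment")


def broken_loader(tp):
    raise IOError("state store unavailable")


try:
    invoke_like_client(StateLoadingListener(broken_loader), [("events", 0)])
except RebalanceFatalError as err:
    print("rebalance aborted:", err, "| cause:", repr(err.__cause__))
```

In a real consumer the escalated error would presumably surface from whichever call triggered the rebalance (typically `poll()` or iteration over the consumer), where it can be caught and the consumer closed. A bare `except:` or `except BaseException:` anywhere on that path would still swallow it.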
I've filed a bug with Kafka. Hopefully they'll change it in the Java client: https://issues.apache.org/jira/browse/KAFKA-4600
FYI, it looks like a fix for this has been merged into the Java client and will be released in 2.4.0: https://github.com/apache/kafka/pull/6884
@braedon want to submit a PR porting their change to here?
I'll see if I get some time to give it a go in the next few weeks.
(Spoiler alert: I didn't get to it. It's still on my list, but that list is pretty long at the moment. If anyone else wants to pick this up in the meantime, feel free...)
Related: #2143