confluent-kafka-python
How to correctly handle a message.error() for a corrupted message vs. a retryable error?
This is not a bug, just a question.
I'm not clear on what I should do if I encounter a message.error(). My goal is to process 100% of the input messages, never discarding any message, except for those which are corrupted and can never be processed even after retrying.
I am basically doing this kind of loop:
```python
while True:
    message = consumer.poll(1)
    if message:
        if message.error():
            continue
        print(message.value())
```
My assumption in this loop is that a message can never be discarded. For example, there is no such thing as a message that is "temporarily" corrupted and would be fixed if I implemented a retry. Also, there is no case where a message gets popped off the internal queue but an error is returned at the same time.
Is that a correct assumption?
Is there any situation in which it would make sense to implement some sort of retry mechanism instead, such as breaking out of the loop and then reconnecting from the last committed offset? That approach would require some way of identifying messages that are permanently corrupted, so that I can skip them and not get stuck in the loop; a rough sketch is below.
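For concreteness, this is roughly what I have in mind (the broker address, group id, and topic name are placeholders, and the error handling is deliberately naive):

```python
from confluent_kafka import Consumer, KafkaException

conf = {
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my-group',                 # placeholder
    'auto.offset.reset': 'earliest',
}

while True:
    # Reconnect; consumption resumes from the last committed offset.
    consumer = Consumer(conf)
    consumer.subscribe(['my-topic'])        # placeholder topic
    try:
        while True:
            message = consumer.poll(1)
            if message is None:
                continue
            if message.error():
                # Break out and reconnect instead of skipping.
                # This is where I would need to detect a permanently
                # corrupted message so I don't loop on it forever.
                raise KafkaException(message.error())
            print(message.value())
    except KafkaException:
        pass
    finally:
        consumer.close()
```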
Thank you.
Hi @mkmoisen , thanks for your question.
I found a similar question, https://github.com/confluentinc/confluent-kafka-python/issues/767, also opened by you, and there is already an answer there: https://github.com/confluentinc/confluent-kafka-python/issues/767#issuecomment-578709274
To add a bit more explanation of enable.auto.offset.store, quoting from https://stackoverflow.com/questions/58517125/kafka-offset-management-enable-auto-commit-vs-enable-auto-offset-store:

> If you set enable.auto.offset.store=false you can update this in-memory offset store by yourself via rd_kafka_offsets_store().
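In the Python client the corresponding call is Consumer.store_offsets(). A minimal sketch of at-least-once processing built on that setting (broker, group, and topic names are placeholders, and print() stands in for your real processing):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my-group',                 # placeholder
    'enable.auto.commit': True,             # commits still happen in the background...
    'enable.auto.offset.store': False,      # ...but only for offsets we explicitly store
})
consumer.subscribe(['my-topic'])            # placeholder topic

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    print(msg.value())                      # stand-in for real processing
    # Only after processing succeeded do we mark the offset for commit.
    consumer.store_offsets(message=msg)
```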
> there is not a case where the message gets popped off the internal queue but for some reason an error was returned at the same time. Is that a correct assumption?

Yes.
You can see the set of possible errors returned by searching for rd_kafka_consumer_err in the librdkafka source.
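For example, a rough sketch of how you might branch on the error object (this assumes a reasonably recent client version where KafkaError exposes retriable() and fatal(); broker, group, and topic names are placeholders):

```python
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my-group',                 # placeholder
})
consumer.subscribe(['my-topic'])            # placeholder topic

msg = consumer.poll(1.0)
if msg is not None and msg.error():
    err = msg.error()
    if err.code() == KafkaError._PARTITION_EOF:
        # Informational: reached end of a partition
        # (only reported when enable.partition.eof=true).
        pass
    elif err.retriable():
        # Transient error; the client generally recovers on its own,
        # so continuing to poll is usually sufficient.
        pass
    elif err.fatal():
        # Fatal error; the consumer instance should be re-created.
        raise KafkaException(err)
    else:
        # Anything else: log it and decide per application policy.
        print('Consumer error: {}'.format(err))
```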
We still need to put some work into a doc giving guidance here; what is best to do with each of these errors is a bit mixed.