Question about commit and disconnect after connection is lost
I noticed that when commit is called while the connection to Kafka is lost, the function never returns or throws an error.
How should I recover from this state?
I could set a timeout, assume the connection is lost if the function does not return within that period, and try to establish a connection with a different Kafka server. However, the disconnect function does not return either when the connection is lost. How do I perform the necessary cleanup without the disconnect function?
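For illustration, this is the kind of timeout guard I have in mind (just a sketch; the withTimeout helper, the 5,000 ms value, and the config values are my own placeholders, not part of node-rdkafka):

```js
const Kafka = require('node-rdkafka');

// Placeholder config; broker address and group id are examples only.
const consumer = new Kafka.KafkaConsumer({
  'group.id': 'my-group',
  'metadata.broker.list': 'localhost:9092',
}, {});

// Race a call that may never return against a timer.
function withTimeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((resolve, reject) =>
      setTimeout(() => reject(new Error('timed out')), ms)),
  ]);
}

const disconnected = new Promise((resolve, reject) => {
  consumer.disconnect((err) => (err ? reject(err) : resolve()));
});

withTimeout(disconnected, 5000).catch((err) => {
  // Assume the connection is lost, but what cleanup belongs here?
  console.error(err);
});
```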
Thanks in advance
Hi there,
First, librdkafka manages connection state internally, so it's okay to get disconnected as long as the disconnect is recoverable. Are you talking about a situation where you get disconnected while committing and it cannot reconnect?
Second, you can specify multiple brokers, comma-separated, as your metadata.broker.list to give librdkafka more to choose from when bootstrapping. It also uses the metadata to discover other hosts it should connect to, and to determine where to write/read. The natural consequence of this is that there should be some level of fault tolerance.
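For example (a sketch; the host names and group id are placeholders):

```js
const Kafka = require('node-rdkafka');

// Multiple bootstrap brokers, comma separated; host names are placeholders.
const consumer = new Kafka.KafkaConsumer({
  'group.id': 'my-group',
  'metadata.broker.list': 'broker1:9092,broker2:9092,broker3:9092',
}, {});
```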
Third, there are some outstanding issues with consumer.disconnect. librdkafka tries to clean up when you close a handle, but it currently hangs in certain situations (usually after messages have been read). I can reproduce the issue, but I don't yet have a fix (and am not certain I can even make one on my side). And if the broker goes down when you call disconnect, it will likely hang for quite a while trying to reconnect, because it needs to give Kafka the information required to remove the consumer from the group, along with everything else Kafka does to manage consumers in the group.
There is a configuration property you can set to hopefully mitigate how long all of this takes: session.timeout.ms. The maximum timeout is apparently 3 times the value you enter here, and it defaults to 30,000 ms.
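Adding it to the config sketch above would look like this (10,000 ms is just an example value):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'my-group',
  'metadata.broker.list': 'broker1:9092,broker2:9092,broker3:9092',
  // Example value; the default is 30000 ms, and failure handling can
  // apparently take up to 3x this value.
  'session.timeout.ms': 10000,
}, {});
```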
Hope this helps!
Thanks for the quick reply.
The steps I am going through are:
- Connect the consumer to Kafka
- Wait until the consumer receives a message
- Kill the Kafka instance that the consumer is connected to before the consumer commits
- The consumer calls commit and hangs (well past 3 * 30,000 ms)
An error (Broker transport failure) is emitted only if I restart Kafka and the connection is reestablished. This also happens when I specify multiple Kafka brokers: the error event is only emitted after the originally connected broker is restarted.
Is this part of a test suite you're writing or does this actually happen on occasion?
I'm currently using KafkaConsumer::commit, which is a blocking call. Perhaps if I change it to commitAsync it won't block the rest of librdkafka from doing its connection checking. But it looks like this may be a race condition, and I'm wary of switching to commitAsync.
Also, if you happen to have code showing this issue, can you send it to me? I would like to make modifications to the underlying C++ and run your reproduced test case to see how the changes affect it :)
I'm testing failure cases to see how I should recover from them. I am seeing this issue consistently.
Here is a script that can reproduce the issue: https://gist.github.com/jackyjieliu/5c4c03e5884362d3fb1706bbc582f316
If you kill Kafka while it is processing a message, the commit call hangs and the callback is never called.
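In outline, the scenario looks roughly like this (a simplified sketch, not the gist verbatim; topic, group id, and broker address are placeholders, and I'm assuming the offset_commit_cb / 'offset.commit' reporting path here):

```js
const Kafka = require('node-rdkafka');

// Placeholders: broker address, group id, and topic name.
const consumer = new Kafka.KafkaConsumer({
  'group.id': 'test-group',
  'metadata.broker.list': 'localhost:9092',
  'enable.auto.commit': false,
  // Report commit results via the 'offset.commit' event.
  'offset_commit_cb': true,
}, {});

consumer.on('event.error', (err) => console.error('event.error:', err));

consumer.on('offset.commit', (err, topicPartitions) => {
  // Never fires if the broker was killed before the commit.
  console.log('commit result:', err, topicPartitions);
});

consumer.connect();

consumer.on('ready', () => {
  consumer.subscribe(['test-topic']);
  consumer.consume(); // flowing mode; messages arrive via 'data'
});

consumer.on('data', (msg) => {
  // Kill the broker at this point, then:
  consumer.commitMessage(msg); // hangs, and 'offset.commit' never fires
});
```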
Thanks for looking into this!
I haven't forgotten about this!
There are a few issues related to disconnects that I'm trying to clean up in case they fix this one. After that, I will investigate it separately! :)
@jackyjieliu @webmakersteve were you able to find the underlying problem to this issue?