confluent-kafka-python
How to check the connection before the producer or consumer starts.
Description
The problem was that we had not installed or started Kafka on localhost, yet when I wrote an example to test the producer and consumer, it just blocked forever. Sometimes we misconfigure the broker URL or forget to start the Kafka server, and the behavior above makes that hard to discover or debug.
So I wonder if there is any way to check the connection for the producer and consumer before entering the poll or produce loop.
set a callback via error_cb in the producer or consumer constructor and check for code=_ALL_BROKERS_DOWN (which will happen quickly if no brokers are available). you generally won't want to exit out of your application if it's been up for some time, since the clients will automatically recover, so you'll want to use some sort of timer to check how long the application has been running before exiting.
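For illustration, a minimal sketch of that pattern (the broker address, the 30-second startup grace period, and the fail-fast behavior are assumptions, not part of the library):

```python
import sys
import time

from confluent_kafka import KafkaError, Producer

app_start = time.time()
STARTUP_GRACE_S = 30        # assumed threshold; tune for your deployment
brokers_down = False

def on_error(err):
    # err is a confluent_kafka.KafkaError delivered via error_cb.
    global brokers_down
    if err.code() == KafkaError._ALL_BROKERS_DOWN:
        brokers_down = True

producer = Producer({
    "bootstrap.servers": "localhost:9092",   # assumed broker address
    "error_cb": on_error,
})

# error_cb is only invoked from poll()/flush(), so keep polling.
while True:
    producer.poll(1.0)
    if brokers_down:
        uptime = time.time() - app_start
        if uptime < STARTUP_GRACE_S:
            # Unreachable right after startup: almost certainly a config,
            # firewall, or "forgot to start Kafka" problem, so fail fast.
            sys.exit(f"all brokers down {uptime:.0f}s after startup")
        # Later in the app's life, let the client recover on its own.
        brokers_down = False
```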
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
this is a common question, and I think very valid usage. we could consider providing some blocking/awaitable constructor mechanism in the future in addition to the existing one.
I disagree: why differentiate between the case where brokers are unavailable at startup vs. during runtime? It is a distributed system, so downtime can happen at any time, and singling out specific cases does not make the system more robust.
Instead I recommend focusing on the data needs:
- What time is allowed to produce this piece of data (message.timeout.ms)?
- What is the longest period without new incoming data before the pipeline is deemed malfunctioning (consumer)? E.g., if we haven't received a new message within 10 minutes, something is fishy.
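As a rough sketch of that framing (the topic name, broker address, group id, and timeout values below are illustrative assumptions only):

```python
import time

from confluent_kafka import Consumer, Producer

# Producer side: bound how long a single delivery may take.
producer = Producer({
    "bootstrap.servers": "localhost:9092",   # assumed broker address
    "message.timeout.ms": 120000,            # fail deliveries after 2 minutes
})

def on_delivery(err, msg):
    # Called from poll()/flush() once the message is delivered or
    # message.timeout.ms has expired.
    if err is not None:
        print(f"delivery failed: {err}")

producer.produce("my-topic", b"payload", callback=on_delivery)
producer.flush()

# Consumer side: treat a long stretch without new data as a malfunction.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",                  # assumed group id
})
consumer.subscribe(["my-topic"])

MAX_IDLE_S = 600  # e.g. no message within 10 minutes means something is fishy
last_message_at = time.time()

while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        last_message_at = time.time()
        # ... process msg ...
    elif time.time() - last_message_at > MAX_IDLE_S:
        raise RuntimeError("no new messages within the idle limit")
```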
They're very different scenarios:
If all brokers are down on startup, the probabilities are very much in favor of this being a config / firewall / routing / forgot-to-start-my-local-test-server problem that's not going to resolve itself, and you don't want to handle this by pretending everything is fine. This happens very frequently - we shouldn't assume it never happens. Also, if the server is genuinely down and will eventually come back up, and an app is started, then I think there's a good chance whoever/whatever started the app might not know that, and would like to!
More color / examples:
If I have an app i'm using on my local machine, and I start it up and all brokers are down, I don't want it to hang there without knowing what's going on - the app should say kafka seems to be down as soon as it knows it, then quit or take some action as soon as it comes up. How do you implement this now? You need to add an 'error' handler, react to the all brokers down message, also set a log handler, work out what debug level you need to set and what message you need to react to there to know when the cluster is up. It's hard to implement this very common requirement. Much better UX to have blocking/awaitable startup code.
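In the meantime, one workaround for an approximate blocking startup check (a sketch under assumptions, not an official mechanism) is to request cluster metadata with a timeout right after constructing the client; list_topics() raises if the cluster can't be reached in time:

```python
from confluent_kafka import KafkaException, Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed address

try:
    # list_topics() blocks until cluster metadata arrives or the timeout
    # expires, raising KafkaException if the cluster can't be reached.
    producer.list_topics(timeout=10)
except KafkaException as exc:
    raise SystemExit(f"Kafka appears to be down or misconfigured: {exc}")
```

The same call is available on the Consumer, so a consumer can do the equivalent check before entering its poll loop.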
I also feel this is most often a better API with long running server apps too. I've seen one of our cloud people caught out by this - config or routing error or something and didn't know it. I think this happens frequently and people want to catch it fast - they want their control plane spitting out errors in this case not a zombie process. As is, they don't necessarily realize there's a problem and won't catch it fast because they're expecting the startup to fail (or didn't think about it, just expected something like this would cause a problem quickly). Trying to abstract this away isn't being helpful.
@mhowlett, I agree with this.
@mhowlett does the consumer detect broker restarts and re-establish the connection to the broker?
Neither the consumer nor the producer will know about a broker restart specifically; rather, they will notice when they lose connectivity to the broker and will re-establish the connection.
I'm not sure why this was closed. I think @mhowlett raised some good points on how this would be useful. Was this closed because this won't be implemented?
@spenczar I agree. The DX on this is pretty horrible.
It's similar in Kafka libraries for other languages as well, which is why I think it's difficult to address here: the attitude is "it's just the way it is".
Still that doesn't mean it shouldn't be improved.
This specific issue could easily be handled on the Python side of the library. Closing the issue was not warranted; I'm sure the majority wants this issue to be addressed.