kafka-python
NoBrokersAvailable raised in BrokerConnection.check_version when only one host is unresponsive
When api_version is not set in the constructor of KafkaClient, check_version is called with a timeout of api_version_auto_timeout_ms.
Then one random node from bootstrap_servers is selected and a socket is opened, but with request_timeout_ms overridden to the time remaining of api_version_auto_timeout_ms (effectively the full api_version_auto_timeout_ms, since this is the first call of the method).
If the randomly selected node does not respond in a timely manner (e.g. it is down for maintenance), NodeNotReady is raised after api_version_auto_timeout_ms. It is then immediately converted to NoBrokersAvailable, which propagates directly to the caller, e.g. the KafkaProducer() constructor.
I don't believe this is intended behavior: when a single bootstrap server is down, the whole Producer/Consumer fails to instantiate. I think it would be best to retry the check_version call with another randomly selected server while still within the api_version_auto_timeout_ms limit.
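A minimal sketch of the retry behavior proposed above, not the library's actual code. The `probe` callable, the `check_version_with_retry` name, and the local `NoBrokersAvailable` class are hypothetical stand-ins so the example runs without kafka-python; it caps each attempt at a per-attempt timeout and stops once the overall budget is spent.

```python
import random
import time


class NoBrokersAvailable(Exception):
    """Stand-in for kafka.errors.NoBrokersAvailable (keeps the sketch self-contained)."""


def check_version_with_retry(hosts, probe, total_timeout_s, per_attempt_timeout_s,
                             clock=time.monotonic):
    """Try hosts in random order until one answers or the total budget is spent.

    `probe(host, timeout_s)` is a hypothetical callable: it returns a version
    tuple, or raises if the host does not answer within `timeout_s`.
    """
    deadline = clock() + total_timeout_s
    candidates = list(hosts)
    random.shuffle(candidates)
    for host in candidates:
        remaining = deadline - clock()
        if remaining <= 0:
            break  # overall budget exhausted
        try:
            # Cap each attempt so one dead host cannot consume the whole budget.
            return probe(host, min(per_attempt_timeout_s, remaining))
        except Exception:
            continue  # this host failed; move on to the next one
    raise NoBrokersAvailable()
```

With this shape, NoBrokersAvailable is raised only after every candidate has failed or the overall budget has run out, rather than after the first unresponsive host.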
Example:
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["alive-kafka-server:9092", "nonresponding-server:9092"],
    api_version_auto_timeout_ms=2000,  # default value
    request_timeout_ms=30000,  # default value
)
# if nonresponding-server is chosen first for check_version, the constructor raises NoBrokersAvailable after 2000 ms
# if alive-kafka-server is chosen, the producer is instantiated
One ugly workaround is to set api_version directly, so that check_version() is not called.
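A small sketch of that workaround, for illustration only. The helper name `producer_kwargs` is hypothetical, and the version tuple `(2, 0, 0)` is an assumption that would need to match the brokers actually deployed.

```python
def producer_kwargs(bootstrap_servers, api_version=(2, 0, 0)):
    """Build KafkaProducer keyword arguments with api_version pinned.

    The default (2, 0, 0) is a placeholder; use the version of your brokers.
    """
    return {
        "bootstrap_servers": list(bootstrap_servers),
        "api_version": tuple(api_version),
    }


kwargs = producer_kwargs(["alive-kafka-server:9092", "nonresponding-server:9092"])

# Usage (requires kafka-python and reachable brokers):
# from kafka import KafkaProducer
# producer = KafkaProducer(**kwargs)
```

The trade-off is that the client no longer detects the broker version itself, so the pinned value has to be kept in sync with the cluster by hand.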
A possible fix is to remove the config override so that the original request_timeout_ms is honored and more bootstrap servers can be attempted within the api_version_auto_timeout_ms interval.
My local tests showed this:
# commented out https://github.com/dpkp/kafka-python/blob/f19e4238fb47ae2619f18731f0e0e9a3762cfa11/kafka/conn.py#L1209
producer = KafkaProducer(
    bootstrap_servers=["alive-kafka-server:9092", "nonresponding-server:9092"],
    api_version_auto_timeout_ms=10000,
    request_timeout_ms=2500,
)
# if nonresponding-server is chosen first for check_version, another one is attempted after 2500 ms and the producer is instantiated
# if alive-kafka-server is chosen, the producer is instantiated
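To make the timing concrete, here is a rough back-of-the-envelope calculation of how many servers can be tried within the budget. It assumes every attempt runs into its full timeout and ignores any other overhead; `max_attempts` is a hypothetical helper, not part of kafka-python.

```python
import math


def max_attempts(api_version_auto_timeout_ms, per_attempt_timeout_ms):
    """Upper bound on attempts that can start within the overall budget."""
    return math.ceil(api_version_auto_timeout_ms / per_attempt_timeout_ms)


# With the override, the first attempt gets the whole budget: one try only.
print(max_attempts(2000, 2000))    # 1
# Without the override, request_timeout_ms=2500 leaves room for several tries.
print(max_attempts(10000, 2500))   # 4
```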
I run into the NoBrokersAvailable error intermittently, without any pattern. Once NoBrokersAvailable occurs, the connection continues to fail for a while.
Meanwhile, other Python clients (over 100) connected to the same Kafka server are working fine, and local ping tests all succeed, so a network problem is unlikely.
Is it confirmed that this is caused by the version number not being explicitly set in the constructor?