kafka-python
UnrecognizedBrokerVersion raised by SSL connections
Filing this because docs say to create a bug when seeing this error: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html#kafka.client.KafkaClient.check_version
I am getting this error when constructing a KafkaConsumer without SSL properties while trying to connect to an SSL-only Kafka cluster.
kafka-python version = 1.4.6; Kafka version = 1.1.1
Have the same issue on 1.1.0. confluent-kafka works fine, but I cannot roll it out on AWS.
+1
+1
Can you provide the following:
- The consumer configuration you're using, i.e. `KafkaConsumer(...)`
- The value of `f.value` at this point in the code: https://github.com/dpkp/kafka-python/blob/91d31494d02ea636a991abb4dfb25dd904eefd45/kafka/conn.py#L1104
Seeing this with a KafkaProducer:

```python
KafkaProducer(
    bootstrap_servers=['host1:port', 'host2:port', 'host3:port'],
    ssl_check_hostname=True,
    ssl_cafile='/path/to/rootCA.pem',
    ssl_certfile=None,
    ssl_keyfile=None)
```
Relevant part of the traceback:

```
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 381, in __init__
  **self.config)
File "/usr/local/lib/python3.5/site-packages/kafka/client_async.py", line 239, in __init__
  self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.5/site-packages/kafka/client_async.py", line 874, in check_version
  version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
File "/usr/local/lib/python3.5/site-packages/kafka/conn.py", line 1135, in check_version
  raise Errors.UnrecognizedBrokerVersion()
```
Later today I can install from source and print the value you requested above, if it's helpful
@jeffwidman I installed from source and printed out a few things. Basically, in my case none of the requests made in `check_version()` succeed, so the `for` loop here is never broken and I end up in the `else` clause where this exception is raised. That is to say, `f.succeeded()` always returns `False`:
https://github.com/dpkp/kafka-python/blob/91d31494d02ea636a991abb4dfb25dd904eefd45/kafka/conn.py#L1100
From what I understand right now, my requests are failing because my SSL configuration is not valid. I'm working on that, but maybe this is the core of the issue others are facing: is this really a problem of incomplete exception messages? It's not that the broker version was not recognized; it's that my config is invalid and my request(s) failed.
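One way to see why each version probe fails is to turn on debug logging before constructing the client; kafka-python logs connection attempts and protocol errors under the `kafka` logger hierarchy (a stdlib-only sketch):

```python
import logging

# kafka-python logs every connection attempt, handshake, and protocol
# error under the 'kafka' loggers; DEBUG level shows the reason each
# check_version() probe fails (e.g. a plaintext probe against a TLS port).
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('kafka').setLevel(logging.DEBUG)
logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
```

With this in place, an SSL misconfiguration usually shows up as repeated connection/handshake errors rather than anything version-related.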
I found the solution in my similar case. All I had to do was add `security_protocol="SSL"` and the connection was established.
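A minimal sketch of that fix (broker addresses and CA path below are placeholders):

```python
# Adding security_protocol='SSL' makes kafka-python perform a TLS
# handshake instead of a plaintext one; without it, every probe in
# check_version() fails and UnrecognizedBrokerVersion is raised.
ssl_config = {
    'bootstrap_servers': ['host1:9093', 'host2:9093'],  # placeholder hosts
    'security_protocol': 'SSL',
    'ssl_cafile': '/path/to/rootCA.pem',                # placeholder path
}
# consumer = KafkaConsumer('my_topic', **ssl_config)
```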
I got UnrecognizedBrokerVersion problems connecting today with Kafka version 2.1.0 and security_protocol='SASL_SSL'. My solution was to disable TLSv1 and TLSv1.1:

```python
import ssl

context = ssl.create_default_context()
context.options |= ssl.OP_NO_TLSv1    # |= sets the flag; &= would not
context.options |= ssl.OP_NO_TLSv1_1
context.load_verify_locations(cafile='/secure/cacerts')
context.check_hostname = False

KafkaProducer(security_protocol='SASL_SSL', ssl_context=context, ...
```

It solved my UnrecognizedBrokerVersion errors; now I can produce messages to Kafka 2.1.0 with SASL_SSL security.
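On Python 3.7+, the same effect is available without the deprecated `OP_NO_*` flags, via `SSLContext.minimum_version` (a stdlib-only sketch):

```python
import ssl

# Refuse TLSv1 and TLSv1.1 by setting a floor on the protocol version;
# this is the non-deprecated equivalent of OR-ing in the OP_NO_* flags.
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2

assert context.minimum_version == ssl.TLSVersion.TLSv1_2
```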
I wonder if there's a way for us to programmatically realize that SSL or SASL should be enabled and then give a more helpful error message? This may not be possible, it depends what the broker gives us back.
It's expecting the Kafka version.

I passed the below config while initializing the consumers and producers, specifying the desired API version as (2, 4, 1) for Kafka 2.4.1:

```python
api_version = (2, 4, 1)
producer_config = {
    'bootstrap_servers': 'your_cluster_link',
    'api_version': api_version,
    'security_protocol': 'SSL'
}
producer = KafkaProducer(**producer_config)
producer.send('your_topic', key=b'your_key', value=b'your_message')
producer.flush()
```
The same config can also be used to initialize the consumer, along with any consumer-specific settings.
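For example, the consumer-side counterpart might look like this (cluster address, topic, and group id are placeholders):

```python
api_version = (2, 4, 1)  # pin to the broker version, e.g. Kafka 2.4.1

consumer_config = {
    'bootstrap_servers': 'your_cluster_link',  # placeholder
    'api_version': api_version,
    'security_protocol': 'SSL',
    # consumer-specific settings
    'group_id': 'your_group',
    'auto_offset_reset': 'earliest',
}
# consumer = KafkaConsumer('your_topic', **consumer_config)
# for message in consumer:
#     print(message.key, message.value)
```

Note that pinning `api_version` makes kafka-python skip the `check_version()` probe entirely, which is why it sidesteps UnrecognizedBrokerVersion, but the underlying SSL settings still have to be correct for the connection itself to succeed.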