
UnrecognizedBrokerVersion raised by SSL connections

Open dknight10 opened this issue 5 years ago • 10 comments

Filing this because docs say to create a bug when seeing this error: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html#kafka.client.KafkaClient.check_version

I get this error when constructing a KafkaConsumer without any SSL properties and pointing it at an SSL-only Kafka cluster.

kafka-python version = 1.4.6 Kafka version = 1.1.1

dknight10 avatar Apr 26 '19 11:04 dknight10

I have the same issue on 1.1.0. Confluent Kafka works fine, but I cannot roll it out on AWS.

illi4 avatar Apr 29 '19 06:04 illi4

+1

joarleymoraes avatar May 06 '19 16:05 joarleymoraes

+1

chaoxing-gsd avatar May 17 '19 07:05 chaoxing-gsd

Can you provide the following:

  1. The consumer configuration you're using, i.e., KafkaConsumer(...)?
  2. Can you print the value of f.value at this point in the code? https://github.com/dpkp/kafka-python/blob/91d31494d02ea636a991abb4dfb25dd904eefd45/kafka/conn.py#L1104

jeffwidman avatar May 23 '19 16:05 jeffwidman

Seeing this with a KafkaProducer:

KafkaProducer(
    bootstrap_servers=['host1:port', 'host2:port', 'host3:port'],
    ssl_check_hostname=True,
    ssl_cafile='/path/to/rootCA.pem',  # path must be a quoted string
    ssl_certfile=None,
    ssl_keyfile=None)

relevant part of traceback:

  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 381, in __init__
    **self.config)
  File "/usr/local/lib/python3.5/site-packages/kafka/client_async.py", line 239, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.5/site-packages/kafka/client_async.py", line 874, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/usr/local/lib/python3.5/site-packages/kafka/conn.py", line 1135, in check_version
    raise Errors.UnrecognizedBrokerVersion()

Later today I can install from source and print the value you requested above, if it's helpful

shadetree01010100 avatar Jul 22 '19 15:07 shadetree01010100

@jeffwidman I installed from source and printed out a few things. In my case, none of the requests made in check_version() succeed, so the for loop here never breaks and I end up in the else clause where this exception is raised. That is to say, f.succeeded() always returns False: https://github.com/dpkp/kafka-python/blob/91d31494d02ea636a991abb4dfb25dd904eefd45/kafka/conn.py#L1100

From what I understand right now, my requests are failing because my SSL configuration is not valid. I'm working on that, but maybe this is the core of what others are facing too: is this really an issue of misleading exception messages? It's not that the broker version was not recognized, it's that my config is invalid and my request(s) failed.
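
The control flow described here can be illustrated with a minimal, self-contained sketch. It is not the kafka-python source: `probe_versions`, `send_probe`, `always_fails`, and the locally defined `UnrecognizedBrokerVersion` class are hypothetical stand-ins, used only to show how a `for ... else` loop ends up raising a version error when every probe fails for an unrelated reason, such as an invalid SSL config.

```python
class UnrecognizedBrokerVersion(Exception):
    """Local stand-in for the library error, defined here for illustration."""


def probe_versions(candidates, send_probe):
    """Return the first candidate version whose probe succeeds.

    `send_probe` is a hypothetical callable that returns True on success.
    """
    for version in candidates:
        if send_probe(version):
            break  # a probe succeeded, so the else clause is skipped
    else:
        # Reached only when no probe succeeded, whatever the real cause was
        # (unsupported version, failed TLS handshake, dropped connection).
        raise UnrecognizedBrokerVersion()
    return version


def always_fails(version):
    # Simulates a connection on which every request fails,
    # e.g. plaintext requests sent to an SSL-only port.
    return False
```

Calling `probe_versions([(1, 0, 0)], always_fails)` raises the version error even though the version itself was never the problem, which matches the behavior reported in this comment.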

shadetree01010100 avatar Jul 23 '19 16:07 shadetree01010100

I found a solution in my similar case. All I had to do was add security_protocol="SSL", and the connection was established.
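
A minimal sketch of that fix, assuming kafka-python is installed and the cluster accepts SSL connections. The helper names `ssl_client_config` and `make_consumer`, and the placeholder host, topic, and CA path in the usage note, are hypothetical; `security_protocol`, `bootstrap_servers`, and `ssl_cafile` are the kafka-python options already mentioned in this thread.

```python
def ssl_client_config(bootstrap_servers, cafile=None):
    """Build keyword arguments for an SSL-enabled kafka-python client."""
    config = {
        "bootstrap_servers": bootstrap_servers,
        # Without this, the client speaks plaintext to the broker.
        "security_protocol": "SSL",
    }
    if cafile is not None:
        # Optional CA bundle used to verify the broker certificate.
        config["ssl_cafile"] = cafile
    return config


def make_consumer(topic, bootstrap_servers, cafile=None):
    """Create a KafkaConsumer over SSL (needs kafka-python and a reachable broker)."""
    from kafka import KafkaConsumer  # imported lazily so the config helper works standalone

    return KafkaConsumer(topic, **ssl_client_config(bootstrap_servers, cafile))
```

For example, `make_consumer("my_topic", ["host1:9093"], cafile="/path/to/rootCA.pem")` would build the consumer; whether a client certificate and key are also required depends on how the cluster is configured.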

arkblame avatar Jul 31 '19 07:07 arkblame

I got UnrecognizedBrokerVersion problems connecting today with kafka version 2.1.0 and security_protocol='SASL_SSL'.

My solution was to disable TLSv1 and TLSv1.1:

import ssl

context = ssl.create_default_context()
# OR the flags in; using &= here would clear the other options instead
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
context.load_verify_locations(cafile='/secure/cacerts')
context.check_hostname = False

KafkaProducer(security_protocol='SASL_SSL', ssl_context=context, ...

It solved my UnrecognizedBrokerVersion errors, now I can produce messages to kafka 2.1.0 with SASL_SSL security.
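
On Python 3.7 and later, the standard `ssl` module also lets you set a minimum protocol version directly, which expresses the same intent without option flags. The sketch below is an alternative under that assumption, not what this comment originally ran; the helper name `make_tls12_context` is hypothetical, and it leaves hostname checking at its default (enabled).

```python
import ssl


def make_tls12_context(cafile=None):
    """Return a client-side SSLContext that refuses anything older than TLS 1.2."""
    # With cafile=None the system's default CA certificates are loaded.
    context = ssl.create_default_context(cafile=cafile)
    # Rejects TLSv1 and TLSv1.1 during the handshake.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context
```

The resulting context can be passed as `ssl_context=` in the same way as the context in this comment.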

mbreevoort avatar Aug 14 '19 15:08 mbreevoort

I wonder if there's a way for us to programmatically realize that SSL or SASL should be enabled and then give a more helpful error message? This may not be possible, it depends what the broker gives us back.
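
Whether the library itself can detect this is left open above. As a caller-side stopgap, one could wrap client construction and attach a hint when the version check fails on a plaintext connection. This is only a sketch: `with_security_hint` and its parameters are hypothetical, and in real use `error_type` would presumably be `kafka.errors.UnrecognizedBrokerVersion`.

```python
def with_security_hint(factory, error_type, security_protocol="PLAINTEXT"):
    """Call `factory()` and add a hint if it fails with `error_type` over plaintext.

    `factory` is any zero-argument callable that builds the client.
    """
    try:
        return factory()
    except error_type as exc:
        if security_protocol == "PLAINTEXT":
            # The original exception stays attached as __cause__.
            raise RuntimeError(
                "Broker version check failed over PLAINTEXT. If the cluster "
                "requires SSL or SASL, set security_protocol accordingly."
            ) from exc
        raise  # already using a secure protocol, so keep the original error
```

A call might look like `with_security_hint(lambda: KafkaProducer(**cfg), UnrecognizedBrokerVersion)`, where `cfg` is whatever configuration the caller already uses. The hint is a guess about the cause, not a diagnosis, since other failures can trigger the same exception.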

jeffwidman avatar Feb 12 '20 20:02 jeffwidman

It's expecting the kafka version:

I passed the below config while initializing the consumers and producers:

# Specify the desired API version as (2, 4, 1) for Kafka 2.4.1
api_version = (2, 4, 1)
producer_config = {
    'bootstrap_servers': 'your_cluster_link',
    'api_version': api_version,
    'security_protocol': "SSL"
}
producer = KafkaProducer(**producer_config)
producer.send('your_topic', key=b'your_key', value=b'your_message')
producer.flush()

The same config can also be used to initialize the consumer, along with consumer config

iamaslowwalker avatar Jan 17 '24 11:01 iamaslowwalker