confluent-kafka-python

How to check for SASL Authentication Errors?

Open mkmoisen opened this issue 1 year ago • 12 comments

Description

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:8082',
    'group.id': 'myconsumer',
    'security.protocol': 'sasl_ssl',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myusername',
    'sasl.password': 'badpassword'
}

consumer = Consumer(conf)

Invoking the code above with an incorrect password does not raise any error. The client appears to attempt authentication asynchronously, and after a few seconds a separate thread prints the following to the screen:

%3|1658355875.069|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://localhost:9092/boots]: sasl_ssl://localhost:9092/bootstrap: SASL authentication error: Authentication failed (after 5170ms in state AUTH_REQ, 5 identical error(s) suppressed)

If I then subscribe to a topic and poll, on_assign is never called and the message is never polled.

How do I figure out whether I've been successfully authenticated?

Would it perhaps be better if it raised a synchronous exception?

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):

    1.8.2 and 1.8.2

mkmoisen avatar Jul 20 '22 22:07 mkmoisen

In addition, certain operations hang indefinitely.

For example:

from confluent_kafka.admin import AdminClient

a = AdminClient(conf)

a.list_topics()

This hangs indefinitely if the password is wrong, and so does consumer.get_watermark_offsets.
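One way to keep such a call from blocking forever is to pass a finite timeout. The following is a minimal sketch, assuming that `list_topics()` accepts a `timeout` argument in seconds (its default of -1 waits indefinitely) and raises `KafkaException` when no metadata arrives in time. The helper name `list_topic_names` is hypothetical.

```python
try:
    from confluent_kafka import KafkaException
except ImportError:  # fallback so the sketch can be exercised without the library
    KafkaException = Exception


def list_topic_names(client, timeout_s=10.0):
    """Return sorted topic names, or None if metadata was not fetched in time.

    `client` is any confluent_kafka client exposing list_topics(), for
    example an AdminClient. A finite timeout replaces the default of -1.
    """
    try:
        metadata = client.list_topics(timeout=timeout_s)
    except KafkaException as exc:
        # No broker answered the metadata request within timeout_s, which is
        # the expected outcome when authentication keeps failing.
        print(f"list_topics failed: {exc}")
        return None
    # ClusterMetadata.topics is a dict keyed by topic name.
    return sorted(metadata.topics)
```

A `None` result only says that metadata could not be fetched. It does not distinguish a wrong password from an unreachable broker. `consumer.get_watermark_offsets` takes a `timeout` argument as well, so the same idea should apply there.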

mkmoisen avatar Jul 20 '22 22:07 mkmoisen

> Would it perhaps be better if it raised a synchronous exception?

I have argued for this in the past as well. I have marked this as low priority based on my perception of how likely it is to get done, but I think it would meaningfully add to usability.

mhowlett avatar Oct 25 '22 17:10 mhowlett

Hi @mhowlett

Is there any way to currently check for this error and handle it?

As far as I am aware, the only thing that happens is that a second thread prints an error to stderr. There does not seem to be a way to programmatically check whether authentication was successful.

Assuming I am correct, if someone were to notify me about a password change, the only thing I could do is wait until after the password has been changed and then restart the application with the new password, which is not very convenient. I would like to be able to change the password in advance, and have the app retry until the upstream password has been changed. Right now it will just hang forever.

mkmoisen avatar Oct 25 '22 17:10 mkmoisen

+1 for @mkmoisen's suggestion. My use case involves the need to log messages to the console in a very customized format, so that the monitoring service can query and alert on them.

It would be convenient if I was able to explicitly check the authentication status of my Kafka client and/or be able to catch an exception, so that I can easily use my custom logging utilities and allow proper application monitoring and alerting.

ivanov-slk avatar Nov 25 '22 08:11 ivanov-slk

I'd also like to add a +1 for @mkmoisen's suggestion.

Without a fix like this, handling password rotation requires quite a workaround, and password rotation is becoming a common requirement.

ns-mkusper avatar Feb 07 '23 01:02 ns-mkusper

This is rearing its head in a lot of environments.

mxchinegod avatar Feb 10 '23 15:02 mxchinegod

Hi @mhowlett, it looks like there is some interest. Would it be possible to re-categorize this from low to high? In my opinion the current behavior is a bug.

mkmoisen avatar Feb 12 '23 02:02 mkmoisen

We have recently added set_sasl_credentials(username, password) to all client types. This will enable you to update the credentials. This function will be available in the next release though.

You can use error_cb to listen to the global errors and update the credentials with the above setter. The following error is raised in case of incorrect username or password: KafkaError{code=_AUTHENTICATION,val=-169,str="sasl_plaintext://localhost:29092/bootstrap: SASL authentication error: Authentication failed: Invalid username or password (after 302ms in state AUTH_REQ)"}

I think this gives a way to update the credentials.
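The approach described above could be sketched as follows. This is a rough illustration, not the maintainers' implementation: `AuthWatcher`, `refresh_credentials_if_needed`, and `load_credentials` are hypothetical names, and the sketch assumes the setter is exposed as `set_sasl_credentials(username, password)`. The callback only records the failure, and the credentials are updated after `poll()` returns.

```python
try:
    from confluent_kafka import KafkaError
    AUTH_ERROR_CODE = KafkaError._AUTHENTICATION
except ImportError:  # fallback so the sketch can be exercised without the library
    AUTH_ERROR_CODE = -169  # numeric value quoted in the error above


class AuthWatcher:
    """Remembers whether error_cb has reported an authentication failure."""

    def __init__(self):
        self.auth_failed = False

    def on_error(self, err):
        # err is a KafkaError; other error codes are ignored in this sketch.
        if err.code() == AUTH_ERROR_CODE:
            self.auth_failed = True


def refresh_credentials_if_needed(client, watcher, load_credentials):
    """Call after poll(); returns True if new credentials were applied.

    load_credentials is a hypothetical callable returning (username, password).
    """
    if not watcher.auth_failed:
        return False
    username, password = load_credentials()
    client.set_sasl_credentials(username, password)
    watcher.auth_failed = False
    return True
```

To wire this up, pass `watcher.on_error` as the `'error_cb'` entry of the client configuration and call `refresh_credentials_if_needed(client, watcher, load_credentials)` after each `poll()`.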

pranavrth avatar Feb 16 '23 08:02 pranavrth

@pranavrth Thanks for that! This approach should take care of my credential rotation scenario.

Could something similar be achieved when the producer/consumer is created? As far as I know the error callback is only called on poll() and flush().
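Since the callback is served from `poll()`, one possible workaround at creation time is a short probe loop that polls right after the client is constructed. This is a minimal sketch under that assumption. `first_error_within` is a hypothetical helper, and `errors` is a plain list that the client's `error_cb` appends to.

```python
import time


def first_error_within(client, errors, probe_seconds=10.0, step_seconds=0.5):
    """Poll for up to probe_seconds and return the first reported error, if any.

    `errors` is the list that the client's error_cb appends to, for example
    when the client was created with {'error_cb': errors.append, ...}.
    Returning None only means no error arrived during the window; it does
    not prove that authentication succeeded.
    """
    deadline = time.monotonic() + probe_seconds
    while not errors and time.monotonic() < deadline:
        client.poll(step_seconds)  # serves queued callbacks, including error_cb
    return errors[0] if errors else None
```

For a consumer, running the probe before `subscribe()` avoids receiving and dropping messages during the window.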

ns-mkusper avatar May 02 '23 20:05 ns-mkusper

@pranavrth were you able to catch log messages as errors? I am struggling with the same issue.

BhawnaBhati avatar May 17 '23 04:05 BhawnaBhati

@pranavrth On a similar issue, error_cb doesn't seem to be invoked for SSL errors.

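from confluent_kafka import Consumer

def error_cb(error):
    # error is a KafkaError describing a client-level failure
    print('ERROR:', error)

config = {
    # ... remaining settings elided in the original post ...
    'error_cb': error_cb,
}

consumer = Consumer(config)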

I see that this log is printed to the screen:

SSL handshake failed: error:14094416:SSL routines:ssl3_read_bytes:sslv3 alert certificate unknown: SSL alert number 46 (after 147ms in state SSL_HANDSHAKE)

However the error_cb is not invoked.
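When a failure only shows up as a log line, one possible workaround is to route the client's logs through Python's `logging` module and inspect them there. The sketch below assumes that the `logger` configuration property forwards librdkafka log lines to a `logging.Logger` and that those lines are delivered when `poll()` is called. `ConnectionFailureHandler` and `make_failure_logger` are hypothetical names, and the marker strings are taken from the log lines quoted in this thread.

```python
import logging


class ConnectionFailureHandler(logging.Handler):
    """Collects forwarded log lines that look like SASL or TLS failures."""

    MARKERS = ("SASL authentication error", "SSL handshake failed")

    def __init__(self):
        super().__init__()
        self.failures = []

    def emit(self, record):
        message = record.getMessage()
        if any(marker in message for marker in self.MARKERS):
            self.failures.append(message)


def make_failure_logger(name="kafka-client"):
    """Return (logger, handler); the logger is meant to be passed as 'logger'."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    handler = ConnectionFailureHandler()
    logger.addHandler(handler)
    return logger, handler
```

With `logger, handler = make_failure_logger()` and `'logger': logger` added to the configuration, `handler.failures` can be checked after each `poll()`. Matching on message text is brittle, since the wording may change between librdkafka versions.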

mkmoisen avatar Oct 21 '23 16:10 mkmoisen

Is there currently a way to check whether given credentials are correct? I need a way to validate user credentials dynamically, and I can't seem to do that without executing additional client procedures like producing, consuming, etc.
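No dedicated validation call is mentioned in this thread, so the following is only a sketch of one possible workaround: create a short-lived AdminClient with the candidate credentials, request metadata with a finite timeout, and look at what `error_cb` reported. `check_credentials` and its `client_factory` parameter are hypothetical. The sketch also assumes the authentication error has been queued by the time the metadata request times out.

```python
try:
    from confluent_kafka import KafkaError, KafkaException
    AUTH_ERROR_CODE = KafkaError._AUTHENTICATION
except ImportError:  # fallback so the sketch can be exercised without the library
    KafkaException = Exception
    AUTH_ERROR_CODE = -169  # numeric value quoted earlier in this thread


def check_credentials(base_conf, username, password, timeout_s=10.0,
                      client_factory=None):
    """Return 'ok', 'auth_failed', or 'unknown' for the given credentials.

    client_factory is a hypothetical injection point for tests; by default
    a confluent_kafka AdminClient is created.
    """
    if client_factory is None:
        from confluent_kafka.admin import AdminClient as client_factory
    errors = []
    conf = {**base_conf, 'sasl.username': username,
            'sasl.password': password, 'error_cb': errors.append}
    client = client_factory(conf)
    try:
        # A metadata response can only arrive over a connection that
        # completed the SASL handshake.
        client.list_topics(timeout=timeout_s)
        return 'ok'
    except KafkaException:
        client.poll(0)  # serve any queued error_cb calls
    if any(err.code() == AUTH_ERROR_CODE for err in errors):
        return 'auth_failed'
    return 'unknown'  # for example, broker unreachable or TLS failure
```

This still performs a metadata request, but it avoids producing or consuming. An `'unknown'` result should be treated as inconclusive, not as a rejection of the credentials.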

yungFundamental avatar Mar 03 '24 15:03 yungFundamental