confluent-kafka-python
How to check for SASL Authentication Errors?
Description
from confluent_kafka import Consumer
conf = {
    'bootstrap.servers': 'localhost:8082',
    'group.id': 'myconsumer',
    'security.protocol': 'sasl_ssl',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myusername',
    'sasl.password': 'badpassword'
}
consumer = Consumer(conf)
Invoking the code above with an incorrect password does not raise any error. The client appears to attempt authentication asynchronously, and after a few seconds a separate thread prints the following to the screen:
%3|1658355875.069|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://localhost:9092/boots]: sasl_ssl://localhost:9092/bootstrap: SASL authentication error: Authentication failed (after 5170ms in state AUTH_REQ, 5 identical error(s) suppressed)
If I then subscribe to a topic and poll, on_assign is never called and no message is ever returned.
How do I figure out if I've been correctly authorized?
Would it perhaps be better if it raised a synchronous exception?
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.8.2 and 1.8.2
In addition, certain operations will just permanently hang.
For example:
from confluent_kafka.admin import AdminClient
a = AdminClient(conf)
a.list_topics()
This hangs permanently if the password is wrong, and so does consumer.get_watermark_offsets.
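One way to avoid the indefinite hang is to pass a finite timeout, which list_topics accepts on the admin client, consumer and producer. A minimal sketch, assuming that a failed login then surfaces as a KafkaException (most likely a timeout or transport failure, not necessarily an authentication-specific error); list_topics_or_fail is a hypothetical helper name:

```python
def list_topics_or_fail(client, timeout=10.0):
    """Fetch cluster metadata, giving up after `timeout` seconds.

    client is an AdminClient, Consumer or Producer. With a finite timeout,
    list_topics() is expected to raise confluent_kafka.KafkaException when
    no broker answers, instead of blocking forever (the default is -1,
    meaning no timeout).
    """
    metadata = client.list_topics(timeout=timeout)
    # metadata.topics maps topic name -> TopicMetadata; return just the names.
    return sorted(metadata.topics)
```

Calling list_topics_or_fail(AdminClient(conf), timeout=5) inside a try/except KafkaException block turns the hang into an error the application can handle. It does not say why the request failed, so a wrong password and an unreachable broker may look the same here. get_watermark_offsets takes a timeout argument as well.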
Would it perhaps be better if it raised a synchronous exception?
I have argued for this in the past as well. I have marked this as low priority based on how likely I think it is to get done, but I think it would meaningfully add to usability.
Hi @mhowlett
Is there any way to currently check for this error and handle it?
As far as I am aware, the only thing that occurs is that a second thread will print an error to stdout. There does not seem to be a way to programmatically check if the authorization was successful.
Assuming I am correct, if someone were to notify me about a password change, the only thing I could do is wait until after the password has been changed and then restart the application with the new password, which is not very convenient. I would like to be able to change the password in advance, and have the app retry until the upstream password has been changed. Right now it will just hang forever.
+1 for @mkmoisen's suggestion. My use case involves the need to log messages to the console in a very customized format, so that the monitoring service can query and alert on them.
It would be convenient if I was able to explicitly check the authentication status of my Kafka client and/or be able to catch an exception, so that I can easily use my custom logging utilities and allow proper application monitoring and alerting.
I'd also like to add a +1 for @mkmoisen's suggestion.
Without a fix like this it currently requires quite the workaround to handle password rotation which is becoming quite commonly required.
This is rearing its head in a lot of environments.
Hi @mhowlett, it looks like there is some interest. Would it be possible to re-categorize this from low to high? In my opinion the current behavior is a bug.
We have recently added set_sasl_credentials(username, password) to all client types. It lets you update the credentials, and it will be available in the next release.
You can use error_cb to listen to the global errors and update the credentials with the above setter. The following error is raised in case of incorrect username or password:
KafkaError{code=_AUTHENTICATION,val=-169,str="sasl_plaintext://localhost:29092/bootstrap: SASL authentication error: Authentication failed: Invalid username or password (after 302ms in state AUTH_REQ)"}
I think this gives a way to update the credentials.
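A rough sketch of how that could fit together, assuming the setter is exposed as set_sasl_credentials(username, password) and that the authentication error carries the numeric code -169 shown above. AuthFailureWatcher, poll_with_rotation and load_credentials are hypothetical names, not part of the library:

```python
# Numeric value of KafkaError._AUTHENTICATION, taken from the error shown above.
AUTHENTICATION_ERROR_CODE = -169


class AuthFailureWatcher:
    """error_cb callable that remembers the latest SASL authentication failure."""

    def __init__(self):
        self.last_auth_error = None

    def __call__(self, err):
        # err is a confluent_kafka.KafkaError; code() returns its numeric code.
        if err.code() == AUTHENTICATION_ERROR_CODE:
            self.last_auth_error = err
        else:
            print('ERROR:', err)

    def take(self):
        """Return the pending authentication error (or None) and clear it."""
        err, self.last_auth_error = self.last_auth_error, None
        return err


def poll_with_rotation(consumer, watcher, load_credentials, timeout=1.0):
    """Poll once; if an auth failure was reported, install fresh credentials.

    load_credentials is a hypothetical application hook that returns a
    (username, password) tuple from wherever the secrets are stored.
    """
    msg = consumer.poll(timeout)  # error_cb is served from inside poll()
    if watcher.take() is not None:
        username, password = load_credentials()
        # Intended to take effect on the client's next connection attempts.
        consumer.set_sasl_credentials(username, password)
    return msg
```

To wire it up, create watcher = AuthFailureWatcher(), pass it as 'error_cb' in the config given to Consumer, and call poll_with_rotation(consumer, watcher, load_credentials) in the consume loop. The callback only records the failure, and the credentials are updated from the polling loop, so no client method is called from inside the callback.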
@pranavrth Thanks for that! This approach should take care of my credential rotation scenario.
Could something similar be achieved when the producer/consumer is created? As far as I know the error callback is only called on poll() and flush().
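If the callback really is only served from poll(), one possible workaround is to poll for a short window right after creating the client and raise if an authentication error shows up. This is only a sketch under that assumption: raise_on_auth_failure and AuthenticationFailed are hypothetical names, and -169 is the code from the error quoted earlier in this thread.

```python
import time

# Numeric value of KafkaError._AUTHENTICATION, as quoted earlier in this thread.
AUTHENTICATION_ERROR_CODE = -169


class AuthenticationFailed(RuntimeError):
    """Hypothetical application-level exception for a rejected SASL login."""


def raise_on_auth_failure(client, errors, wait=10.0, step=0.5):
    """Serve callbacks for up to `wait` seconds; raise on an auth error.

    errors is the list that the client's error_cb appends to. Returning
    without an exception only means no authentication error was seen in
    the window; it is not proof that the login succeeded.
    """
    deadline = time.monotonic() + wait
    while time.monotonic() < deadline:
        client.poll(step)  # error_cb runs from inside poll()
        for err in errors:
            if err.code() == AUTHENTICATION_ERROR_CODE:
                raise AuthenticationFailed(str(err))
```

For example: errors = [], then Consumer({**conf, 'error_cb': errors.append}), then raise_on_auth_failure(consumer, errors) before subscribing, so that no messages are consumed and discarded by the check.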
@pranavrth were you able to catch log messages as errors? I am struggling with the same issue.
@pranavrth On a similar issue, error_cb doesn't seem to be invoked for SSL errors.
def error_cb(error):
    print('ERROR:', error)
config = {..., 'error_cb': error_cb}
consumer = Consumer(config)
I see that this log is printed to the screen:
SSL handshake failed: error:14094416:SSL routines:ssl3_read_bytes:sslv3 alert certificate unknown: SSL alert number 46 (after 147ms in state SSL_HANDSHAKE)
However the error_cb is not invoked.
Is there currently a way to check whether given credentials are correct? I need a way to validate user credentials dynamically, and I can't seem to do that without executing additional client procedures like producing, consuming, etc.