Problems configuring faststream for SASL_SSL security

Open jorrgme opened this issue 9 months ago • 1 comments

Discussed in https://github.com/airtai/faststream/discussions/1404

Originally posted by jorrgme April 29, 2024

Hi all,

I really need your help: I've tried everything I can think of to consume from a topic using SASL_SSL authentication.

This is the code I'm using:

import ssl
from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

user      = "my_usr"
passw     = "my_pass"
bootstrap = "my.bootstrap.server.com:443"
cert      = "~/my_certs/CRT_cacerts.pem"
group     = "my_group"
topic     = "my.topic.cpd"

ssl_context = ssl.create_default_context(
    cafile=cert,
)
security = SASLPlaintext(
    ssl_context=ssl_context,
    username=user,
    password=passw,
    use_ssl=True
)

broker = KafkaBroker(
    bootstrap_servers=bootstrap,
    security=security,
)

app = FastStream(broker)

@broker.subscriber(topic, group_id=group)
async def handle_msg(data) -> None:
    print(data)

And this is the log, with the SSL verification error I'm getting:

2024-04-29 10:41:59,089 INFO     - FastStream app starting...
%3|1714380119.145|FAIL|faststream-0.5.2#producer-1| [thrd:sasl_ssl://my.bootstrap.server.com:443/bootstrap]: sasl_ssl://my.bootstrap.server.com:443/bootstrap: SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 9ms in state SSL_HANDSHAKE)

Does anyone have any idea how I can solve this?

I've tried the same thing using Faust and it's working fine using the same certificate, user and password.

Thanks in advance for any help you can provide.

jorrgme, Apr 29 '24 08:04

Looks like we missed the ssl_context option in the Confluent config: https://github.com/airtai/faststream/blob/main/faststream/confluent/client.py#L286

I need to dig into it a bit.

Lancetnik, May 02 '24 18:05

We are also getting the same error with BaseSecurity for Confluent Kafka:

%3|1717097708.473|FAIL|faststream-0.5.8#producer-1| []: SSL handshake failed: error:0A0000F4:SSL routines::unexpected message: client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information) (after 4ms in state SSL_HANDSHAKE, 30 identical error(s) suppressed)

Looking forward to @davorrunje's fix.

mmarqz, May 30 '24 19:05

Hello @jorrgme,

We have released version 0.5.12, which includes a fix for your issue. FastStream now lets you pass a config dictionary through to confluent-kafka-python for greater customizability.

In your case, the code needs to change to:

import os

from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

user      = "my_usr"
passw     = "my_pass"
bootstrap = "my.bootstrap.server.com:443"
# Expand "~" explicitly; the path is handed to librdkafka as a plain string.
cert      = os.path.expanduser("~/my_certs/CRT_cacerts.pem")
group     = "my_group"
topic     = "my.topic.cpd"

security = SASLPlaintext(
    username=user,
    password=passw,
    use_ssl=True
)

broker = KafkaBroker(
    bootstrap_servers=bootstrap,
    security=security,
    config={"ssl.ca.location": cert}
)

app = FastStream(broker)

@broker.subscriber(topic, group_id=group)
async def handle_msg(data) -> None:
    print(data)

You can check out all the configuration properties that can be set through the config dictionary here: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
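
For anyone hitting the "client SSL authentication might be required" variant reported earlier in this thread, the same config dictionary can probably carry the client certificate settings as well. What follows is only a minimal sketch: build_ssl_config is a hypothetical helper (not part of FastStream), it assumes the broker expects a client certificate and key, and it assumes librdkafka takes file paths literally, so "~" is expanded in Python first. The property names ssl.ca.location, ssl.certificate.location and ssl.key.location come from the librdkafka configuration list linked above.

```python
import os
from typing import Dict, Optional


def build_ssl_config(
    ca_path: str,
    client_cert_path: Optional[str] = None,
    client_key_path: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: build a librdkafka-style TLS config dictionary."""
    # Assumption: librdkafka uses the path as given, so expand "~" here.
    config = {"ssl.ca.location": os.path.expanduser(ca_path)}

    # Only add client-side settings when both certificate and key are given.
    if client_cert_path and client_key_path:
        config["ssl.certificate.location"] = os.path.expanduser(client_cert_path)
        config["ssl.key.location"] = os.path.expanduser(client_key_path)

    return config


# CA only, as in the fix above.
config = build_ssl_config("~/my_certs/CRT_cacerts.pem")
```

The resulting dictionary would then be passed as config=config to KafkaBroker, in place of the inline {"ssl.ca.location": cert} dictionary in the snippet above.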

kumaranvpl, Jun 11 '24 06:06