laravel-kafka

TLS Authentication

Open corsair opened this issue 1 year ago • 5 comments

Hello there,

This is primarily a follow-up to #127, which addressed SASL_SSL but not instances that require TLS client-certificate authentication.

When connecting to my Kafka instance, I have to provide a CA location, a certificate location, and a key location.

With kcat, this typically looks like the following:

kcat -b example.com:9093 -t topic_name \
  -X security.protocol=ssl \
  -X ssl.ca.location=/some/location/kafka.crt \
  -X ssl.certificate.location=/some/location/client.crt \
  -X ssl.key.location=/some/location/client.key \
  -X ssl.endpoint.identification.algorithm=none

The documentation covers username/password authentication, but not the setup above.

While username/password authentication over SSL is appreciated, support for TLS authentication with a client certificate and key would let the package work with more Kafka deployments.

corsair, Oct 05 '24 16:10

Hey @corsair :wave: have you tried specifying your config using the withOption or withOptions methods?

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOptions([
        'ssl.ca.location' => '/some/location/kafka.crt',
        'ssl.certificate.location' => '/some/location/client.crt',
        'ssl.key.location' => '/some/location/client.key',
        'ssl.endpoint.identification.algorithm' => 'none'
    ]);

mateusjunges, Oct 06 '24 15:10
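
For comparison with the kcat command earlier in the thread: kcat also sets `security.protocol=ssl`, which the suggested options array does not include. The sketch below adds that key through the same `withOptions` call. It is only a sketch: it assumes `withOptions` forwards each key to librdkafka unchanged, which the thread does not confirm, and it reuses the placeholder paths from the original post.

```php
<?php

use Junges\Kafka\Facades\Kafka;

// Assumption: withOptions() passes these keys through to librdkafka as-is.
$consumer = Kafka::consumer()
    ->withOptions([
        // Mirrors `-X security.protocol=ssl` from the kcat command.
        'security.protocol' => 'ssl',
        'ssl.ca.location' => '/some/location/kafka.crt',
        'ssl.certificate.location' => '/some/location/client.crt',
        'ssl.key.location' => '/some/location/client.key',
        'ssl.endpoint.identification.algorithm' => 'none',
    ]);
```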

Closing this now. Feel free to open another issue if your problem persists.

mateusjunges, Oct 08 '24 17:10

Hey @mateusjunges, thank you for the response, and I apologize for replying so late. It appears that these options either aren't working or are being bypassed somehow.

With the following code, I cannot connect to the server, although connecting via kcat and other tools succeeds. I have also tried without the ->withSecurityProtocol('SSL') line, which had no impact.

$consumer = Kafka::consumer()
    ->withSecurityProtocol('SSL')
    ->withOptions([
        'ssl.ca.location' => storage_path('certs/kafka.crt'),
        'ssl.certificate.location' => storage_path('certs/client.crt'),
        'ssl.key.location' => storage_path('certs/client.key'),
        'ssl.endpoint.identification.algorithm' => 'none',
        //'debug' => 'security,broker,protocol',
        //'log_level' => (string) LOG_DEBUG, // Enable debug logging
    ])
    ->subscribe('example')
    ->withHandler(function (ConsumerMessage $message) {
        ProcessKafkaVehicleMessage::dispatch($message);
    })
    ->build();

try {
    $consumer->consume();
} catch (ConsumerException $e) {
    // Most specific exception first: a preceding catch (Exception) would
    // also match any ConsumerException that extends it.
    Log::error('Kafka consumer error.', ['exception' => $e]);
} catch (Exception $e) {
    // PSR-3 expects exceptions under the 'exception' context key.
    Log::error('Kafka error.', ['exception' => $e]);
}

My environment variables are as follows:

KAFKA_BROKERS=kafka.example.com:9093
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_MECHANISMS=SSL

Here are my rdkafka and broker versions:

// Client

rdkafka support => enabled
version => 6.0.3
build date => Jul  2 2022 13:15:21
librdkafka version (runtime) => 2.5.3
librdkafka version (build) => 2.2.0.255

// Server (Broker)

Kafka version: 3.8.0

Here are the logs I've gotten.

%6|1729083027.316|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083027.316|ERROR|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%6|1729083027.316|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083027.316|ERROR|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%3|1729083027.316|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%6|1729083027.572|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%6|1729083027.714|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%3|1729083027.714|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
^C%3|1729083029.183|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083029.183|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)

If I enable additional debugging, I get the following:

%7|1729082762.672|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1729082762.672|BROKER|rdkafka#consumer-1| [thrd:app]: kafka.example.com:9093/bootstrap: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Enter main broker thread
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:main]: kafka.example.com:9093/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1729082762.672|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.5.3 (0x20503ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, CC CXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x282)
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Received CONNECT op
%7|1729082762.672|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1729082762.672|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1729082762.672|BRKMAIN|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1729082762.672|BROKER|rdkafka#producer-2| [thrd:app]: kafka.example.com:9093/bootstrap: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Enter main broker thread
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:app]: kafka.example.com:9093/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1729082762.672|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v2.5.3 (0x20503ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, CC CXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x282)
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Received CONNECT op
%7|1729082762.672|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1729082762.672|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: coordinator query
%7|1729082762.699|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connecting to ipv4#1.2.3.4:9093 (plaintext) with socket 20
%7|1729082762.699|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connecting to ipv4#1.2.3.5:9093 (plaintext) with socket 21
%7|1729082762.742|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected to ipv4#1.2.3.4:9093
%7|1729082762.742|CONNECTED|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected (#1)
%7|1729082762.742|FEATURE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1729082762.742|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1729082762.742|SEND|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1729082762.745|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected to ipv4#1.2.3.5:9093
%7|1729082762.745|CONNECTED|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected (#1)
%7|1729082762.745|FEATURE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1729082762.745|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1729082762.745|SEND|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1729082762.786|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 44ms in state APIVERSION_QUERY) (_TRANSPORT)
%6|1729082762.786|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 44ms in state APIVERSION_QUERY)
%7|1729082762.786|FEATURE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features -ApiVersion to 
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%3|1729082762.786|ERROR|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state DOWN -> INIT
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.786|RECONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Delaying next reconnect by 208ms
%7|1729082762.791|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY) (_TRANSPORT)
%6|1729082762.791|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY)
%7|1729082762.791|FEATURE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features -ApiVersion to 
%7|1729082762.791|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%3|1729082762.791|ERROR|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%7|1729082762.791|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: Cluster connection already in progress: broker down

On the server, server.log shows the following error:

[2024-10-16 12:37:21,218] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /1.2.3.5 (channelId=1.2.3.4:9093-1.2.3.5:57580-5) (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I greatly appreciate your prior response and any advice you might have.

corsair, Oct 16 '24 12:10
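
The debug output in the post above shows both clients "Connecting to ... (plaintext)" even though SSL was requested, while the broker logs a failed SSL handshake. That pattern is consistent with `security.protocol` not reaching librdkafka. One way to narrow this down is to bypass the package and use the php-rdkafka extension directly. The following is a minimal sketch under stated assumptions, not the maintainer's fix: the function names, the `tls-probe` group id, and the certificate directory layout are all hypothetical.

```php
<?php

/**
 * Build librdkafka settings that mirror the kcat flags from this thread.
 * The function name and the $certDir layout are hypothetical.
 */
function tlsSettings(string $brokers, string $certDir): array
{
    return [
        'bootstrap.servers' => $brokers,
        'group.id' => 'tls-probe', // hypothetical; any unused group name works
        'security.protocol' => 'ssl',
        'ssl.ca.location' => $certDir . '/kafka.crt',
        'ssl.certificate.location' => $certDir . '/client.crt',
        'ssl.key.location' => $certDir . '/client.key',
        'ssl.endpoint.identification.algorithm' => 'none',
        'debug' => 'security,broker',
    ];
}

/**
 * Attempt to consume one message with php-rdkafka directly.
 * Returns librdkafka's error string for the attempt.
 */
function probeTlsConnection(array $settings, string $topic, int $timeoutMs = 10000): string
{
    if (!extension_loaded('rdkafka')) {
        throw new RuntimeException('The rdkafka extension is not loaded.');
    }

    $conf = new RdKafka\Conf();
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }

    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topic]);

    // Blocks for up to $timeoutMs while librdkafka connects and fetches.
    $message = $consumer->consume($timeoutMs);

    return $message->errstr();
}
```

Calling `probeTlsConnection(tlsSettings('kafka.example.com:9093', storage_path('certs')), 'example')` from `php artisan tinker` would exercise the same certificates without the package in between. If the CONNECT debug line then names a protocol other than plaintext, the certificates and the extension are probably fine, and the remaining question is how the package passes `security.protocol` along.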

Alright, thanks for letting me know. I'll see if I can take a look later today. And thanks for sponsoring me too 😃

mateusjunges, Oct 16 '24 13:10

@corsair I managed to get it working, and I put together a simple Docker Compose file for you to try it out. I can connect to the cluster either via the CLI or using this package. Here's the repo with some basic documentation in the README.

mateusjunges, Oct 18 '24 00:10

Just published a blog post on how to set up SSL for your local Kafka cluster. You can read it here: https://laravelkafka.com/articles/setting-up-a-kafka-cluster-with-ssl-tls-a-step-by-step-guide

mateusjunges, Nov 15 '24 15:11