laravel-kafka
laravel-kafka copied to clipboard
TLS Authentication
Hello there,
This is primarily a follow-up on #127, which resolved concerns regarding SASL_SSL but not for instances that require TLS.
When connecting to my Kafka instance, I have to provide a CA location, Certification Location, and Key Location.
When utilizing a kcat, this would typically look like the following:
kcat -b example.com:9093 -t topic_name \
-X security.protocol=ssl \
-X ssl.ca.location=/some/location/kafka.crt \
-X ssl.certificate.location=/some/location/client.crt \
-X ssl.key.location=/some/location/client.key \
-X ssl.endpoint.identification.algorithm=none
The documentation includes instructions for utilizing username/password, but not the above setup.
While username and password authentication via SSL is appreciated, improved security via TLS/certificate and key provisioning would help support more instances of Kafka.
Hey @corsair :wave:
have you tried specifying your config using the withOption or withOptions methods?
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withOptions([
'ssl.ca.location' => '/some/location/kafka.crt',
'ssl.certificate.location' => '/some/location/client.crt',
'ssl.key.location' => '/some/location/client.key',
'ssl.endpoint.identification.algorithm' => 'none'
]);
Closing this now. Feel free to open another issue if your problem persists.
Hey @mateusjunges, thank you dearly for the response, and I apologize for responding so late; it appears that said configuration/options aren't working or are being bypassed in some capacity.
With the following code, I cannot connect to the server, however is successful when connecting via kcat/other tools. I have also tried without the ->withSecurityProtocol('SSL') line, which had no impact.
$consumer = Kafka::consumer()
->withSecurityProtocol('SSL')
->withOptions([
'ssl.ca.location' => storage_path('certs/kafka.crt'),
'ssl.certificate.location' => storage_path('certs/client.crt'),
'ssl.key.location' => storage_path('certs/client.key'),
'ssl.endpoint.identification.algorithm' => 'none',
//'debug' => 'security,broker,protocol',
//'log_level' => (string) LOG_DEBUG, // Enable debug logging
])
->subscribe('example')
->withHandler(function (ConsumerMessage $message) {
ProcessKafkaVehicleMessage::dispatch($message);
})
->build();
try {
$consumer->consume();
} catch (Exception $e) {
Log::error('Kafka error.', [$e]);
} catch (ConsumerException $e) {
Log::error('Kafka consumer error.', [$e]);
}
My environmental values are as so:
KAFKA_BROKERS=kafka.example.com:9093
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_MECHANISMS=SSL
Here are my rdkafka and broker versions:
// Client
rdkafka support => enabled
version => 6.0.3
build date => Jul 2 2022 13:15:21
librdkafka version (runtime) => 2.5.3
librdkafka version (build) => 2.2.0.255
// Server (Broker)
Kafka version: 3.8.0
Here are the logs I've gotten.
%6|1729083027.316|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083027.316|ERROR|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%6|1729083027.316|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083027.316|ERROR|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%3|1729083027.316|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%6|1729083027.572|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%6|1729083027.714|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%3|1729083027.714|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
^C%3|1729083029.183|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 48ms in state APIVERSION_QUERY)
%3|1729083029.183|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
If I enable additional debugging, I get the following:
%7|1729082762.672|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1729082762.672|BROKER|rdkafka#consumer-1| [thrd:app]: kafka.example.com:9093/bootstrap: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Enter main broker thread
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:main]: kafka.example.com:9093/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1729082762.672|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.5.3 (0x20503ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, CC CXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x282)
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Received CONNECT op
%7|1729082762.672|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1729082762.672|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1729082762.672|BRKMAIN|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1729082762.672|BROKER|rdkafka#producer-2| [thrd:app]: kafka.example.com:9093/bootstrap: Added new broker with NodeId -1
%7|1729082762.672|BRKMAIN|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Enter main broker thread
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:app]: kafka.example.com:9093/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1729082762.672|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v2.5.3 (0x20503ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, CC CXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x282)
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Received CONNECT op
%7|1729082762.672|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.672|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1729082762.672|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1729082762.672|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: coordinator query
%7|1729082762.699|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connecting to ipv4#1.2.3.4:9093 (plaintext) with socket 20
%7|1729082762.699|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connecting to ipv4#1.2.3.5:9093 (plaintext) with socket 21
%7|1729082762.742|CONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected to ipv4#1.2.3.4:9093
%7|1729082762.742|CONNECTED|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected (#1)
%7|1729082762.742|FEATURE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1729082762.742|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1729082762.742|SEND|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1729082762.745|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected to ipv4#1.2.3.5:9093
%7|1729082762.745|CONNECTED|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Connected (#1)
%7|1729082762.745|FEATURE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1729082762.745|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1729082762.745|SEND|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1729082762.786|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 44ms in state APIVERSION_QUERY) (_TRANSPORT)
%6|1729082762.786|FAIL|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 44ms in state APIVERSION_QUERY)
%7|1729082762.786|FEATURE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%3|1729082762.786|ERROR|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state DOWN -> INIT
%7|1729082762.786|STATE|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1729082762.786|RECONNECT|rdkafka#producer-2| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Delaying next reconnect by 208ms
%7|1729082762.791|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY) (_TRANSPORT)
%6|1729082762.791|FAIL|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 46ms in state APIVERSION_QUERY)
%7|1729082762.791|FEATURE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1729082762.791|STATE|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: kafka.example.com:9093/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%3|1729082762.791|ERROR|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: 1/1 brokers are down
%7|1729082762.791|CONNECT|rdkafka#consumer-1| [thrd:kafka.example.com:9093/bootstrap]: Cluster connection already in progress: broker down
On the server, via server.log, I receive the following error:
[2024-10-16 12:37:21,218] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /1.2.3.5 (channelId=1.2.3.4:9093-1.2.3.5:57580-5) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
I greatly appreciate your prior response and any advice you might have.
alright, thanks for letting me know, I'll see if I can take a look later today. And thanks for sponsoring me too 😃
@corsair I managed to get it working and I put together a simple docker compose file for you to try it out. I can connect to the cluster either via CLI or using this package. Here's the repo with some basic documentation in the README
Just published a blog post on how to setup SSL for your local kafka cluster. You can read it here https://laravelkafka.com/articles/setting-up-a-kafka-cluster-with-ssl-tls-a-step-by-step-guide