strimzi-kafka-operator
ingress-tcp listener: expose Kafka through the ingress controller using TCP config
add an ingress-tcp external listener type to expose Kafka through the ingress controller using TCP configuration
Type of change
- Enhancement / new feature
Description
If you
- don't want to manage dynamic broker addresses with NodePort services
- don't want to add a new LoadBalancer and external IP per broker (+ bootstrap)
- can't use TLS in your existing Kafka clients
- can't enable SSL passthrough in your ingress controller
then we don't have another solution today for external access to your Kafka cluster.
The proposal is to use the ingress controller's TCP configuration to expose brokers on different TCP ports. TLS is optional, and no HTTP Ingress objects are managed.
Strimzi will create ClusterIP services (as with the ingress listener type), and the ingress TCP configuration has to be taken care of manually.
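To make the proposal concrete, here is a minimal sketch of what such a listener configuration could look like. This is purely illustrative: the listener type name (`ingress-tcp` vs. `cluster-ip`) is still under discussion in this thread, and the listener name and port are placeholders.

```yaml
# Hypothetical listener configuration for the proposed listener type.
# The type name is still being discussed; name and port are examples only.
listeners:
  - name: external
    port: 9094
    type: cluster-ip
    tls: false
```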
Checklist
- [x] Write tests
- [ ] Make sure all tests pass
- [x] Update documentation
- [ ] Check RBAC rights for Kubernetes / OpenShift roles
- [ ] Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
- [x] Reference relevant issue(s) and close them after merging
- [ ] Update CHANGELOG.md
- [ ] Supply screenshots for visual changes, such as Grafana dashboards
Can one of the admins verify this patch?
completely agree with you, it's not really an ingress implementation. There are no Ingress objects managed with this listener, but it is a custom implementation if we want to have ingress TCP routing. BTW, there is another problem with this name: we can't have an all-in-the-box implementation like the ingress listener, because the ingress TCP config is generally shared with other services. With the nginx ingress controller we have to edit the LoadBalancer service to define the exposed ports, and a special ConfigMap to define the port/service mappings. We may create a ConfigMap "template" to help users create the nginx ConfigMap with a simple copy/paste.
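For reference, ingress-nginx configures TCP routing through a ConfigMap passed to the controller via its `--tcp-services-configmap` flag, where each key is a port exposed by the controller and each value is a `<namespace>/<service>:<port>` mapping. A sketch of what the shared ConfigMap mentioned above could look like (the namespace, service names, and ports here are illustrative only):

```yaml
# Sketch of an ingress-nginx TCP services ConfigMap.
# Keys are ports exposed by the ingress controller; values map to
# <namespace>/<service>:<port>. All names/ports are illustrative.
apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx
data:
  "9094": "myproject/my-cluster-kafka-external-bootstrap:9094"
  "9095": "myproject/my-cluster-kafka-external-0:9094"
  "9096": "myproject/my-cluster-kafka-external-1:9094"
```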
my first intention was to call it "custom" or a generic listener (like the class used by the operator), because we only need a custom config and a ClusterIP service per broker. I prefer your suggestion: clusterIp listener.
keep open for other suggestions!
thank you for your review! I will address your change requests in another commit
Something like `cluster-ip` sounds fine to me.
@maknihamdi - I can take another look when you've made the updates.
I'm interested in the doc changes, as I'm involved in maintaining them
We'll need something in the following places:
- https://github.com/strimzi/strimzi-kafka-operator/blob/main/documentation/api/io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListener.adoc
- https://strimzi.io/docs/operators/in-development/configuring.html#assembly-accessing-kafka-outside-cluster-str
And possibly in other places where we talk about listeners. Maybe we can look at these two sections first, and then I can follow up with a PR that updates elsewhere as necessary?
`type: service` is another option which came to my mind. As in ...
- `type: route` creates Routes
- `type: ingress` creates Ingresses
- `type: service` creates Services 🤷
LoadBalancer/NodePort listeners also create Services ... it can be a source of confusion
Yeah, true to that. I will try to get some more feedback on this from other maintainers to avoid renaming it five times.
thank you @scholzj for your review! I also have the systemtest to do, and I think I'm missing some documentation requested by @PaulRMellor. I'll come back with other commits with all of this
If you have any trouble creating an ST you can ask me :) but as Jakub already mentioned, you can do very similar to this one https://github.com/strimzi/strimzi-kafka-operator/blob/main/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java#L158-L210.
I think I got it. However, I couldn't run the test from my IDEA. It seems like the container running in the operator pod on my minikube is not synced with the source code.
Hey,
You will need to build the operator image, push it to some public registry, and then change it in packaging/install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml. After that you will be able to run it with your changes.
/azp run acceptance
Azure Pipelines successfully started running 1 pipeline(s).
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
/azp run regression
Commenter does not have sufficient privileges for PR 7365 in repo strimzi/strimzi-kafka-operator
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
@maknihamdi I looked into the test failures - it seems that the bootstrap server is wrong when initializing the clients.
The cluster-ip listener has a bootstrap server like `my-cluster-b3118181-kafka-clusterip-bootstrap.namespace-17.svc:9102`, but the clients are trying to send messages to `my-cluster-b3118181-kafka-bootstrap:9102`. So can you please fix it?
@im-konge @scholzj it will be something like:
String serviceName = String.format("%s-kafka-%s-bootstrap", clusterName, Constants.CLUSTER_IP_LISTENER_DEFAULT_NAME);
String bootstrapServiceName = String.format("%s.%s.svc", serviceName, namespaceName);
do you prefer to extract these two lines of code into a dedicated method in https://github.com/strimzi/strimzi-kafka-operator/blob/main/api/src/main/java/io/strimzi/api/kafka/model/KafkaResources.java ? It would be useful for a couple of tests. It could be `public static String customListenerBootstrapServiceName(String namespaceName, String clusterName, String listenerName) {...}`
we have some special things with this listener: it's not a plain internal one, nor an external one. This is why, when the reconciler computes the bootstrap address, we have the listener name in the middle (cf https://github.com/strimzi/strimzi-kafka-operator/blob/main/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ListenersUtils.java#L280)
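As a sketch, the hypothetical helper suggested above could look like this. The class and method names are only the suggestion from this thread, not an existing Strimzi API, and the literal listener name replaces `Constants.CLUSTER_IP_LISTENER_DEFAULT_NAME` to keep the example self-contained:

```java
// Sketch of the suggested helper; class and method names are hypothetical.
public class KafkaListenerNames {

    // Builds the bootstrap service DNS name for a custom (cluster-ip style)
    // listener, where the listener name sits in the middle of the service name.
    public static String customListenerBootstrapServiceName(String namespaceName,
                                                            String clusterName,
                                                            String listenerName) {
        String serviceName = String.format("%s-kafka-%s-bootstrap", clusterName, listenerName);
        return String.format("%s.%s.svc", serviceName, namespaceName);
    }

    public static void main(String[] args) {
        // Matches the bootstrap address seen in the failing test above.
        System.out.println(customListenerBootstrapServiceName(
                "namespace-17", "my-cluster-b3118181", "clusterip"));
        // → my-cluster-b3118181-kafka-clusterip-bootstrap.namespace-17.svc
    }
}
```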
@maknihamdi in STs we sometimes take the bootstrap server name from the Kafka CR, once the Kafka is deployed and ready (that means after the `resourceManager.createResource();` method). I don't know if we should have some method in the `api` module, that's something for @scholzj. But if it would be used just in tests, I guess that it's not really necessary to add it there.
FYI: I'm traveling this week. If needed I can look more into why the test is failing next week when I get back.
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
@maknihamdi I wanted to merge it, so I built it and tried it out locally. Unfortunately, I found two more issues.
- The bootstrap service does not register all the proper DNS names for the certificate SANs. It seems to register only the one with the format `<service-name>.<namespace>.svc`. It should also register `<service-name>.<namespace>`, `<service-name>` and `<service-name>.<namespace>.svc.<cluster-DNS-suffix>`, as done for example here: https://github.com/strimzi/strimzi-kafka-operator/blob/ba1fe15383510c061be1c64189c57c7c41b95c5f/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ClusterCa.java#L188-L192. This is not necessarily a blocker -> I can do this in a follow-up PR if needed ... however, the second issue is a blocker and needs to be fixed.
- The broker DNS name is not registered anywhere or added to the SANs. So even if you use the bootstrap address which is part of the certificate, you still run into this:

```
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching my-cluster-kafka-tls-2.myproject.svc found.
	at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:349)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:292)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
	at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
	at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
	at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
	at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
	at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
	at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$pollRecords$6(KafkaReadStreamImpl.java:154)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching my-cluster-kafka-tls-2.myproject.svc found.
	at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:212)
	at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:415)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
	... 31 more
```
which renders it unusable for any client which tries to use it with TLS. We need to register the DNS name also when the user override is not configured (in this case, `<service-name>.<namespace>.svc` should be enough because this is the follow-up connection based on the advertised address). This is a blocker because without it, it basically doesn't work.
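To make the expected SAN set concrete, here is a sketch of the DNS name variants a Kubernetes Service resolves to, following the standard service DNS scheme, all of which should appear as certificate SANs. The class and method names are illustrative, not Strimzi code, and `cluster.local` is the usual default cluster DNS suffix:

```java
import java.util.List;

// Illustrative sketch (not Strimzi code): the DNS name variants that a
// Kubernetes Service resolves to, all of which should be SANs on the cert.
public class ServiceDnsNames {

    public static List<String> sanDnsNames(String serviceName, String namespace, String clusterDnsSuffix) {
        return List.of(
                serviceName,                                                 // short name (same namespace)
                serviceName + "." + namespace,                               // <service>.<namespace>
                serviceName + "." + namespace + ".svc",                      // <service>.<namespace>.svc
                serviceName + "." + namespace + ".svc." + clusterDnsSuffix); // fully qualified
    }

    public static void main(String[] args) {
        sanDnsNames("my-cluster-kafka-clusterip-bootstrap", "myproject", "cluster.local")
                .forEach(System.out::println);
    }
}
```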
We plan to start a new release this week. So please let me know if you need any help with this so that we can close it in time for the new release and don't have to wait for the next one.
/azp run regression