
ingress-tcp listener: expose Kafka through the ingress controller using TCP config

Open maknihamdi opened this issue 1 year ago • 2 comments

add an ingress-tcp external listener type to expose Kafka through the ingress controller using TCP config

Type of change

  • Enhancement / new feature

Description

If you

  • don't want to manage dynamic broker addresses with NodePort services
  • don't want to bring up a new LoadBalancer and external IP per broker (+ bootstrap)
  • can't use TLS in your existing Kafka clients
  • can't enable SSL passthrough in your ingress controller

then there is currently no other solution for external access to your Kafka cluster.

The proposal is to use the ingress controller's TCP configuration to expose brokers on different TCP ports. TLS is optional, and no HTTP Ingress objects are managed.

Strimzi will create ClusterIP services (as with the ingress listener type), and the ingress TCP configuration must be managed manually.
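For illustration, with ingress-nginx the manual part would be a `tcp-services` ConfigMap mapping one external port to each broker service plus bootstrap. A sketch, assuming illustrative namespace, service, and port names (the real names depend on your cluster and listener config):

```yaml
# Sketch of an ingress-nginx TCP services ConfigMap; all names are examples.
apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx
data:
  # external port -> namespace/service:port
  "9094": "kafka/my-cluster-kafka-clusterip-bootstrap:9094"
  "9095": "kafka/my-cluster-kafka-clusterip-0:9094"
  "9096": "kafka/my-cluster-kafka-clusterip-1:9094"
```

The ingress controller's own LoadBalancer Service would also need the ports 9094-9096 added to its port list, which is the "shared with other services" problem discussed below.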

Checklist

  • [x] Write tests
  • [ ] Make sure all tests pass
  • [x] Update documentation
  • [ ] Check RBAC rights for Kubernetes / OpenShift roles
  • [ ] Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • [x] Reference relevant issue(s) and close them after merging
  • [ ] Update CHANGELOG.md
  • [ ] Supply screenshots for visual changes, such as Grafana dashboards

maknihamdi avatar Sep 21 '22 14:09 maknihamdi

Can one of the admins verify this patch?

strimzi-ci avatar Sep 21 '22 14:09 strimzi-ci

Completely agree with you, it's not really an ingress implementation; no Ingress objects are managed by this listener. But it is a custom implementation if we want to have ingress TCP routing. BTW, there is another problem with this name: we can't have an all-in-the-box implementation like the ingress listener, because the ingress TCP config is generally shared with other services. With the nginx ingress controller we have to edit the LoadBalancer service to define the exposed ports and a special ConfigMap to define the port/service mappings. We could create a ConfigMap "template" to help users create the nginx ConfigMap with a simple copy/paste.

My first intention was to call it a "custom" or generic listener (like the class used by the operator), because we only need a custom config and a ClusterIP service per broker. I prefer your suggestion: a clusterIp listener.

keep open for other suggestions!
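For illustration, assuming the clusterIp-style name is chosen, such a listener would be declared in the Kafka CR roughly like this (field values are examples, not the final API):

```yaml
# Sketch of a Kafka CR fragment; listener name, port, and type value are assumptions.
spec:
  kafka:
    listeners:
      - name: external
        port: 9094
        type: cluster-ip
        tls: false
```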

Thank you for your review! I'll address your change requests in another commit.

maknihamdi avatar Sep 21 '22 16:09 maknihamdi

Something like cluster-ip sounds fine to me. @maknihamdi - I can take another look when you've made the updates. I'm interested in the doc changes, as I'm involved in maintaining them

We'll need something in the following places:

  • https://github.com/strimzi/strimzi-kafka-operator/blob/main/documentation/api/io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListener.adoc
  • https://strimzi.io/docs/operators/in-development/configuring.html#assembly-accessing-kafka-outside-cluster-str

And possibly in other places where we talk about listeners. Maybe if we look at these two sections and then I can follow up with a PR that updates elsewhere as necessary?

PaulRMellor avatar Sep 22 '22 07:09 PaulRMellor

type: service is another option which came to my mind. As in ...

  • type: route creates Routes
  • type: ingress creates Ingresses
  • type: service creates Services 🤷

scholzj avatar Sep 22 '22 14:09 scholzj

LoadBalancer/NodePort listeners also create a Service ... it can be a source of confusion

maknihamdi avatar Sep 22 '22 14:09 maknihamdi

Yeah, true to that. I will try to get some more feedback on this from other maintainers to avoid renaming it five times.

scholzj avatar Sep 22 '22 14:09 scholzj

Thank you @scholzj for your review! I still have the system tests to do, and I think I'm missing some of the documentation requested by @PaulRMellor; I'll come back with other commits covering all of this.

maknihamdi avatar Sep 26 '22 13:09 maknihamdi

Thank you @scholzj for your review! I still have the system tests to do, and I think I'm missing some of the documentation requested by @PaulRMellor; I'll come back with other commits covering all of this.

If you have any trouble creating an ST you can ask me :) but as Jakub already mentioned, you can do very similar to this one https://github.com/strimzi/strimzi-kafka-operator/blob/main/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java#L158-L210.

see-quick avatar Sep 27 '22 18:09 see-quick

If you have any trouble creating an ST you can ask me :) but as Jakub already mentioned, you can do very similar to this one https://github.com/strimzi/strimzi-kafka-operator/blob/main/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java#L158-L210.

I think I got it. However, I couldn't run the test from my IDEA. It seems like the container running in the operator pod in my minikube is not synced with the source code.

maknihamdi avatar Oct 10 '22 19:10 maknihamdi

If you have any trouble creating an ST you can ask me :) but as Jakub already mentioned, you can do very similar to this one https://github.com/strimzi/strimzi-kafka-operator/blob/main/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java#L158-L210.

I think I got it. However, I couldn't run the test from my IDEA. It seems like the container running in the operator pod in my minikube is not synced with the source code.

Hey, you will need to build the operator image, push it to a public registry, and then change the image in packaging/install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml. After that you will be able to run it with your changes.

im-konge avatar Oct 10 '22 19:10 im-konge

/azp run acceptance

scholzj avatar Oct 10 '22 22:10 scholzj

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 10 '22 22:10 azure-pipelines[bot]

/azp run regression

scholzj avatar Oct 18 '22 14:10 scholzj

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 18 '22 14:10 azure-pipelines[bot]

/azp run regression

scholzj avatar Oct 18 '22 23:10 scholzj

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 18 '22 23:10 azure-pipelines[bot]

/azp run regression

maknihamdi avatar Oct 19 '22 12:10 maknihamdi

Commenter does not have sufficient privileges for PR 7365 in repo strimzi/strimzi-kafka-operator

azure-pipelines[bot] avatar Oct 19 '22 12:10 azure-pipelines[bot]

/azp run regression

im-konge avatar Oct 19 '22 12:10 im-konge

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 19 '22 12:10 azure-pipelines[bot]

@maknihamdi I looked into the test failures - it seems that the bootstrap server is wrong when the clients are initialized. The cluster-ip bootstrap server is something like my-cluster-b3118181-kafka-clusterip-bootstrap.namespace-17.svc:9102, but the clients are trying to send messages to my-cluster-b3118181-kafka-bootstrap:9102. So can you please fix it?

im-konge avatar Oct 20 '22 16:10 im-konge

@im-konge @scholzj it will be something like:

    String serviceName = String.format("%s-kafka-%s-bootstrap", clusterName, Constants.CLUSTER_IP_LISTENER_DEFAULT_NAME);
    String bootstrapServiceName = String.format("%s.%s.svc", serviceName, namespaceName);

Do you prefer to extract these two lines into a dedicated method in https://github.com/strimzi/strimzi-kafka-operator/blob/main/api/src/main/java/io/strimzi/api/kafka/model/KafkaResources.java ? It would be useful for a couple of tests. It could be public static String customListenerBootstrapServiceName(String namespaceName, String clusterName, String listenerName) {...}. We have some special things with this listener: it's not a plain internal one, nor an external one, which is why, when the reconciler computes the bootstrap address, we have the listener name in the middle (cf. https://github.com/strimzi/strimzi-kafka-operator/blob/main/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ListenersUtils.java#L280)
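A self-contained sketch of that hypothetical helper (the class and method names are illustrative, not the actual Strimzi API):

```java
public class ListenerNames {
    /**
     * Hypothetical helper: builds the in-cluster bootstrap address for a
     * cluster-ip style listener. Note the listener name sits in the middle
     * of the service name, unlike the plain internal bootstrap service,
     * e.g. my-cluster-kafka-clusterip-bootstrap.my-namespace.svc
     */
    public static String customListenerBootstrapServiceName(String namespaceName, String clusterName, String listenerName) {
        String serviceName = String.format("%s-kafka-%s-bootstrap", clusterName, listenerName);
        return String.format("%s.%s.svc", serviceName, namespaceName);
    }
}
```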

maknihamdi avatar Oct 21 '22 08:10 maknihamdi

@maknihamdi in STs we sometimes take the bootstrap server name from the Kafka CR once the Kafka is deployed and ready (that is, after the resourceManager.createResource(); method). I don't know if we should have such a method in the api module; that's something for @scholzj. But if it would be used just in tests, I guess it's not really necessary to add it there.

im-konge avatar Oct 21 '22 08:10 im-konge

FYI: I'm traveling this week. If needed I can look more into why the test is failing next week when I get back.

scholzj avatar Oct 21 '22 22:10 scholzj

/azp run regression

scholzj avatar Oct 21 '22 22:10 scholzj

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 21 '22 22:10 azure-pipelines[bot]

/azp run regression

scholzj avatar Oct 24 '22 11:10 scholzj

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Oct 24 '22 11:10 azure-pipelines[bot]

@maknihamdi I wanted to merge it, so I built it and tried it locally. Unfortunately, I found two more issues.

  1. The bootstrap service does not register all the proper DNS names for the certificate SANs. It seems to register only the one with the format <service-name>.<namespace>.svc. It should also register <service-name>.<namespace>, <service-name>, and <service-name>.<namespace>.svc.<cluster-DNS-suffix>, as done for example here: https://github.com/strimzi/strimzi-kafka-operator/blob/ba1fe15383510c061be1c64189c57c7c41b95c5f/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ClusterCa.java#L188-L192

    This is not necessarily a blocker -> I can do this in a follow-up PR if needed ... however, the second issue is a blocker and needs to be fixed.

  2. The broker DNS names are not registered anywhere or added to the SANs. So even if you use the bootstrap address which is part of the certificate, you still run into this:

    Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching my-cluster-kafka-tls-2.myproject.svc found.
        at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:349)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:292)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
        at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$pollRecords$6(KafkaReadStreamImpl.java:154)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching my-cluster-kafka-tls-2.myproject.svc found.
        at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:212)
        at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:415)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
        ... 31 more
    

    which renders it unusable for any client that tries to use it with TLS. We need to register the DNS name even when the user override is not configured (in this case, <service-name>.<namespace>.svc should be enough, because this is the follow-up connection based on the advertised address). This is a blocker, because without it it basically doesn't work.
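The full set of DNS variants from issue 1 can be sketched as follows (a hedged illustration of the expected SAN list; the real logic lives in the operator's certificate-generation code, and the cluster DNS suffix is usually but not always "cluster.local"):

```java
import java.util.List;

public class ServiceDns {
    /**
     * All DNS names under which a ClusterIP service is resolvable in-cluster,
     * from the short name up to the fully qualified form. Each of these
     * should appear as a subject alternative name in the broker certificate.
     */
    public static List<String> serviceDnsNames(String namespace, String serviceName, String clusterDnsSuffix) {
        return List.of(
                serviceName,
                serviceName + "." + namespace,
                serviceName + "." + namespace + ".svc",
                serviceName + "." + namespace + ".svc." + clusterDnsSuffix);
    }
}
```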

We plan to start a new release this week. So please let me know if you need any help with this so that we can close it in time for the new release and don't have to wait for the next one.

scholzj avatar Oct 24 '22 22:10 scholzj

/azp run regression

scholzj avatar Oct 27 '22 09:10 scholzj