Kafka Supplier and Consumer need to provide separate Kafka configuration properties
In what version(s) of Spring Functions Catalog are you seeing this issue?
5.0.0
Describe the bug
When the kafka supplier is used as kafka-source-kafka, the properties provided to configure access to the 'external' Kafka cluster are also applied to the 'internal' Kafka cluster that the stream apps' outputs and inputs are bound to.
To Reproduce
Deploy a simple SCDF installation with a single-container Kafka that doesn't require authentication, in the same namespace as SCDF.
Deploy another Kafka cluster using the Bitnami Helm chart.
Configure a stream kafka | log with the following properties:
app.kafka.spring.kafka.bootstrap-servers=my-release-kafka-controller-0.my-release-kafka-controller-headless.default.svc.cluster.local:9092, my-release-kafka-controller-1.my-release-kafka-controller-headless.default.svc.cluster.local:9092,my-release-kafka-controller-2.my-release-kafka-controller-headless.default.svc.cluster.local:9092
app.kafka.spring.kafka.properties.security.protocol=SASL_PLAINTEXT
app.kafka.spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
app.kafka.spring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"XXXXXXXX\";"
app.kafka.spring.kafka.client-id=scdf
app.kafka.spring.kafka.group-id=abc
app.kafka.kafka.supplier.topics=ABC
deployer.*.kubernetes.image-pull-policy=IfNotPresent
Expected behavior
A message created on topic ABC should be written to log.
The behaviour found instead is the error: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []
**Required change** The properties for configuring the external Kafka instance should be prefixed with kafka.supplier or kafka.consumer as follows:
app.kafka.kafka.supplier.spring.kafka.bootstrap-servers=my-release-kafka-controller-0.my-release-kafka-controller-headless.default.svc.cluster.local:9092, my-release-kafka-controller-1.my-release-kafka-controller-headless.default.svc.cluster.local:9092,my-release-kafka-controller-2.my-release-kafka-controller-headless.default.svc.cluster.local:9092
app.kafka.kafka.supplier.spring.kafka.properties.security.protocol=SASL_PLAINTEXT
app.kafka.kafka.supplier.spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
app.kafka.kafka.supplier.spring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"XXXXXXXX\";"
app.kafka.kafka.supplier.spring.kafka.client-id=scdf
app.kafka.kafka.supplier.spring.kafka.group-id=abc
Alternatively, the user would supply only the topic, and the same Kafka instance configured for all stream applications would be used.
Isn't that what is supposed to be configured on the binder level via spring.cloud.stream.kafka.binder properties: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/config-options.html ?
The binder has no authentication properties but tries to use the security properties that were set for the source application.
So there could be a problem in the binder code: it picks up spring.kafka.properties when it should only be looking at spring.cloud.stream.kafka.binder.consumerProperties and spring.cloud.stream.kafka.binder.configuration.
Right. So, it feels like the fix has to be done in the Kafka Binder itself.
Let's add @sobychacko, since I'm not fully up to speed on how the Kafka Binder works.
I will have a look at this today.
@corneil The issue is that we have a Kafka supplier on one side consuming from the first instance, and then the output binding that publishes to the other Kafka instance. However, you only provide the Spring Boot based properties for the whole application, which are used by the auto-configuration on the supplier side. We think you need to set the following properties on the output binding to override the values used by the supplier.
spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=<value>
spring.cloud.stream.kafka.binder.configuration.security.protocol=
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=
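For reference, a minimal sketch of how the combined deployment properties could look, assuming the supplier reads from the external, SASL-secured cluster while the output binding targets the internal, unauthenticated cluster (host names and values are placeholders, not verified settings):
# Supplier side: external, SASL-secured cluster (Spring Boot auto-configuration)
app.kafka.spring.kafka.bootstrap-servers=external-kafka:9092
app.kafka.spring.kafka.properties.security.protocol=SASL_PLAINTEXT
app.kafka.spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
app.kafka.spring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"XXXXXXXX\";"
app.kafka.kafka.supplier.topics=ABC
# Output binding: override the binder configuration so it targets the internal cluster
app.kafka.spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=internal-kafka:9092
app.kafka.spring.cloud.stream.kafka.binder.configuration.security.protocol=PLAINTEXT
# Depending on the setup, the SASL settings inherited from spring.kafka.properties may also need to be cleared explicitly:
app.kafka.spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=
app.kafka.spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=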
Adding these properties worked.
I believe we need to change the configuration of the binders to use this mechanism as the default, to avoid issues for other customer stream apps that may rely on the default Spring Kafka configuration. I managed to create a configuration for Kafka where the binder configuration lives in a separate secret that is added to the Skipper configuration and becomes the default for all deployments.
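As a rough sketch, such a secret could look like the following (the name, keys and values are hypothetical, and the exact way it is imported by the deployed apps depends on the platform setup, for example mounting it as a volume and pointing spring.config.import at the file):
apiVersion: v1
kind: Secret
metadata:
  name: kafka-binder-defaults
type: Opaque
stringData:
  # Binder defaults that deployed stream apps pick up instead of the Boot auto-configuration
  kafka-binder.properties: |
    spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=internal-kafka:9092
    spring.cloud.stream.kafka.binder.configuration.security.protocol=PLAINTEXT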
We need to create a similar configuration for RabbitMQ.