
Create a sample docker compose file with kafka sasl ssl or sasl_plaintext enabled, to understand the properties needed

andsty opened this issue 1 year ago • 8 comments

Describe the bug
Not able to fetch data from Kafka with SASL_SSL enabled. The documentation on this topic is not clear.

To Reproduce
Create a Kafka cluster with SASL_SSL enabled and try to connect Nussknacker to it.

Environment (please fill in the following information):

  • OS: [e.g. Ubuntu 18.04]
  • Nussknacker version: latest
  • Docker image

andsty avatar Oct 28 '24 13:10 andsty

Hi @andsty . Have you checked this documentation page?

arkadius avatar Oct 28 '24 21:10 arkadius

Hi. Yes, I have checked it, but it does not mention how to set up my docker-compose with this configuration. I was able to connect to my schema registry, but it seems that I cannot connect to my Kafka environment because the SASL definition in my docker-compose is wrong. How can I reference my JAAS config in my docker-compose?

myrulezzz avatar Oct 29 '24 04:10 myrulezzz

Ah, I see! In the installation example directory there is an application-customizations.conf file.

You should use it if you want to provide more advanced configuration that is not possible to achieve with only environment variables.

Please try with something like this:

scenarioTypes {
  "streaming" {
    modelConfig {
      components.kafka {
        config {
          kafkaProperties {
            "schema.registry.url": "http://schemaregistry:8081"
            "bootstrap.servers": "broker1:9092,broker2:9092"
            "security.protocol": "SASL_SSL"
            "sasl.mechanism": "PLAIN"
            "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some_user\" password=\"some_user\";"
          }
        }
      }
    }
  }
}

and let me know if it helped you.
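
To wire that file into the docker-compose setup, one option is to mount it into the Designer container. A minimal sketch (the service name, image tag, and host path are assumptions based on the standard installation example; adjust them to your compose file):

```yaml
services:
  designer:
    image: touk/nussknacker:latest
    # CONFIG_FILE is assumed here to take a comma-separated list of config
    # files, with later files overriding earlier ones
    environment:
      CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf"
    volumes:
      # assumed host path for the customizations file shown above
      - ./application-customizations.conf:/opt/nussknacker/conf/application-customizations.conf
```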

arkadius avatar Oct 29 '24 08:10 arkadius

Ok. Trying this configuration in my application yaml file, I get the below error in the Designer logs when I select a topic from the UI:

2024-10-29 09:39:59.558 [kafka-admin-client-thread | adminclient-5] INFO o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-5] Node -3 disconnected.
2024-10-29 09:39:59.558 [kafka-admin-client-thread | adminclient-5] INFO o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-5] Cancelled in-flight METADATA request with correlation id 45 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, request timeout: 21134ms)

andsty avatar Oct 29 '24 09:10 andsty

And I see an authorization error on Kafka. Is it possible there is an issue with the format passed in the JAAS config?

andsty avatar Oct 29 '24 09:10 andsty

I recommend checking the configuration with some CLI tool (e.g. kafka-console-consumer) executed in the same network. Nussknacker passes kafkaProperties to the Kafka client as they are, without changing anything, so it is unlikely that the problem is on the Nu side.

Example kafka-console-consumer invocation would look like:

./kafka-console-consumer.sh --topic <topic> --bootstrap-server <bootstrap-servers> --consumer.config client.properties

And client.properties would contain:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";

See Confluent documentation for more details.

It is important to ensure that kafka-console-consumer is executed from the same network as the Designer. You might achieve that by adding another container, for example one based on a Kafka image but with the entrypoint changed to bash.
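
Such a debug container could be sketched in docker-compose like this (the image tag and network name are assumptions; use the Kafka image and network from your own setup):

```yaml
services:
  kafka-debug:
    # any image shipping the Kafka CLI tools works; match your cluster version
    image: confluentinc/cp-kafka:7.5.0
    # override the entrypoint so no broker starts; the container just idles
    entrypoint: ["sleep", "infinity"]
    networks:
      - nussknacker_network   # hypothetical name of the network the Designer uses
networks:
  nussknacker_network:
    external: true
```

Then `docker compose exec kafka-debug bash` gives a shell inside the right network from which to run kafka-console-consumer.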

arkadius avatar Oct 29 '24 09:10 arkadius

Hi, I managed to make the proper configuration, but now when I create a new scenario and select Kafka as the source I get this error:

Could not create kafka: /opt/nussknacker/conf/application.conf: 21: security has type OBJECT rather than STRING

What am I missing from the conf file?

andsty avatar Nov 23 '24 00:11 andsty

 1 scenarioTypes {
 2   streaming {
 3     deploymentConfig {
 4       type = "flinkStreaming"
 5       engineSetupName = "My Flink Cluster"
 6       restUrl = "http://localhost:8081"
 7     }
 8     modelConfig {
 9       classPath = [
10         "model/defaultModel.jar",
11         "model/flinkExecutor.jar",
12         "components/flink",
13         "components/common"
14       ]
15       components {
16         kafka {
17           config {
18             kafkaProperties {
19               schema.registry.url = "http://schema-registry.xxxxxxx-as.net:8081"
20               bootstrap.servers = "kafkabroker1.xxxxx-as.net:9092,kafkabroker2.xxxxxx-as.net:9092,kafkabroker3.xxxxx-as.net:9092"
21               security.protocol = "SASL_SSL"
22               sasl.mechanism = "SCRAM-SHA-512"
23               ssl.truststore.type = "JKS"
24               ssl.truststore.location = "/opt/nussknacker/conf/ksqldb.truststore.jks"
25               ssl.truststore.password = "xxxxxx"
26               sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxxxx\";"
27             }
28           }
29         }
30       }
31     }
32     category = "xxxxxx"
33   }
34 }

Here is my config file. Is it correct?

andsty avatar Nov 23 '24 01:11 andsty
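
For reference, the parse error above likely stems from the `security.protocol` line at line 21 of the posted config: in HOCON, an unquoted dotted key expands into nested objects, so `security.protocol = "SASL_SSL"` becomes an object `security { protocol = "SASL_SSL" }` under `kafkaProperties`, while the Kafka client expects a single flat string-valued key named `security.protocol`. Quoting the dotted keys, as in the earlier working example in this thread, keeps them flat:

```hocon
kafkaProperties {
  # Unquoted dotted key: HOCON expands it into security { protocol = ... },
  # an OBJECT - this is what triggers the error.
  # security.protocol = "SASL_SSL"

  # Quoted dotted key: stays one flat string entry, as the Kafka client expects.
  "security.protocol" = "SASL_SSL"
  "sasl.mechanism" = "SCRAM-SHA-512"
}
```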