azure-functions-kafka-extension

Need help in connecting to secure cluster topic

Open snehayadavv opened this issue 4 years ago • 11 comments

I am using the Azure Functions Kafka extension in a JavaScript project. I am creating a consumer that listens to an existing Kafka topic on a secure cluster. To access this topic I have a ca_cert.crt certificate. I get this error when running the code: ..../bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 342ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)

I couldn't find any example or documentation on how to add the certificate to the project or point to it (for example, by giving a path in sslCaLocation). Can I get some help here?

snehayadavv avatar Jan 05 '21 10:01 snehayadavv

Hi @snehayadavv

This is a Python sample, but it might be similar to your case. https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/python/KafkaTriggerMany/function.json_

For the root certificate configuration, you can refer to the Java sample. In Java the setting is written as an annotation, but it is converted to function.json, so you can add sslCaLocation to your function.json in the same way.

https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/java/confluent/src/main/java/com/contoso/kafka/TriggerFunction.java

If that doesn't help, could you share your function.json (with your secrets removed)?

TsuyoshiUshio avatar Jan 05 '21 22:01 TsuyoshiUshio

My function.json looks like this:

{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "payload-events",
      "brokerList": "kafka......:9093",
      "consumerGroup": "functions",
      "dataType": "binary",
      "password": "",
      "protocol": "SSL",
      "sslCaLocation": "/Users/sneha/Library/Java/JavaVirtualMachines/corretto-11.0.8/Contents/Home/lib/security/cacerts"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

Now I am getting this error: A host error has occurred during startup operation '5fe4f73a-2bf5-461e-809d-258c46143af0'. Confluent.Kafka: ssl.ca.location failed: error:0B084088:x509 certificate routines:X509_load_cert_crl_file:no certificate or crl found. Value cannot be null. (Parameter 'provider')

snehayadavv avatar Jan 06 '21 14:01 snehayadavv

Could you point sslCaLocation at a specific CA certificate file? The current value looks like a directory. For example,

"sslCaLocation" : "/YOUR_PATH/confluent_cloud_cacert.pem"

TsuyoshiUshio avatar Jan 06 '21 17:01 TsuyoshiUshio

Hey, it is not a directory. I imported the certificate into the keystore using the following command:

sudo keytool -import -trustcacerts -alias certificates -file "ca_cert.crt" -keystore "/Users/sneha/Library/Java/JavaVirtualMachines/corretto-11.0.8/Contents/Home/lib/security/cacerts" -noprompt -storepass changeit

I then gave this path in function.json. Correct me if I am wrong.

snehayadavv avatar Jan 06 '21 18:01 snehayadavv

You don't need to import via keytool.

You can specify the certificate file directly, as follows:

"sslCaLocation": "/YOUR_PATH/ca_cert.crt"

Could you try it?
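The earlier "no certificate or crl found" error is consistent with sslCaLocation pointing at a Java keystore: librdkafka loads PEM-encoded CA certificates from that path, while the JVM's cacerts file is a binary keystore. A minimal sketch of a pre-flight check follows. The helper names looksLikePemCertificate and checkCaFile are hypothetical and not part of the extension, and a .crt file that is DER-encoded would also fail this check and need converting to PEM first.

```javascript
const fs = require('fs');

// Hypothetical helper (not part of the extension): true if the text contains
// at least one complete PEM certificate block.
function looksLikePemCertificate(text) {
  return /-----BEGIN CERTIFICATE-----[\s\S]+?-----END CERTIFICATE-----/.test(text);
}

// Hypothetical helper: read a candidate CA file and report whether it looks
// like PEM. 'latin1' maps every byte to one character, so a binary keystore
// is read without decoding errors and simply fails the pattern match.
function checkCaFile(path) {
  return looksLikePemCertificate(fs.readFileSync(path, 'latin1'));
}
```

Calling checkCaFile('/YOUR_PATH/ca_cert.crt') before starting the function host should return true for a PEM file and false for a keystore such as cacerts.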

TsuyoshiUshio avatar Jan 06 '21 18:01 TsuyoshiUshio

After setting the property like this:

"sslCaLocation": "/Users/sneha/Downloads/ca_cert.crt"

no error is shown. Still, I am not getting anything in the console as expected.

My index.js looks like this:

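const { StringDecoder } = require('string_decoder');

module.exports = async function (context, event) {
  // Decode the binary payload as UTF-8 text.
  const dec = new StringDecoder('utf-8');
  let event_str = dec.write(event);
  context.log.info(`JavaScript Kafka trigger function called for message ${event_str}`);
  // Parse the decoded text as JSON and log the resulting object.
  var value = JSON.parse(event_str);
  context.log(value);
};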
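A minimal sketch of a more defensive version of the decoding step, assuming that with "dataType": "binary" the payload arrives as a Node.js Buffer. The helper name decodeEvent is hypothetical. It uses Node's built-in string_decoder module and keeps the raw text when the message is not valid JSON, so one malformed message does not make the handler throw.

```javascript
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: decode a Buffer payload as UTF-8 and try to parse it as JSON.
function decodeEvent(event) {
  const decoder = new StringDecoder('utf8');
  // write() decodes the bytes; end() flushes any incomplete trailing sequence.
  const text = decoder.write(event) + decoder.end();
  try {
    return { text, value: JSON.parse(text) };
  } catch (err) {
    // Not every Kafka message is JSON; keep the raw text instead of throwing.
    return { text, value: undefined };
  }
}
```

Inside the handler this could be used as `const { text, value } = decodeEvent(event);` followed by logging both.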

snehayadavv avatar Jan 06 '21 18:01 snehayadavv

Hi @TsuyoshiUshio! I tried again with the following JSON and, as before, there is no error. It looks like it connects to the Kafka broker, but we are not getting any messages.

{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "payload-events",
      "brokerList": "kafka...9093",
      "consumerGroup": "functions",
      "dataType": "binary",
      "protocol": "ssl",
      "sslCaLocation": "/Users/s0s04xy/Downloads/stg.crt",
      "autoOffsetReset": "earliest",
      "authenticationMode": "plain"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

  1. Is there any way to get more detailed logs to debug this?
  2. Also, if we want to set the Kafka offset to the earliest, how do we do that?

Thanks!

snehayadavv avatar Jan 08 '21 16:01 snehayadavv

1. Debug

You can configure logging with the logLevel setting in host.json. For more detailed debugging of the librdkafka library, you can configure LibkafkaDebug:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "Function.MyFunction": "Information",
      "default": "Information"
    }
  },
  "extensions": {
    "kafka": {
      "MaxPollIntervalMs": 900000,
      "MaxBatchSize": 100,
      "AutoCommitIntervalMs": 200,
      "MetadataMaxAgeMs": 180000,
      "SocketKeepaliveEnable": true,
      "SubscriberIntervalInSeconds": 1,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "LibkafkaDebug": "cgrp,topic,fetch"
    }
  }
}
2. AutoOffsetReset option as Earliest

We don't support the option; however, it is Earliest by default.

I hope this helps.
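A minimal sketch of setting LibkafkaDebug programmatically, for example from a small script that rewrites host.json before a debugging session. The helper name withLibkafkaDebug is hypothetical, and the list of contexts follows the debug contexts documented for librdkafka; it is an assumption that may not match the librdkafka version bundled with the extension. For a TLS connection problem, contexts such as security and broker are likely more useful than fetch.

```javascript
// Debug contexts as documented for librdkafka's "debug" property.
// Assumption: the exact set may differ between librdkafka versions.
const KNOWN_CONTEXTS = new Set([
  'generic', 'broker', 'topic', 'metadata', 'feature', 'queue', 'msg',
  'protocol', 'cgrp', 'security', 'fetch', 'interceptor', 'plugin',
  'consumer', 'admin', 'eos', 'mock', 'assignor', 'conf', 'all',
]);

// Hypothetical helper: return a copy of a parsed host.json object with
// extensions.kafka.LibkafkaDebug set to the given contexts.
function withLibkafkaDebug(hostJson, contexts) {
  const unknown = contexts.filter((c) => !KNOWN_CONTEXTS.has(c));
  if (unknown.length > 0) {
    throw new Error(`Unknown debug context(s): ${unknown.join(', ')}`);
  }
  const extensions = hostJson.extensions || {};
  return {
    ...hostJson,
    extensions: {
      ...extensions,
      // Existing kafka settings are preserved; only LibkafkaDebug changes.
      kafka: { ...(extensions.kafka || {}), LibkafkaDebug: contexts.join(',') },
    },
  };
}
```

The input object is not modified, so the original settings can be restored by writing the untouched object back.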

TsuyoshiUshio avatar Jan 09 '21 23:01 TsuyoshiUshio

Hi @TsuyoshiUshio

It seems this is not a problem with the certificate.

  • We are able to connect to a local Kafka (localhost) without any issue and receive all the messages.
  • When we connect to any other server in our organisation (stage/prod environments), we get the logs below.
  • It seems that our function is able to connect to Kafka but not able to get the messages. Also, is there any other component involved here? We see it trying to get messages from the testhubname-control queue.
  • Is there anything we can do to debug the issue further and find its cause? We have checked with our security team, and they have confirmed that the port is open and no firewall is blocking it.

test-events-> this is the topic name


Assigned partitions: [test-events [[0]], test-events [[1]], test-events [[2]]]
[2021-01-22T06:43:34.605Z] testhubname-control-02: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-02. blobPrefix: 
[2021-01-22T06:43:34.616Z] testhubname-control-02: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-02. blobPrefix: 
[2021-01-22T06:43:34.624Z] testhubname-control-01: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-01. blobPrefix: 
[2021-01-22T06:43:34.626Z] testhubname-control-01: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-01. blobPrefix: 
[2021-01-22T06:43:34.631Z] testhubname-control-00: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-00. blobPrefix: 
[2021-01-22T06:43:34.634Z] testhubname-control-00: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-00. blobPrefix: 
[2021-01-22T06:43:34.640Z] testhubname-control-03: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-03. blobPrefix: 
[2021-01-22T06:43:34.643Z] testhubname-control-03: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-03. blobPrefix: 
[2021-01-22T06:43:41.197Z] Durable task hub worker is starting
[2021-01-22T06:43:41.219Z] testhubname-applease: Attempting to acquire lease
[2021-01-22T06:43:41.237Z] Durable task hub worker started successfully after 38ms


------------------

[2021-01-22T06:43:56.783Z] testhubname-control-00: Started listening for messages on queue testhubname-control-00.
[2021-01-22T06:43:57.202Z] testhubname-control-00: No new messages were found - backing off
[2021-01-22T06:43:58.093Z] testhubname-control-02: Started listening for messages on queue testhubname-control-02.
[2021-01-22T06:43:58.114Z] testhubname-control-03: Started listening for messages on queue testhubname-control-03.
[2021-01-22T06:43:58.196Z] testhubname-control-01: Started listening for messages on queue testhubname-control-01.
[2021-01-22T06:43:58.511Z] testhubname-control-02: No new messages were found - backing off
[2021-01-22T06:43:58.525Z] testhubname-control-03: No new messages were found - backing off
[2021-01-22T06:43:58.602Z] testhubname-control-01: No new messages were found - backing off


sidsingh9 avatar Jan 22 '21 07:01 sidsingh9

I have a few ideas.

  1. Could you separate out the durable code? The testhubname-control-00 ... lines are Durable Functions messages; only the Assigned partitions: [test-events [[0]], test-events [[1]], test-events [[2]]] message comes from Kafka. This is to narrow the problem down.
  2. Could you set the LibkafkaDebug configuration? Then you will see more messages.
  3. Could you look at the logs on the broker side?
  4. Do we need a password? Could you check whether the broker has an authentication mechanism, such as SASL? For example, if I connect to Event Hubs or Confluent Cloud with the Kafka API, it will be:
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "$ConnectionString",
      "password": "%Password%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
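A minimal sketch of checking a kafkaTrigger binding for the fields a SASL listener would need. The helper name missingAuthFields is hypothetical, and the rule it applies (SASL protocols need authenticationMode, username, and password) is an assumption based on the example binding above, not a documented validation of the extension.

```javascript
// Hypothetical helper: list the fields that look missing on a kafkaTrigger binding.
// Assumption: SASL protocols need authenticationMode, username and password.
function missingAuthFields(binding) {
  const protocol = String(binding.protocol || '').toLowerCase();
  // Without a protocol the client may fall back to a plaintext connection.
  if (protocol === '') return ['protocol'];
  // Plain SSL without SASL needs no credentials under this rule.
  if (!protocol.startsWith('sasl')) return [];
  return ['authenticationMode', 'username', 'password'].filter(
    (field) => !binding[field]
  );
}
```

For a binding that sets "protocol": "ssl" together with "authenticationMode": "plain" but no credentials, the function returns an empty list, which is a hint to confirm with the broker owners whether the listener really is SSL-only or expects SASL as well.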

I also encountered the same issue as you. In my case, protocol, authenticationMode, username, or password was missing. You can debug more with func start --verbose if you run the functions locally on your machine.
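Because the Durable Functions lines dominate the output, filtering them out is one way to narrow things down without changing the app. A minimal sketch, assuming the task hub is named testhubname as in the log above; the helper name withoutDurableLines is hypothetical.

```javascript
// Hypothetical helper: drop Durable Functions lines so the Kafka lines stand out.
// Assumes Durable lines either mention "<hubName>-" (control queues, leases)
// or contain the phrase "Durable task hub".
function withoutDurableLines(logText, hubName = 'testhubname') {
  return logText
    .split('\n')
    .filter(
      (line) =>
        !line.includes(`${hubName}-`) && !line.includes('Durable task hub')
    )
    .join('\n');
}
```

Applied to the captured output of func start --verbose, this leaves lines such as the Assigned partitions message and any librdkafka debug output.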

TsuyoshiUshio avatar Jan 22 '21 19:01 TsuyoshiUshio

I created a sample targeting Event Hubs, which requires username, password, and SASL. It might help. https://github.com/TsuyoshiUshio/KafkaTemplateSamples/tree/eventHubs

TsuyoshiUshio avatar Jan 23 '21 00:01 TsuyoshiUshio

Since there has been no communication on this thread, closing this for now. Feel free to reopen the issue if it reoccurs.

shrohilla avatar Dec 14 '22 03:12 shrohilla