azure-functions-kafka-extension
Need help in connecting to secure cluster topic
I am using the Azure Functions Kafka extension in my JavaScript project. I am creating a consumer that listens to an existing Kafka topic on a secure cluster. To access this topic I have a ca_cert.crt certificate. I am getting this error while running the code: ..../bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 342ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
I couldn't find any example or documentation on how to add the certificate to the project or point to it (e.g. via a path in sslCaLocation). Can I get some help here?
Hi @snehayadavv
This is a Python sample, but it might be similar to your case: https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/python/KafkaTriggerMany/function.json
For the root certificate configuration, you can refer to the Java sample. Java declares the trigger as an annotation, but it is converted to function.json, so you can add sslCaLocation to your function.json.
https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/java/confluent/src/main/java/com/contoso/kafka/TriggerFunction.java
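For instance, a minimal kafkaTrigger binding that sets sslCaLocation might look like the sketch below; the broker address, topic, and certificate path are illustrative placeholders, not values from a real setup:

{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "YOUR_TOPIC",
      "brokerList": "YOUR_BROKER:9093",
      "consumerGroup": "functions",
      "protocol": "SSL",
      "sslCaLocation": "/YOUR_PATH/ca_cert.crt"
    }
  ]
}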
If it doesn't help, could you share your function.json? (with your secrets removed)
My function.json looks like:

{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "payload-events",
      "brokerList": "kafka......:9093",
      "consumerGroup": "functions",
      "dataType": "binary",
      "password": "",
      "protocol": "SSL",
      "sslCaLocation": "/Users/sneha/Library/Java/JavaVirtualMachines/corretto-11.0.8/Contents/Home/lib/security/cacerts"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}
Now I am getting this error: A host error has occurred during startup operation '5fe4f73a-2bf5-461e-809d-258c46143af0'. Confluent.Kafka: ssl.ca.location failed: error:0B084088:x509 certificate routines:X509_load_cert_crl_file:no certificate or crl found. Value cannot be null. (Parameter 'provider')
Could you specify a specific cacert file in sslCaLocation? That path looks like a directory. For example:

"sslCaLocation": "/YOUR_PATH/confluent_cloud_cacert.pem"
Hey, it is not a directory. I imported the certificate into the keystore using the following command, and then gave that path in function.json. Correct me if I am wrong.

sudo keytool -import -trustcacerts -alias certificates -file "ca_cert.crt" -keystore "/Users/sneha/Library/Java/JavaVirtualMachines/corretto-11.0.8/Contents/Home/lib/security/cacerts" -noprompt -storepass changeit
You don't need to import it via keytool. The cacerts keystore is a binary Java keystore, while the underlying librdkafka client expects a PEM-format file at ssl.ca.location, which is why it reports "no certificate or crl found". You can point sslCaLocation directly at the .crt file:

"sslCaLocation": "/YOUR_PATH/ca_cert.crt"

Could you try it?
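As a quick sanity check (an illustrative sketch, not part of the extension; the path is a placeholder), you can verify with Node.js that the file is a parseable certificate rather than a binary keystore:

// check-cert.js: confirm the CA file parses as an X.509 certificate (Node.js 15.6+).
const fs = require('fs');
const { X509Certificate } = require('crypto');

// Hypothetical path; replace with the file you pass to sslCaLocation.
const pem = fs.readFileSync('/YOUR_PATH/ca_cert.crt');
const cert = new X509Certificate(pem); // throws for a JKS keystore or other non-certificate data
console.log(cert.subject, 'valid until', cert.validTo);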
After setting the property like this:

"sslCaLocation": "/Users/sneha/Downloads/ca_cert.crt"

no error is shown, but I am still not getting anything in the console as expected.
My index.js looks like this:

const { StringDecoder } = require('string_decoder');

module.exports = async function (context, event) {
    // Decode the binary Kafka payload into a UTF-8 string.
    const dec = new StringDecoder('utf-8');
    const event_str = dec.write(event);
    context.log.info(`JavaScript Kafka trigger function called for message ${event_str}`);
    const value = JSON.parse(event_str);
    context.log(value);
};
Hi @TsuyoshiUshio! I tried with the following JSON again and, like before, no error is shown. It looks like it is connecting to the Kafka broker, but we are not getting any messages.
{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "payload-events",
      "brokerList": "kafka...9093",
      "consumerGroup": "functions",
      "dataType": "binary",
      "protocol": "ssl",
      "sslCaLocation": "/Users/s0s04xy/Downloads/stg.crt",
      "autoOffsetReset": "earliest",
      "authenticationMode": "plain"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}
- Is there any way to get more detailed logs to debug this?
- Also, if we want to set the Kafka offset to the earliest, how do we do that?
Thanks!
1. Debug
You can configure logging in host.json via the logLevel section, and for more detailed debugging of the underlying librdkafka library you can configure LibkafkaDebug:
{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "Function.MyFunction": "Information",
      "default": "Information"
    }
  },
  "extensions": {
    "kafka": {
      "MaxPollIntervalMs": 900000,
      "MaxBatchSize": 100,
      "AutoCommitIntervalMs": 200,
      "MetadataMaxAgeMs": 180000,
      "SocketKeepaliveEnable": true,
      "SubscriberIntervalInSeconds": 1,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "LibkafkaDebug": "cgrp,topic,fetch"
    }
  }
}
2. AutoOffsetReset option as Earliest
We don't expose this option; however, it is Earliest by default.
I hope this helps.
Hi @TsuyoshiUshio
It seems this is not a problem with the certificate.
- We are able to connect to local Kafka (localhost) without any issue and receive all the messages.
- When we connect to any other server in our organisation (in the stage/prod env), we get the logs below.
- It seems that our function is able to connect to Kafka but not able to get the messages. Also, is there any other component involved here? We see it trying to get messages from the testhubname-control queue.
- Is there anything we can do to debug this further and find the cause? We have checked with our security team, and they have confirmed that the port is open and there is no firewall blocking it.
test-events is the topic name:
Assigned partitions: [test-events [[0]], test-events [[1]], test-events [[2]]]
[2021-01-22T06:43:34.605Z] testhubname-control-02: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-02. blobPrefix:
[2021-01-22T06:43:34.616Z] testhubname-control-02: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-02. blobPrefix:
[2021-01-22T06:43:34.624Z] testhubname-control-01: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-01. blobPrefix:
[2021-01-22T06:43:34.626Z] testhubname-control-01: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-01. blobPrefix:
[2021-01-22T06:43:34.631Z] testhubname-control-00: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-00. blobPrefix:
[2021-01-22T06:43:34.634Z] testhubname-control-00: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-00. blobPrefix:
[2021-01-22T06:43:34.640Z] testhubname-control-03: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: intent, partitionId: testhubname-control-03. blobPrefix:
[2021-01-22T06:43:34.643Z] testhubname-control-03: CreateLeaseIfNotExistAsync - leaseContainerName: testhubname-leases, leaseType: ownership, partitionId: testhubname-control-03. blobPrefix:
[2021-01-22T06:43:41.197Z] Durable task hub worker is starting
[2021-01-22T06:43:41.219Z] testhubname-applease: Attempting to acquire lease
[2021-01-22T06:43:41.237Z] Durable task hub worker started successfully after 38ms
------------------
[2021-01-22T06:43:56.783Z] testhubname-control-00: Started listening for messages on queue testhubname-control-00.
[2021-01-22T06:43:57.202Z] testhubname-control-00: No new messages were found - backing off
[2021-01-22T06:43:58.093Z] testhubname-control-02: Started listening for messages on queue testhubname-control-02.
[2021-01-22T06:43:58.114Z] testhubname-control-03: Started listening for messages on queue testhubname-control-03.
[2021-01-22T06:43:58.196Z] testhubname-control-01: Started listening for messages on queue testhubname-control-01.
[2021-01-22T06:43:58.511Z] testhubname-control-02: No new messages were found - backing off
[2021-01-22T06:43:58.525Z] testhubname-control-03: No new messages were found - backing off
[2021-01-22T06:43:58.602Z] testhubname-control-01: No new messages were found - backing off
I have a few ideas:
- Could you separate out the durable code? The testhubname-control-00 ... entries are Durable Functions messages; only the
Assigned partitions: [test-events [[0]], test-events [[1]], test-events [[2]]]
message is from Kafka. This is just for narrowing it down.
- Could you set the LibkafkaDebug configuration? Then you will see more messages.
- Could you check the log on the broker side?
- Do we need a password? Could you check whether the broker has an authentication mechanism such as SASL. For example, if I connect to Event Hubs or Confluent Cloud with the Kafka API, the binding is:
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"brokerList" : "BrokerList",
"topic" : "topic",
"username" : "$ConnectionString",
"password" : "%Password%",
"protocol" : "SASLSSL",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"dataType": "binary"
}
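A note on the %Password% value above: values wrapped in % are binding expressions resolved from application settings, so when running locally the actual secret lives in local.settings.json rather than in function.json. A minimal sketch, assuming a setting named Password (all values are placeholders):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "Password": "YOUR_CONNECTION_SECRET"
  }
}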
I also encountered the same issue. In my case, protocol, authenticationMode, username, or password was missing. You can debug further with func start --verbose if you run the functions locally on your machine.
I created a sample targeting Event Hubs, which requires username, password, and SASL. It might help: https://github.com/TsuyoshiUshio/KafkaTemplateSamples/tree/eventHubs
Since there has been no communication on this thread, I am closing it for now. Feel free to reopen the issue if it reoccurs.