azure-functions-kafka-extension
Unable to read messages from kafka
I am not receiving messages from Kafka. The partitions are being assigned, as shown by both the application log and Kafka queries, so there doesn't seem to be a network issue, but no messages are flowing. (The same setup works when I connect to my local Kafka.) Is there a way to get more logs or debug this further?
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "test",
"brokerList": "xxxxxxxxxxxx",
"consumerGroup" : "azurefunctions123",
"dataType": "binary",
"debug":true
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
Application log:
[2021-04-20T08:46:06.560Z] Hosting started
[2021-04-20T08:46:06.561Z] Startup operation '1862f552-6707-4a3e-9d52-d72fd59207d9' completed.
Functions:
KafkaConsumer: kafkaTrigger
For detailed output, run func with --verbose flag.
[2021-04-20T08:46:07.507Z] Received FunctionLoadResponse for functionId:0efd556c-2c54-4cec-8b94-bac613fdf9ac
[2021-04-20T08:46:12.068Z] Host lock lease acquired by instance ID '00000000000000000000000095C955CF'.
[2021-04-20T08:46:13.442Z] Assigned partitions: [test [[0]], test [[1]], test [[2]]]

We have a similar problem. Are you by any chance using Confluent Cloud to host the topics?
No, we use Azure servers for our Apache Kafka.
Hi @sidsingh9 and @maxbog
Thank you for sharing. Which Kafka extension version do you use? Also, which Kafka broker do you use? Adding LibkafkaDebug to your host.json might help with the investigation. Refer to the configuration options; the relevant setting is debug.
{
"version": "2.0",
"extensions": {
"kafka": {
"maxBatchSize": 1,
"LibkafkaDebug": "broker"
}
},
"logging": {
"fileLoggingMode": "debugOnly",
"logLevel": {
"default": "Debug"
}
}
}
This is the host.json I was using:
{
"version": "2.0",
"logging": {
"logLevel": {
"Function.MyFunction": "Debug",
"default": "Debug"
}
},
"extensions": {
"kafka": {
"MaxPollIntervalMs": 900000,
"MaxBatchSize": 100,
"AutoCommitIntervalMs": 200,
"MetadataMaxAgeMs": 180000,
"SocketKeepaliveEnable": true,
"SubscriberIntervalInSeconds": 1,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50,
"LibkafkaDebug": "cgrp,topic,fetch,msg,consumer"
}
}
}
> <Project Sdk="Microsoft.NET.Sdk">
> <PropertyGroup>
> <TargetFramework>netcoreapp3.1</TargetFramework>
> <WarningsAsErrors></WarningsAsErrors>
> <DefaultItemExcludes>**</DefaultItemExcludes>
> </PropertyGroup>
> <ItemGroup>
> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.8" />
> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.3.1" />
> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="2.0.0-beta" />
> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.3" />
> <PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.3" />
>
> </ItemGroup>
> </Project>
I will confirm the Kafka version, but I think it's 2.x. We have tried an older 0.9 version of Kafka as well, and that doesn't work either.
Tried with extension version 3 as well:
[2021-04-20T15:09:30.060Z] Microsoft.Azure.WebJobs.Host.Config.IExtensionConfigProvider: Implementation: Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaExtensionConfigProvider, Lifetime: Singleton
[2021-04-20T15:09:30.060Z] Microsoft.Extensions.Options.IConfigureOptions1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]]: Factory, Lifetime: Singleton
[2021-04-20T15:09:30.060Z] Microsoft.Extensions.Options.IConfigureOptions1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]]: Instance: Microsoft.Extensions.Options.ConfigureNamedOptions`1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]], Lifetime: Singleton
[2021-04-20T15:11:34.464Z] Received FunctionLoadResponse for functionId:40f98a04-d630-4896-8801-452298da6ac3
[2021-04-20T15:11:39.781Z] Host lock lease acquired by instance ID '00000000000000000000000095C955CF'.
[2021-04-20T15:11:41.984Z] Assigned partitions: [test [[0]], test [[1]], test [[2]]]
The debug feature was introduced in this release: https://github.com/Azure/azure-functions-kafka-extension/releases/tag/3.2.1 Could you try the latest release or 3.2.1? You should then see more logs. Also, I'd like to know which Azure service you use as your Kafka broker.
There seems to be some issue with the offset. Since this was a new consumer group with no offset stored, I also tried running a console consumer with the same group ID to set an offset, but I am still getting the same log:
[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: OffsetFetchRequest(v1) for 3/3 partition(s)
[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: Fetch committed offsets for 3/3 partition(s)
[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [0]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [1]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [2]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: OffsetCommit for 3 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: Group "azurefunctions123": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v225
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: Group "azurefunctions123": starting fetchers for 3 assigned partition(s) in join-state assigned (usable_offsets=no, v225, line 2242)
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: List with 3 partition(s):
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: test [0] offset INVALID
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: test [1] offset INVALID
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]: test [2] offset INVALID
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: OffsetFetchRequest(v1) for 3/3 partition(s)
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: Fetch committed offsets for 3/3 partition(s)
logsKafka-3.3.2.txt Latest version log
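A note for reading the debug output above: in librdkafka, -1001 is the sentinel value RD_KAFKA_OFFSET_INVALID, meaning no offset is stored or committed for that partition. Below is a minimal Python sketch, not part of the extension, that scans log lines in the format shown above and lists the partitions where both the stored and the committed offset are invalid. The function name `partitions_without_offsets` and the regular expression are assumptions made for illustration.

```python
import re

# librdkafka's sentinel for "no offset" (RD_KAFKA_OFFSET_INVALID).
OFFSET_INVALID = -1001

# Matches lines such as:
#   "... Topic test [0]: stored offset -1001, committed offset -1001: not including in commit"
LINE_RE = re.compile(
    r"Topic (?P<topic>\S+) \[(?P<partition>\d+)\]: "
    r"stored offset (?P<stored>-?\d+), committed offset (?P<committed>-?\d+)"
)


def partitions_without_offsets(log_text):
    """Return sorted (topic, partition) pairs whose stored and committed
    offsets are both the invalid sentinel."""
    found = set()
    for line in log_text.splitlines():
        match = LINE_RE.search(line)
        if not match:
            continue
        stored = int(match.group("stored"))
        committed = int(match.group("committed"))
        if stored == OFFSET_INVALID and committed == OFFSET_INVALID:
            found.add((match.group("topic"), int(match.group("partition"))))
    return sorted(found)
```

Running it over a saved log file (for example the attached log) would show whether any partition ever obtains a usable offset during the session.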
I have some questions. It looks like KafkaTrigger itself is working and connected to the cluster. However, there is still no offset. Did you try running a client (consumer/producer) against the broker to make sure it works? Did you find anything interesting in the logs on the broker side?
Yeah, I tried the console consumer or a Java consumer, and it works. I will try to get the broker logs and see if there is anything weird there.
Thank you for sharing! I'll investigate more. By the way, which broker version do you use? The console consumer is C#, right?
Kafka version: 1.1.1. By console consumer I meant:
~/Downloads/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --topic test --bootstrap-server kafka.prod-xxxxxx:9092
I tried a C# program with https://github.com/confluentinc/confluent-kafka-dotnet as well, and it works fine.
[2021-04-23T05:00:43.244Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.445Z] Libkafka: [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-23T05:00:43.445Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.639Z] Error in Kafka subscriber
[2021-04-23T05:00:43.639Z] Confluent.Kafka: Unable to find an entry point named 'rd_kafka_rebalance_protocol' in shared library 'librdkafka'.
[2021-04-23T05:00:43.639Z] Exiting ProcessSubscription for test-stride
[2021-04-23T05:00:43.646Z] Libkafka: [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-23T05:00:43.646Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.776Z] Libkafka: [thrd:main]: JoinGroup response: GenerationId 2, Protocol range, LeaderId rdkafka-751096ed-3d03-4538-beee-4a107fa53b04 (me), my MemberId rdkafka-751096ed-3d03-4538-beee-4a107fa53b04, 1 members in group: (no error)
I am getting this error log in the consumer. Is this helpful?
brokerlogs.txt I didn't find anything weird in the broker logs.
@TsuyoshiUshio Is there anything else we can check here? We are blocked by this issue.
Hi @sidsingh9
Could you add consumer,cgrp,topic,fetch to the debug setting in addition to broker? The debug messages don't look wrong. For the other configurable parameters, see the debug section of the configuration.
If we can't find anything, we can post the question on https://github.com/edenhill/librdkafka/issues
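As a rough illustration of the change suggested here, the following Python sketch appends extra debug contexts to the `LibkafkaDebug` value of an existing host.json. The helper name `add_debug_contexts` is hypothetical and this is only one way to do it; editing the file by hand works just as well.

```python
import json


def add_debug_contexts(host_json_text, extra_contexts):
    """Return host.json text with extra contexts appended to
    extensions.kafka.LibkafkaDebug, skipping ones already present."""
    config = json.loads(host_json_text)
    kafka = config.setdefault("extensions", {}).setdefault("kafka", {})
    # Split the existing comma-separated value, ignoring empty entries.
    current = [c.strip() for c in kafka.get("LibkafkaDebug", "").split(",") if c.strip()]
    for context in extra_contexts:
        if context not in current:
            current.append(context)
    kafka["LibkafkaDebug"] = ",".join(current)
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    original = '{"version": "2.0", "extensions": {"kafka": {"maxBatchSize": 1, "LibkafkaDebug": "broker"}}}'
    print(add_debug_contexts(original, ["consumer", "cgrp", "topic", "fetch"]))
```

With the host.json from earlier in this thread as input, the resulting `LibkafkaDebug` value would be `broker,consumer,cgrp,topic,fetch`, and the other settings are left unchanged.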
Hi, just an FYI. I had similar issues on Windows: it only worked once out of 10 or 20 runs. However, when I ran the exact same code on a Mac, it worked without any issues at all.
@sidsingh9 Is this issue still reproducible? Can you try with the latest Kafka extension?
No response from the user, so closing this issue for now.