azure-functions-kafka-extension

Unable to read messages from kafka

Open sidsingh9 opened this issue 4 years ago • 18 comments

I am not getting messages from Kafka. The partitions are getting assigned, as visible from the application log as well as Kafka queries, so there doesn't seem to be any network issue. But messages are not flowing. (Also, this works when I connect to my local Kafka.) Is there a way to get more logs or debug this further?

{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "event",
      "direction": "in",
      "topic": "test",
      "brokerList": "xxxxxxxxxxxx",
      "consumerGroup": "azurefunctions123",
      "dataType": "binary",
      "debug": true
    },
    { "name": "starter", "type": "durableClient", "direction": "in" }
  ]
}

Application log:

[2021-04-20T08:46:06.560Z] Hosting started
[2021-04-20T08:46:06.561Z] Startup operation '1862f552-6707-4a3e-9d52-d72fd59207d9' completed.

Functions:

    KafkaConsumer: kafkaTrigger

For detailed output, run func with --verbose flag.
[2021-04-20T08:46:07.507Z] Received FunctionLoadResponse for functionId:0efd556c-2c54-4cec-8b94-bac613fdf9ac
[2021-04-20T08:46:12.068Z] Host lock lease acquired by instance ID '00000000000000000000000095C955CF'.
[2021-04-20T08:46:13.442Z] Assigned partitions: [test [[0]], test [[1]], test [[2]]]

Screenshot 2021-04-20 at 2 18 05 PM

sidsingh9 avatar Apr 20 '21 08:04 sidsingh9

We have a similar problem. Are you by any chance using Confluent Cloud to host the topics?

maxbog avatar Apr 20 '21 08:04 maxbog

> We have a similar problem. Are you by any chance using Confluent Cloud to host the topics?

No, we use Azure servers for our Apache Kafka.

sidsingh9 avatar Apr 20 '21 09:04 sidsingh9

Hi @sidsingh9 and @maxbog, thank you for sharing. Which Kafka extension version do you use? Also, which Kafka broker do you use? Adding LibkafkaDebug to your host.json might help with the investigation. Refer to the configuration options; the debug setting is the relevant one.

{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 1,
      "LibkafkaDebug": "broker" 
    }
  },
  "logging": {
    "fileLoggingMode": "debugOnly",
    "logLevel": {
      "default": "Debug"
    }
  }
}

TsuyoshiUshio avatar Apr 20 '21 14:04 TsuyoshiUshio

This is the host.json I was using:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "Function.MyFunction": "Debug",
      "default": "Debug"
    }
  },
  "extensions": {
    "kafka": {
      "MaxPollIntervalMs": 900000,
      "MaxBatchSize": 100,
      "AutoCommitIntervalMs": 200,
      "MetadataMaxAgeMs": 180000,
      "SocketKeepaliveEnable": true,
      "SubscriberIntervalInSeconds": 1,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "LibkafkaDebug": "cgrp,topic,fetch,msg,consumer"
    }
  }
}

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <WarningsAsErrors></WarningsAsErrors>
    <DefaultItemExcludes>**</DefaultItemExcludes>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.8" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.3.1" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="2.0.0-beta" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.3" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.3" />
  </ItemGroup>
</Project>

Will confirm the Kafka version, but I think it's 2.x. We have tried the older 0.9 version of Kafka as well, and that also doesn't work.

sidsingh9 avatar Apr 20 '21 14:04 sidsingh9

Tried with extension version 3 as well:

[2021-04-20T15:09:30.060Z] Microsoft.Azure.WebJobs.Host.Config.IExtensionConfigProvider: Implementation: Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaExtensionConfigProvider, Lifetime: Singleton
[2021-04-20T15:09:30.060Z] Microsoft.Extensions.Options.IConfigureOptions1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]]: Factory, Lifetime: Singleton
[2021-04-20T15:09:30.060Z] Microsoft.Extensions.Options.IConfigureOptions1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]]: Instance: Microsoft.Extensions.Options.ConfigureNamedOptions`1[[Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaOptions, Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null]], Lifetime: Singleton
[2021-04-20T15:11:34.464Z] Received FunctionLoadResponse for functionId:40f98a04-d630-4896-8801-452298da6ac3
[2021-04-20T15:11:39.781Z] Host lock lease acquired by instance ID '00000000000000000000000095C955CF'.
[2021-04-20T15:11:41.984Z] Assigned partitions: [test [[0]], test [[1]], test [[2]]]

sidsingh9 avatar Apr 20 '21 15:04 sidsingh9

The debug feature was introduced in this release: https://github.com/Azure/azure-functions-kafka-extension/releases/tag/3.2.1 Could you try the latest release or 3.2.1? Then you should see more logs. Also, I'd like to know which Azure service you use as a Kafka broker.

TsuyoshiUshio avatar Apr 21 '21 02:04 TsuyoshiUshio

There seems to be some issue with the offset. Since this was a new consumer group with no offset stored, I also tried running a console consumer with the same group ID to set an offset, but I am still getting the same log:



[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: OffsetFetchRequest(v1) for 3/3 partition(s)
[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: Fetch committed offsets for 3/3 partition(s)
[2021-04-21T04:04:51.635Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [0]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [1]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [2]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: OffsetCommit for 3 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: Group "azurefunctions123": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v225
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: Group "azurefunctions123": starting fetchers for 3 assigned partition(s) in join-state assigned (usable_offsets=no, v225, line 2242)
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]: List with 3 partition(s):
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]:  test [0] offset INVALID
[2021-04-21T04:04:51.839Z] Libkafka: [thrd:main]:  test [1] offset INVALID
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]:  test [2] offset INVALID
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: OffsetFetchRequest(v1) for 3/3 partition(s)
[2021-04-21T04:04:51.840Z] Libkafka: [thrd:main]: GroupCoordinator/60900068: Fetch committed offsets for 3/3 partition(s)

sidsingh9 avatar Apr 21 '21 04:04 sidsingh9
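
For readers following the log above: the value -1001 is librdkafka's `RD_KAFKA_OFFSET_INVALID` sentinel, which is consistent with the "No offset stored" messages for this group. Below is a minimal Python sketch for scanning such a log for partitions in that state. The function name `partitions_without_offset` and the regular expression are illustrative assumptions based only on the line format shown in this thread; they are not part of the extension or of librdkafka.

```python
import re

# librdkafka's RD_KAFKA_OFFSET_INVALID sentinel (no usable offset).
OFFSET_INVALID = -1001

# Matches log fragments such as:
#   Topic test [0]: stored offset -1001, committed offset -1001: not including in commit
# The pattern is an assumption derived from the log lines quoted in this thread.
LINE_RE = re.compile(
    r"Topic (?P<topic>\S+) \[(?P<partition>\d+)\]: "
    r"stored offset (?P<stored>-?\d+), committed offset (?P<committed>-?\d+)"
)


def partitions_without_offset(log_text):
    """Return sorted (topic, partition) pairs whose committed offset is invalid."""
    found = set()
    for match in LINE_RE.finditer(log_text):
        if int(match.group("committed")) == OFFSET_INVALID:
            found.add((match.group("topic"), int(match.group("partition"))))
    return sorted(found)


sample = """\
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [0]: stored offset -1001, committed offset -1001: not including in commit
[2021-04-21T04:04:51.838Z] Libkafka: [thrd:main]: Topic test [1]: stored offset -1001, committed offset -1001: not including in commit
"""
print(partitions_without_offset(sample))
```

Running the helper over a full debug log gives a quick list of partitions that have never had an offset committed, which may help when comparing runs before and after another consumer has committed for the same group.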

logsKafka-3.3.2.txt Latest version log

sidsingh9 avatar Apr 21 '21 06:04 sidsingh9

I have some questions. It looks like the KafkaTrigger itself is working and connected to the cluster. However, no offset has been stored so far. Did you try running a client (consumer/producer) against the broker to make sure it works? Did you find anything interesting in the logs on the broker side?

TsuyoshiUshio avatar Apr 21 '21 16:04 TsuyoshiUshio

Yeah, I tried the console consumer and a Java consumer, and they work. Will try to get the broker logs and see if there is anything weird there.

sidsingh9 avatar Apr 21 '21 16:04 sidsingh9

Thank you for sharing! I'll investigate more. BTW, which broker version do you use? The console consumer = C#, right?

TsuyoshiUshio avatar Apr 22 '21 05:04 TsuyoshiUshio

Kafka version: 1.1.1. By console consumer I meant ~/Downloads/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --topic test --bootstrap-server kafka.prod-xxxxxx:9092

sidsingh9 avatar Apr 22 '21 10:04 sidsingh9

Tried a C# program with https://github.com/confluentinc/confluent-kafka-dotnet as well, and it's working fine.

sidsingh9 avatar Apr 23 '21 04:04 sidsingh9

[2021-04-23T05:00:43.244Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.445Z] Libkafka: [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-23T05:00:43.445Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.639Z] Error in Kafka subscriber
[2021-04-23T05:00:43.639Z] Confluent.Kafka: Unable to find an entry point named 'rd_kafka_rebalance_protocol' in shared library 'librdkafka'.
[2021-04-23T05:00:43.639Z] Exiting ProcessSubscription for test-stride
[2021-04-23T05:00:43.646Z] Libkafka: [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
[2021-04-23T05:00:43.646Z] Libkafka: [thrd:main]: Group "azurefunctions123": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[2021-04-23T05:00:43.776Z] Libkafka: [thrd:main]: JoinGroup response: GenerationId 2, Protocol range, LeaderId rdkafka-751096ed-3d03-4538-beee-4a107fa53b04 (me), my MemberId rdkafka-751096ed-3d03-4538-beee-4a107fa53b04, 1 members in group: (no error)

Getting this error log in the consumer. Is this helpful?

sidsingh9 avatar Apr 23 '21 05:04 sidsingh9

brokerlogs.txt Didn't find anything weird in the broker logs.

sidsingh9 avatar Apr 23 '21 07:04 sidsingh9

@TsuyoshiUshio Is there anything else we can check here? We are kind of blocked by this issue.

sidsingh9 avatar Apr 26 '21 04:04 sidsingh9

Hi @sidsingh9, could you add consumer,cgrp,topic,fetch to the debug setting in addition to broker? The debug messages so far don't look wrong. The other configurable parameters are listed in the configuration document; see the debug section. If we can't find anything, we can post the question on https://github.com/edenhill/librdkafka/issues

TsuyoshiUshio avatar May 03 '21 18:05 TsuyoshiUshio
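
For reference, the combined debug setting requested above might look like this in host.json. This is only a sketch that reuses the structure of the host.json example posted earlier in this thread; the `maxBatchSize` and logging values are carried over from that example and are not required for the debug output.

```json
{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 1,
      "LibkafkaDebug": "broker,consumer,cgrp,topic,fetch"
    }
  },
  "logging": {
    "logLevel": {
      "default": "Debug"
    }
  }
}
```

As noted earlier in the thread, the debug feature requires extension release 3.2.1 or later.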

Hi, just an FYI. I had similar issues on Windows. It only worked once out of 10 or 20 runs. However, when I ran the exact same code on Mac, it worked without any issues at all.

guojian83 avatar Aug 12 '21 01:08 guojian83

@sidsingh9 Is this issue still reproducible? Can you try with the latest Kafka extension?

shrohilla avatar Dec 14 '22 03:12 shrohilla

No response from the user, so closing this issue for now.

shrohilla avatar Dec 16 '22 07:12 shrohilla