azure-functions-kafka-extension
Offset commit errors causing topics to restart from earliest offset
Occasionally we see an issue in our function app logs with this error:
Libkafka: [thrd:main]: Offset commit (cgrp auto commit timer) failed for 1/1 partition(s) in join-state steady: Local: Broker transport failure: my-topic[2]@221083602(Local: Broker transport failure)
Obviously there was some error committing an offset, but the problem is that when we restart the function app afterwards, we start getting events from the earliest offset for the topic. This has bitten us because it can mean tens of millions of old events that take a long time to catch up on. If the extension is auto-committing after every execution (that's my understanding), why would it not resume from the last known good offset commit? Is there something the extension can/should be doing to recover from this better, or at least to prevent the consumer from resetting the offset?
As an aside: we attempted to speed up the consumption of old events by manually scaling out our function app. I had assumed this would start more consumers, one for each instance, but all the newly scaled function app instances remained idle while a single instance was maxed out. Is this expected? Are all function app instances sharing a single Kafka client instance under the hood?
Function App Plan: Premium
Platform: Windows / NodeJS
Function.json Sample:
{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "my-topic",
            "brokerList": "%KafkaBrokers%",
            "consumerGroup": "%KafkaConsumerGroup%",
            "authenticationMode": "PLAIN",
            "protocol": "SASLSSL",
            "username": "%KafkaUsername%",
            "password": "%KafkaPassword%",
            "dataType": "binary"
        }
    ],
    "scriptFile": "../dist/myFunction/index.js"
}
Host.json settings:
...
"kafka": {
"maxBatchSize": 50
},
...
Hi @bm77525-kr The maximum scale-out is the same as the number of partitions for the topic. If you want to increase capacity, increase the number of partitions.
You can also set cardinality to MANY if you want to use batch mode. It might help to increase the throughput; see the handler sketch after the sample binding below.
{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol": "SASLSSL",
            "password": "%ConfluentCloudPassword%",
            "dataType": "string",
            "topic": "message",
            "authenticationMode": "PLAIN",
            "cardinality": "MANY",
            "consumerGroup": "$Default",
            "username": "%ConfluentCloudUserName%",
            "brokerList": "%BrokerList%"
        }
    ]
}
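With cardinality set to MANY, each invocation receives an array of events (up to maxBatchSize) instead of a single event. A minimal TypeScript handler sketch, assuming dataType string; the processEvent helper is hypothetical and stands in for the function's real per-event work:

import { AzureFunction, Context } from "@azure/functions";

// Hypothetical per-event work; replace with the function's real processing logic.
async function processEvent(context: Context, event: string): Promise<void> {
    context.log(`Processing event: ${event}`);
}

// With "cardinality": "MANY" and "dataType": "string", the trigger passes an
// array of event payloads to the handler rather than a single payload.
const kafkaTrigger: AzureFunction = async function (context: Context, events: string[]): Promise<void> {
    for (const event of events) {
        await processEvent(context, event);
    }
};

export default kafkaTrigger;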
Currently, we use AutoOffsetReset.Earliest. Hi @fbeltrao Do you know the reason we don't make it configurable?
https://github.com/Azure/azure-functions-kafka-extension/pull/216
@TsuyoshiUshio - do you know when this will be released on NuGet?
We are doing a POC for the client and are in the process of evaluating options for consuming a Kafka stream. The Azure Functions Kafka trigger is very promising, but any new function registers itself as a new consumer, and hence we were seeing a lot of historical data. If AutoOffsetReset were exposed as a property, we could set it to Latest to get around the issue. Also, is there general guidance around handling duplicates in such a scenario?
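On the duplicates question: Kafka consumers that commit offsets give at-least-once delivery, so the usual guidance is to make the processing itself idempotent. A minimal sketch follows, assuming each message body is JSON carrying a unique "id" set by the producer (an assumption, not something the extension provides); the in-memory set is for illustration only and would need to be a durable external store in practice:

import { AzureFunction, Context } from "@azure/functions";

// In-memory de-duplication set for illustration only; in practice this would
// need to be a durable external store (database, cache, etc.), since function
// instances restart and scale out independently.
const processed = new Set<string>();

const kafkaTrigger: AzureFunction = async function (context: Context, event: string): Promise<void> {
    // Assumes the message body is JSON with a unique "id" produced upstream.
    const { id } = JSON.parse(event) as { id: string };

    if (processed.has(id)) {
        context.log(`Skipping duplicate event ${id}`);
        return;
    }

    // ...actual event processing goes here...

    processed.add(id);
};

export default kafkaTrigger;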
This issue is handled in https://github.com/Azure/azure-functions-kafka-extension/pull/364 and will be released soon. Thanks a lot for raising the issue @bm77525-kr / @tech7857
This is now released in 3.5.0.
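For reference, a host.json sketch with the new setting alongside the existing kafka section; the property name and value shown here ("autoOffsetReset": "latest") are an assumption based on the linked PR and should be verified against the 3.5.0 documentation:

"kafka": {
    "maxBatchSize": 50,
    "autoOffsetReset": "latest"
}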