
Offset commit errors causing topics to restart from earliest offset

Open bm77525-kr opened this issue 4 years ago • 5 comments

Occasionally we see an issue in our function app logs with this error:

Libkafka: [thrd:main]: Offset commit (cgrp auto commit timer) failed for 1/1 partition(s) in join-state steady: Local: Broker transport failure: my-topic[2]@221083602(Local: Broker transport failure)

Obviously there was some error committing an offset, but the problem is that when we restart the function app afterwards, we start receiving events from the earliest offset of the topic. This has bitten us because it can be tens of millions of old events, which take a long time to catch up on. If the extension auto-commits after every execution (that's my understanding), why would it not resume from the last known good committed offset? Is there something the extension can or should be doing to recover from this more gracefully, or at least to prevent the consumer from resetting the offset?

As an aside: we attempted to speed up the consumption of old events by manually scaling out our function app. I had assumed this would start more consumers on each instance, but all the newly scaled function app instances remained idle while a single instance was maxed out. Is this expected? Are all function app instances sharing a single Kafka client instance under the hood?

Function App Plan: Premium
Platform: Windows / NodeJS

Function.json Sample:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "my-topic",
            "brokerList": "%KafkaBrokers%",
            "consumerGroup": "%KafkaConsumerGroup%",
            "authenticationMode": "PLAIN",
            "protocol": "SASLSSL",
            "username": "%KafkaUsername%",
            "password": "%KafkaPassword%",
            "dataType": "binary"
        }
    ],
    "scriptFile": "../dist/myFunction/index.js"
}

Host.json settings:

...
"kafka": {
    "maxBatchSize": 50
},
...
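For context on the auto-commit behavior discussed above, the extension's host.json section accepts more settings than just maxBatchSize. The fragment below is a sketch; autoCommitIntervalMs is an assumption based on librdkafka's auto.commit.interval.ms setting, so verify the exact property name against your extension version:

```json
{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 50,
            "autoCommitIntervalMs": 200
        }
    }
}
```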

bm77525-kr avatar Jul 15 '21 20:07 bm77525-kr

Hi @bm77525-kr The maximum scale-out is the same as the number of partitions for the topic. If you want to increase capacity, increase the number of partitions.

You can also set cardinality to MANY if you want to use batch mode. It might help increase throughput.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "message",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}
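With cardinality set to MANY, the Node.js handler receives an array of messages per invocation instead of a single one. A minimal sketch of what such a handler might look like (the mock context and the standalone invocation are added here so the snippet runs outside the Functions runtime; in a real app the runtime supplies both):

```typescript
// Sketch of a batch-mode (cardinality: MANY) Kafka trigger handler.
// With dataType "string", each element of `events` is one message value.
async function kafkaBatchHandler(
    context: { log: (msg: string) => void },
    events: string[]
): Promise<number> {
    let processed = 0;
    for (const value of events) {
        // Application-specific processing would go here.
        context.log(`processing: ${value}`);
        processed++;
    }
    return processed;
}

// Standalone invocation with a mock context, for illustration only.
const mockContext = { log: (msg: string) => console.log(msg) };
kafkaBatchHandler(mockContext, ['{"id":1}', '{"id":2}', '{"id":3}']).then((n) =>
    console.log(`processed ${n} events`)
);
```

Note that the whole batch is auto-committed together, so a failure mid-batch can lead to the entire batch being redelivered.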

Currently, we use AutoOffsetReset.Earliest. Hi @fbeltrao, do you know the context for why we don't make it configurable?

TsuyoshiUshio avatar Jul 27 '21 17:07 TsuyoshiUshio

https://github.com/Azure/azure-functions-kafka-extension/pull/216

TsuyoshiUshio avatar Jul 27 '21 18:07 TsuyoshiUshio

@TsuyoshiUshio do you know when this will be released on NuGet?

tech7857 avatar Aug 05 '21 21:08 tech7857

We are doing a POC for a client and are in the process of evaluating options for consuming a Kafka stream. The Azure Functions Kafka trigger is very promising, but any new function registers itself as a new consumer and hence sees a lot of historical data. If AutoOffsetReset were exposed as a property, we could set it to Latest to get around the issue. Also, is there any general guidance on handling duplicates in such a scenario?

rohitsgupta avatar Mar 21 '22 17:03 rohitsgupta
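On the duplicates question: because the trigger gives at-least-once delivery, a redelivered batch after an offset reset will replay events, so downstream processing generally needs to be idempotent. One common pattern (a hypothetical sketch, not part of the extension's API) is to key each event by topic, partition, and offset, and skip any key already seen:

```typescript
// Hypothetical idempotency guard: dedupe by topic/partition/offset.
// In production the seen-set would live in durable storage (e.g. a
// database keyed by this composite key), not in process memory.
interface KafkaEventMeta {
    topic: string;
    partition: number;
    offset: number;
}

class OffsetDeduper {
    private seen = new Set<string>();

    // Returns true if the event has not been processed yet.
    shouldProcess(meta: KafkaEventMeta): boolean {
        const key = `${meta.topic}-${meta.partition}-${meta.offset}`;
        if (this.seen.has(key)) {
            return false; // duplicate delivery; skip
        }
        this.seen.add(key);
        return true;
    }
}

const deduper = new OffsetDeduper();
const evt = { topic: "my-topic", partition: 2, offset: 221083602 };
console.log(deduper.shouldProcess(evt)); // true: first delivery
console.log(deduper.shouldProcess(evt)); // false: redelivered event is skipped
```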

This issue is handled in https://github.com/Azure/azure-functions-kafka-extension/pull/364 and will be released soon. Thanks a lot for raising the issue @bm77525-kr / @tech7857

shrohilla avatar Jul 14 '22 07:07 shrohilla

This is now released in 3.5.0.

shrohilla avatar Oct 20 '22 15:10 shrohilla
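Based on the linked PR, versions 3.5.0 and later let you configure the offset reset behavior. The property name and placement below are assumptions inferred from the thread, so verify them against your extension version's documentation:

```json
{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 50,
            "autoOffsetReset": "Latest"
        }
    }
}
```

Note that AutoOffsetReset only applies when no committed offset exists for the consumer group (or the committed offset is out of range); with Latest, a brand-new consumer group starts from the end of the topic instead of replaying history.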