azure-functions-kafka-extension
autocommit flag
Dear developers, AutoCommitIntervalMs allows setting the interval for auto commit. What about the case where handling fails and we want to retry? Could it be possible to set auto commit to false in some way?
Thank you for the suggestion. Let me investigate it.
@TsuyoshiUshio - The current default value is 5 seconds, as per the librdkafka configuration. Does this mean any consumer must be able to process the messages (say the batch size is 64) within 5 seconds to avoid data loss?
Hello @TsuyoshiUshio
Any timeline on this enhancement? We also need a way to read the same message again if processing fails for any reason.
Thanks
In most cases, we probably don't need to set auto commit to false. We have the Retry policy: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-error-pages?tabs=csharp#retry-policies-preview. Combined with this retry policy, what happens is:
1. The Kafka extension reads messages from the broker.
2. The extension tries to execute your function. If the function throws an exception, it is replayed according to the Retry Policy; that is, step 2 just repeats. Once it succeeds or the retry threshold is reached, processing moves on to step 3.
3. The extension executes Store Offset. https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html#store-offsets
4. librdkafka auto-commits in the background; only offsets that have been stored are committed.
So in most cases, message loss doesn't happen. It could happen if the functions host dies between steps 3 and 4. We could reduce that risk by setting auto commit to false and using a synchronous commit strategy; however, that would impact throughput.
So IMO, we don't need auto commit set to false. However, if you really need it, our extension has an extension point for the Commit Strategy, and implementing a synchronous commit there would not be very difficult.
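For reference, here is a minimal sketch of what synchronous commit semantics look like with the Confluent.Kafka package used directly. This only illustrates the trade-off (each commit blocks on a broker round-trip); it is not the extension's internal Commit Strategy hook, and the broker, topic, and group names are placeholders:

```csharp
using System;
using Confluent.Kafka;

class SyncCommitSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder
            GroupId = "example-group",           // placeholder
            EnableAutoCommit = false             // no background commit; we commit explicitly
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("example-topic");     // placeholder

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;        // poll timed out; try again

            try
            {
                Process(result.Message.Value);   // stand-in for the function body
                consumer.Commit(result);         // synchronous: blocks until the broker acknowledges
            }
            catch (Exception)
            {
                // No commit on failure, so the message is redelivered
                // after a restart or rebalance.
            }
        }
    }

    static void Process(string value) { /* ... */ }
}
```

This is where the throughput cost comes from: every message pays a commit round-trip instead of librdkafka batching commits in the background.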
@TsuyoshiUshio
Thanks for replying.
If in step 2 we are unable to process the message for any reason (maybe the message is bad, or there is an application error), we don't want that message to be committed once retries are also exhausted. So what should be done to stop the commit of that message and halt further processing?
Thanks
Through the discussion, what you want is a feature that does not commit if an exception is thrown, right?
It is possible; however, we are currently designing the exception handling to match the Event Hubs extension: https://docs.microsoft.com/en-us/azure/azure-functions/functions-reliable-event-processing#how-azure-functions-consumes-event-hubs-events
I need to discuss this with @apawast. What do you think?
@TsuyoshiUshio
Yes, this is correct. If there is an exception, we don't want to commit, and we want to halt the process.
Thanks
@TsuyoshiUshio - any update on this?
I am having a similar problem. Nothing good comes out of the Retry Policy; it just counts on processing succeeding on the 2nd or 3rd try. If that doesn't happen, we still have no way to suppress committing the offset, and we lose the message.
I've investigated a bit, and it seems the only way to get this behavior for now is to use the Confluent.Kafka package directly and set up the Consumer manually.
We'd certainly appreciate access to a Commit() method and an EnableAutoCommit flag within the trigger options.
We'd also benefit greatly from having a Commit() method, especially since we committed to the latest and greatest for new projects, like .NET 7 and isolated process functions, and retry policies are not available there.
We use ExponentialBackoffRetry with maxRetryCount = -1. That way we do not lose any messages, since it will retry indefinitely if an exception occurs.
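For anyone looking for the same workaround, a minimal sketch of that setup, assuming the in-process model (the function name, broker app setting, topic, and consumer group below are placeholders):

```csharp
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

public static class RetryForeverFunction
{
    [FunctionName("RetryForeverFunction")]
    // maxRetryCount = -1 retries indefinitely, so the extension only stores
    // the offset once an execution finally succeeds.
    [ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
    public static void Run(
        [KafkaTrigger("BrokerList", "example-topic", ConsumerGroup = "example-group")]
        KafkaEventData<string> kafkaEvent,
        ILogger log)
    {
        log.LogInformation("Processing: {Value}", kafkaEvent.Value);
        // Throwing here re-runs the function per the retry policy
        // instead of letting the offset be stored.
    }
}
```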
Nevertheless, it would be nice if we could prevent the extension from calling StoreOffset for use cases where we need to delay the processing of messages further and only want to commit them after processing has finished. We would then call StoreOffset manually. What we need would be the equivalent behavior of kafka-dotnet with
EnableAutoCommit = true // (the default)
EnableAutoOffsetStore = false
as described here: https://docs.confluent.io/kafka-clients/dotnet/current/overview.html#store-offsets
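Concretely, with the Confluent.Kafka package used directly, the behavior we'd like the extension to expose looks roughly like this sketch (broker, topic, and group names are placeholders):

```csharp
using System;
using Confluent.Kafka;

class StoreOffsetsSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder
            GroupId = "example-group",           // placeholder
            EnableAutoCommit = true,             // background commit stays on (the default)
            EnableAutoOffsetStore = false        // but nothing is committed until StoreOffset is called
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("example-topic");     // placeholder

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;

            Process(result.Message.Value);       // delay or defer as long as needed;
                                                 // the offset is not commit-eligible until:
            consumer.StoreOffset(result);        // mark as processed; the background
                                                 // auto-committer picks it up from here
        }
    }

    static void Process(string value) { /* ... */ }
}
```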
@TsuyoshiUshio, you mentioned that it is possible to extend the behavior of the extension in that regard. Can you point me to where to start?