azure-functions-kafka-extension

autocommit flag

Open borys86 opened this issue 4 years ago • 11 comments

Dear developers, `AutoCommitIntervalMs` allows setting the interval for auto-commit. What about the case where handling fails and we want to retry: would it be possible to set auto-commit to false in some way?

borys86 avatar Oct 14 '20 12:10 borys86

Thank you for the suggestion. Let me investigate it.

TsuyoshiUshio avatar Oct 15 '20 19:10 TsuyoshiUshio

@TsuyoshiUshio - The current default value is 5 seconds, as per the librdkafka configuration. Does this mean any consumer must be able to process the messages (say the batch size is 64) within 5 seconds to avoid data loss?
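For reference, the interval in question is tunable in host.json. A sketch, assuming the extension's standard `extensions.kafka` settings section (property placement and casing should be checked against the extension docs; the 200 ms value is arbitrary):

```json
{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "AutoCommitIntervalMs": 200
    }
  }
}
```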

rkbalu avatar Mar 16 '21 23:03 rkbalu

Hello @TsuyoshiUshio

Any timeline on this enhancement? We also need the ability to read the same message again if processing fails for any reason.

Thanks

tech7857 avatar Jul 09 '21 16:07 tech7857

In most cases, we probably don't need to set auto-commit to false. We have a retry policy: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-error-pages?tabs=csharp#retry-policies-preview. Combined with this retry policy, what happens is:

  1. The Kafka extension reads the messages from a broker.
  2. It tries to execute your function. If the function throws an exception, the extension replays it according to the retry policy, repeating step 2. Once it succeeds or reaches the retry threshold, it goes on to step 3.
  3. It executes StoreOffset. https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html#store-offsets
  4. librdkafka auto-commits in the background, but only offsets that have been stored.
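The four steps above can be sketched as a small simulation (plain Python; `process_batch`, `max_retries`, and the in-memory offset list are illustrative assumptions, not the extension's real API):

```python
# Simulation of the read -> retry -> store-offset -> auto-commit flow
# described above. Names are illustrative, not the extension's real API.

def process_batch(messages, handler, max_retries=3):
    """Return the offsets that would be stored (and later auto-committed)."""
    stored_offsets = []
    for offset, payload in messages:
        # Step 2: execute the function, replaying on exception up to the
        # retry threshold.
        for _attempt in range(1 + max_retries):
            try:
                handler(payload)
                break
            except Exception:
                continue
        # Step 3: StoreOffset runs whether the handler finally succeeded or
        # the retries were exhausted -- which is why a "bad" message still
        # ends up committed, the concern raised later in this thread.
        stored_offsets.append(offset)
    # Step 4: the background auto-commit only ever commits stored offsets.
    return stored_offsets


calls = {"n": 0}

def flaky_handler(payload):
    # Fails on the first attempt for each message, succeeds on the retry.
    calls["n"] += 1
    if calls["n"] % 2 == 1:
        raise RuntimeError("transient failure")

committed = process_batch([(0, "a"), (1, "b")], flaky_handler)
```

Note that in this flow, retries decide *when* the offset is stored, not *whether* it is stored.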

So in most cases, message loss doesn't happen. If the Functions host dies between steps 3 and 4, it could happen. If we set auto-commit to false and use a synchronous commit strategy, we can reduce that risk; however, it will impact throughput.

So IMO, we don't need auto-commit set to false. However, if you really need it, our extension has an extension point for the commit strategy, and implementing a synchronous commit there is not very difficult.

TsuyoshiUshio avatar Jul 10 '21 23:07 TsuyoshiUshio

@TsuyoshiUshio

Thanks for replying.

If, in step 2, we are unable to process the message for any reason (maybe the message is bad, or there is an application error), we don't want that message to be committed once retries are exhausted. So what should be done to stop the commit of that message and halt further processing?

Thanks

tech7857 avatar Jul 11 '21 00:07 tech7857

From this discussion, what you want is a feature that does not commit if an exception is thrown, right?

It is possible; however, we are currently designing the exception handling to match the Event Hubs extension: https://docs.microsoft.com/en-us/azure/azure-functions/functions-reliable-event-processing#how-azure-functions-consumes-event-hubs-events

I need to discuss this with @apawast. What do you think?

TsuyoshiUshio avatar Jul 11 '21 04:07 TsuyoshiUshio

@TsuyoshiUshio

Yes, this is correct. If there is an exception, we don't want to commit; we want to halt the process.

thanks

tech7857 avatar Jul 11 '21 11:07 tech7857

@TsuyoshiUshio - any update on this?

tech7857 avatar Aug 05 '21 21:08 tech7857

I am having a similar problem. Nothing good comes out of the retry policy; it just counts on processing succeeding on a 2nd or 3rd try. If that doesn't happen, we still have no way to suppress committing the offset, and we lose the message.

I've investigated a bit, and it seems the only way to get this behavior for now is to use the Confluent.Kafka package directly and set up the Consumer manually.

We'd certainly appreciate access to the Commit() method and an EnableAutoCommit flag within the trigger options.
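For anyone going the direct Confluent.Kafka route, the fully manual commit setup looks roughly like this. The config keys below are shared librdkafka property names (the same in the .NET and Python Confluent clients); the broker address and group id are placeholders:

```python
# librdkafka configuration for fully manual commits. Broker and group
# values are placeholders for illustration.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "enable.auto.commit": False,  # nothing is committed unless we commit explicitly
    "auto.offset.reset": "earliest",
}
# With this config, the consume loop calls consumer.commit(message)
# (Commit(result) in .NET) only after processing succeeds, so a failed
# message is re-delivered after a restart or rebalance.
```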

MichalLechowski avatar Dec 07 '21 13:12 MichalLechowski

We'd also benefit greatly from having a Commit() method, especially since we committed to the latest and greatest for new projects (.NET 7 and isolated-process functions), and retry policies are not available there.

the-rule avatar Mar 24 '23 09:03 the-rule

We use ExponentialBackoffRetry with maxRetryCount=-1. That way we do not lose any messages, since it will retry indefinitely if an exception occurs.
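For reference, that policy can also be declared in function.json (a sketch following the documented retry schema; the interval values here are arbitrary, and C# in-process apps would use the `[ExponentialBackoffRetry]` attribute instead):

```json
{
  "retry": {
    "strategy": "exponentialBackoff",
    "maxRetryCount": -1,
    "minimumInterval": "00:00:05",
    "maximumInterval": "00:10:00"
  }
}
```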

Nevertheless, it would be nice if we could prevent the extension from calling StoreOffset for use cases where we need to delay the processing of messages and only want to commit them after processing has finished. We would then call StoreOffset manually. What we need is the equivalent of the kafka-dotnet behavior when:

    EnableAutoCommit = true // (the default)
    EnableAutoOffsetStore = false

as described here: https://docs.confluent.io/kafka-clients/dotnet/current/overview.html#store-offsets
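Spelling out the requested combination in the underlying librdkafka property names (shared by the .NET and Python Confluent clients):

```python
# The combination requested above: the background commit thread keeps
# running, but it only commits offsets the application explicitly stores.
desired_config = {
    "enable.auto.commit": True,         # the default: background commit stays on
    "enable.auto.offset.store": False,  # offsets are only stored manually
}
# The application then calls StoreOffset(result) in .NET (store_offsets(msg)
# in Python) once processing of a message has actually finished.
```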

@TsuyoshiUshio, you mentioned that it is possible to extend the behavior of the extension in that regard? Can you point me to where to start?

superkartoffel avatar Jul 04 '23 06:07 superkartoffel