reactor-kafka

receiveAutoAck commits the batch even in case of exception (or Error) while processing

Open serejke opened this issue 3 years ago • 5 comments

receiveAutoAck returns a Flux<Flux<Record>>, with one inner Flux per batch of received records. When an inner Flux terminates, whether normally or with an exception, the batch is committed because of the doOnTerminate callback.

I might be missing something, but I don't see a way to avoid committing the batch if processing crashes (for example, with an OutOfMemoryError).

I've prepared a test case

receiveAutoAck is said to be suitable for at-least-once delivery of messages, but in fact the events might be committed before they are processed.
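To make the failure mode concrete, here is a minimal plain-Java model of it. It does not use Reactor or Kafka: the `finally` block merely stands in for a terminate callback, and `AutoAckModel`, `consumeBatch` and `committedOffset` are hypothetical names chosen for illustration, not library API.

```java
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (no Reactor or Kafka dependency) of a batch whose commit
// is tied to termination rather than to successful completion.
final class AutoAckModel {

    // Hypothetical stand-in for the consumer group's committed position.
    long committedOffset = 0;

    // Processes a non-empty batch of offsets. The finally block plays the role
    // of a terminate callback: it runs on success and on failure alike.
    void consumeBatch(List<Long> offsets, Consumer<Long> processor) {
        try {
            offsets.forEach(processor);
        } finally {
            // Acknowledge the whole batch: the next offset to read is last + 1.
            committedOffset = offsets.get(offsets.size() - 1) + 1;
        }
    }

    public static void main(String[] args) {
        AutoAckModel model = new AutoAckModel();
        try {
            model.consumeBatch(List.of(0L, 1L, 2L, 3L, 4L), offset -> {
                if (offset == 2) {
                    throw new IllegalStateException("processing failed at offset " + offset);
                }
            });
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
        // Offsets 2, 3 and 4 were never successfully processed,
        // yet the committed position has moved past them.
        System.out.println("committed offset: " + model.committedOffset); // prints 5
    }
}
```

In this model a restart would resume from offset 5, so the three unprocessed records are skipped, which is the opposite of at-least-once delivery.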

serejke avatar Jan 12 '22 00:01 serejke

Seems pretty serious

rancherz avatar Jan 12 '22 07:01 rancherz

Is there any fix you can think of?

rancherz avatar Jan 12 '22 07:01 rancherz

@rancherz the simplest solution would be to replace Flux.doAfterTerminate with Flux.doOnComplete, so that the batch is committed only if the flux has completed successfully.

Existing clients might not expect this change, so it could be guarded by a new configuration parameter in ReceiverOptions, for example commitBatchOnError: Boolean with a default value of true. The reference documentation and Javadocs should also be amended with a warning.
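A rough sketch of how such a flag could behave, again as a plain-Java model rather than a patch to the library. `commitBatchOnError` is only the name proposed in this comment and does not exist in ReceiverOptions; `BatchCommitPolicy` and `positionAfter` are hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the proposed option. "commitBatchOnError" is the name
// suggested in this thread, not an existing ReceiverOptions setting.
final class BatchCommitPolicy {

    private final boolean commitBatchOnError;

    BatchCommitPolicy(boolean commitBatchOnError) {
        this.commitBatchOnError = commitBatchOnError;
    }

    // Returns the position to commit after a non-empty batch: one past the last
    // offset if the batch is acknowledged, or the unchanged position if not.
    long positionAfter(long currentPosition, List<Long> offsets, Consumer<Long> processor) {
        boolean completed = false;
        try {
            offsets.forEach(processor);
            completed = true; // analogous to an on-complete signal
        } catch (RuntimeException e) {
            // Analogous to an on-error signal; swallowed to keep the model small.
            // Errors such as OutOfMemoryError are deliberately not caught here.
        }
        boolean acknowledge = completed || commitBatchOnError;
        return acknowledge ? offsets.get(offsets.size() - 1) + 1 : currentPosition;
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(10L, 11L, 12L);
        Consumer<Long> failing = offset -> {
            if (offset == 11) {
                throw new IllegalStateException("processing failed at offset " + offset);
            }
        };
        // Default (true): current behavior, the batch is committed despite the error.
        System.out.println(new BatchCommitPolicy(true).positionAfter(10, batch, failing));  // prints 13
        // false: commit only on successful completion, so the batch is redelivered.
        System.out.println(new BatchCommitPolicy(false).positionAfter(10, batch, failing)); // prints 10
    }
}
```

Keeping the default at true preserves the documented behavior for existing clients, while false gives the commit-on-success semantics described above.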

serejke avatar Jan 12 '22 12:01 serejke

I see the issue, but the behavior is consistent with the javadocs and reference manual:

All the records in a batch are acknowledged automatically when its Flux terminates.

So I am not sure there is anything to be done here.

This is similar (not identical) to Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, where the Kafka client commits the offsets in the background (when true) regardless of processing activity.

For more control over commits, you can use the receive() method instead.

If you feel a change is necessary, I would concur that it would need to be optional; contributions are welcome.

garyrussell avatar Jan 12 '22 15:01 garyrussell

@garyrussell as a user of this library's API, I found this behavior unclear enough that I filed this issue. It seems the Javadoc or the reference manual could be improved in this regard.

I don't have spare time at the moment, so I have chosen to stay with the receive function, where each record is acknowledged manually.

What I suggest as a contribution is a new API function, default Flux<Flux<ConsumerRecord<K, V>>> receiveBatch() {}, with semantics similar to receive, where the user has to acknowledge the ConsumerRecords after processing. WDYT?
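The intended semantics can be sketched with a small plain-Java model, under the assumption that a record is acknowledged only after user code has processed it. Nothing here is reactor-kafka API: `ManualAckBatch`, `acknowledge` and `processAndAcknowledge` are hypothetical names.

```java
import java.util.List;
import java.util.function.LongConsumer;

// Plain-Java model of per-record manual acknowledgment within a batch.
final class ManualAckBatch {

    private long committablePosition;

    ManualAckBatch(long startPosition) {
        this.committablePosition = startPosition;
    }

    // Called by user code after a record has been processed successfully.
    void acknowledge(long offset) {
        committablePosition = Math.max(committablePosition, offset + 1);
    }

    long committablePosition() {
        return committablePosition;
    }

    // Processes records in order and acknowledges each one only after its
    // processor call returns; stops at the first failure.
    static long processAndAcknowledge(long startPosition, List<Long> offsets, LongConsumer processor) {
        ManualAckBatch batch = new ManualAckBatch(startPosition);
        try {
            for (long offset : offsets) {
                processor.accept(offset);
                batch.acknowledge(offset);
            }
        } catch (RuntimeException e) {
            // The failed record and everything after it stay unacknowledged.
        }
        return batch.committablePosition();
    }

    public static void main(String[] args) {
        long position = processAndAcknowledge(0, List.of(0L, 1L, 2L, 3L), offset -> {
            if (offset == 2) {
                throw new IllegalStateException("processing failed at offset " + offset);
            }
        });
        // Only offsets 0 and 1 were acknowledged, so consumption resumes at 2.
        System.out.println("committable position: " + position); // prints 2
    }
}
```

Because the failed record is never acknowledged, a restart in this model redelivers it, which is the at-least-once behavior the issue expects.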

serejke avatar Jan 12 '22 21:01 serejke

"but the behavior is consistent with the javadocs and reference manual:"

I would say that "can be used for at-least-once delivery of messages" is really misleading. This misunderstanding cost me hours of hunting for lost updates. All of @serejke's proposed solutions look reasonable.

kharole avatar Jul 27 '23 09:07 kharole

I plan to open a PR on this subject in October.

awattez avatar Jul 27 '23 14:07 awattez