reactor-kafka
receiveAutoAck commits the batch even in case of exception (or Error) while processing
receiveAutoAck returns a Flux<Flux<Record>>, with one inner Flux per batch of received records. The inner Flux terminates (even when processing throws an exception), and the batch is then committed because of the doOnTerminate callback.
I might be missing something, but I don't see a way to avoid committing the batch if a crash occurs (such as an OutOfMemoryError).
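The reported behavior can be illustrated with a minimal plain-Java model (no Reactor, no Kafka). It assumes a single batch of offsets whose commit runs on termination, with a finally block standing in for the Reactor terminate callback. AutoAckModel, processBatch and the committed list are hypothetical names used only for illustration; this is not reactor-kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of receiveAutoAck's commit-on-terminate behavior.
// The finally block plays the role of the Reactor terminate callback.
public class AutoAckModel {

    // Offsets "committed" so far; stands in for the Kafka offset commit.
    final List<Long> committed = new ArrayList<>();

    // Processes every record of the batch, then commits the whole batch on termination.
    void processBatch(List<Long> batch, Consumer<Long> processor) {
        try {
            for (Long offset : batch) {
                processor.accept(offset); // may throw a RuntimeException or an Error
            }
        } finally {
            // Runs after normal completion AND after a failure, so the batch is always committed.
            committed.addAll(batch);
        }
    }

    public static void main(String[] args) {
        AutoAckModel model = new AutoAckModel();
        try {
            model.processBatch(List.of(0L, 1L, 2L), offset -> {
                if (offset == 1L) {
                    throw new IllegalStateException("processing failed at offset " + offset);
                }
            });
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        // Offsets 1 and 2 were never processed successfully, yet all three are committed.
        System.out.println("committed: " + model.committed);
    }
}
```

Running main prints the caught exception followed by committed: [0, 1, 2]: the failed record and the one after it are committed as if they had been processed. In plain Java a finally block also runs when an Error is thrown, which is why the model commits in that case too.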
I've prepared a test case.
receiveAutoAck is said to be suitable for at-least-once delivery of messages, but in fact records might be committed before they have been processed successfully.
This seems pretty serious. Is there any fix you can think of?
@rancherz the simplest solution would be to replace Flux.doAfterTerminate with Flux.doOnComplete, so that the batch is committed only if the Flux completes successfully.
Existing clients might not expect this change, so a configuration parameter could be added to ReceiverOptions, for example commitBatchOnError: Boolean with a default value of true. The documentation/reference and the javadocs should also be amended with a warning.
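A minimal plain-Java sketch of the proposed option follows, again without Reactor or Kafka. commitBatchOnError is only the name suggested in this thread, not an existing ReceiverOptions setting, and ConfigurableAutoAckModel is a hypothetical class. The completed flag models doOnComplete (commit only after every record succeeded), while commitBatchOnError = true reproduces the current commit-on-terminate behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the proposed fix. commitBatchOnError is the option name
// suggested in this thread, not an existing ReceiverOptions setting.
public class ConfigurableAutoAckModel {

    final boolean commitBatchOnError;
    final List<Long> committed = new ArrayList<>();

    ConfigurableAutoAckModel(boolean commitBatchOnError) {
        this.commitBatchOnError = commitBatchOnError;
    }

    void processBatch(List<Long> batch, Consumer<Long> processor) {
        boolean completed = false;
        try {
            for (Long offset : batch) {
                processor.accept(offset);
            }
            completed = true; // reached only if every record was processed (doOnComplete-like)
        } finally {
            // true keeps today's commit-on-terminate behavior; false commits only on completion,
            // leaving a failed batch uncommitted so it can be consumed again.
            if (completed || commitBatchOnError) {
                committed.addAll(batch);
            }
        }
    }

    public static void main(String[] args) {
        ConfigurableAutoAckModel strict = new ConfigurableAutoAckModel(false);
        try {
            strict.processBatch(List.of(0L, 1L), offset -> {
                if (offset == 1L) throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        strict.processBatch(List.of(0L, 1L), offset -> { }); // the retry succeeds
        System.out.println("committed: " + strict.committed);
    }
}
```

With commitBatchOnError = false the failed attempt commits nothing, and the offsets are committed only once the retry completes. Defaulting the flag to true, as suggested, would keep existing applications on the current behavior.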
I see the issue, but the behavior is consistent with the javadocs and reference manual:
All the records in a batch are acknowledged automatically when its Flux terminates.
So I am not sure there is anything to be done here.
This is similar (not identical) to Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, where (when true) the Kafka client commits the offsets in the background regardless of processing activity.
For more control over commits, you can use the receive() method instead.
If you feel a change is necessary, I would concur that it would need to be optional; contributions are welcome.
@garyrussell as a user of this library's API, I found this point so unclear that I filed this issue. Perhaps the Javadoc or the reference could be improved in this regard.
I don't have spare time at the moment, so I've chosen to stay with receive() and acknowledge each record manually.
As a contribution, I suggest adding a new API method
default Flux<Flux<ConsumerRecord<K, V>>> receiveBatch() {}
with semantics similar to receive(), where the user has to acknowledge the ConsumerRecords after processing them. WDYT?
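The proposed receiveBatch() semantics (batches as in receiveAutoAck, explicit acknowledgement as in receive()) can be sketched with a plain-Java model. It assumes a single partition and in-order acknowledgement; ManualAckBatchModel, AckableRecord and poll are hypothetical names. In reactor-kafka's existing receive(), the per-record acknowledgement is ReceiverRecord.receiverOffset().acknowledge(); the acknowledge() method below only imitates that idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed receiveBatch() semantics (single partition):
// records arrive in batches, nothing is committed automatically, and only offsets
// the application has acknowledged become committable.
public class ManualAckBatchModel {

    // A received record that must be acknowledged explicitly after processing.
    static final class AckableRecord {
        final long offset;
        private final ManualAckBatchModel receiver;

        AckableRecord(long offset, ManualAckBatchModel receiver) {
            this.offset = offset;
            this.receiver = receiver;
        }

        void acknowledge() {
            receiver.lastAcked = Math.max(receiver.lastAcked, offset);
        }
    }

    // Highest acknowledged offset, or -1 if nothing has been acknowledged yet.
    long lastAcked = -1;

    // Simulates one poll returning `size` consecutive records starting at firstOffset.
    List<AckableRecord> poll(long firstOffset, int size) {
        List<AckableRecord> batch = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            batch.add(new AckableRecord(firstOffset + i, this));
        }
        return batch;
    }

    // Kafka convention: the committed offset is the offset of the next record to consume.
    long committableOffset() {
        return lastAcked + 1;
    }

    public static void main(String[] args) {
        ManualAckBatchModel receiver = new ManualAckBatchModel();
        for (AckableRecord record : receiver.poll(10, 5)) {
            if (record.offset == 12) {
                System.out.println("processing failed at offset " + record.offset);
                break; // neither the failed record nor later ones are acknowledged
            }
            record.acknowledge(); // acknowledge only after successful processing
        }
        System.out.println("committable offset: " + receiver.committableOffset());
    }
}
```

Here only offsets 10 and 11 are acknowledged, so the committable offset is 12 and records 12 to 14 would be consumed again. Because the model keeps the highest acknowledged offset, acknowledging out of order could skip unprocessed records; this sketch does not guard against that.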
Regarding "the behavior is consistent with the javadocs and reference manual": I would say that "can be used for at-least-once delivery of messages" is really misleading. This misunderstanding cost me hours of hunting for lost updates. All of @serejke's solutions look reasonable.
I plan to open a PR on this subject during October.