spring-kafka
`onMessage` methods in *MessageListener should allow for checked exception throwing
One can use the @KafkaListener annotation in methods that throw checked exceptions.
However, when implementing the *MessageListener interfaces, none of the onMessage methods allow for (checked) Exceptions to be thrown.
Changing the signature to include throws Exception would prevent catching exceptions just to wrap them in RuntimeException.
That would be a big breaking change; something we could not consider until the next major release (3.0).
I would like to mention that checked exceptions can be thrown when the listener is written in Kotlin (I believe the same could be done in Java using Lombok's @SneakyThrows).
Current behaviour is that when such an exception is thrown, it is not caught inside doInvokeBatchListener()/doInvokeRecordListener() and, consequently, the records are not passed to the errorHandler. Instead, the exception is caught further up the call stack in KafkaMessageListenerContainer.run(), passed to handleConsumerException(), and handled as if it had been thrown by the KafkaConsumer rather than by the processing listener.
This is problematic because the errorHandler/batchErrorHandler never receives the records, and if one uses SeekToCurrentBatchErrorHandler, no seek is performed; the consumer continues without replaying the failed messages, which leads to data loss.
@antonio-tomac I was not aware of that; I must say that I am appalled that Kotlin violates a fundamental assumption about bytecode. As for @SneakyThrows, I don't believe that deserves a comment from me.
However, I am equally appalled by a comment in the @SneakyThrows javadoc:
The JVM does not check for the consistency of the checked exception system; javac does, and this annotation lets you opt out of its mechanism.
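To make the javadoc's point concrete, here is a minimal sketch in plain Java, without Lombok or Kotlin, of how a checked exception can escape a callback whose signature declares none. `SneakyThrowDemo`, `RecordCallback`, and `escapingType` are hypothetical names used only for illustration; `RecordCallback` merely stands in for a listener interface and is not the spring-kafka API.

```java
import java.io.IOException;

final class SneakyThrowDemo {

    // Generic trick: at the call site the compiler infers E as RuntimeException,
    // so no throws clause is needed, yet the original exception is thrown unchanged.
    @SuppressWarnings("unchecked")
    static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    // Hypothetical stand-in for a listener callback that declares no checked exceptions.
    interface RecordCallback {
        void onMessage(String record);
    }

    // Returns the class of whatever escapes the callback, or null if nothing does.
    static Class<?> escapingType(RecordCallback callback, String record) {
        try {
            callback.onMessage(record);
            return null;
        } catch (Throwable t) {
            return t.getClass();
        }
    }

    public static void main(String[] args) {
        // The lambda compiles even though onMessage has no throws clause.
        RecordCallback callback =
                record -> sneakyThrow(new IOException("failed on " + record));
        System.out.println(escapingType(callback, "record-1")); // prints "class java.io.IOException"
    }
}
```

A caller that only catches RuntimeException around such a callback would miss this exception entirely, which matches the behaviour described earlier in the thread.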
This is only a problem if you implement MessageListener (or one of its variants) directly.
For example, for POJO listeners (@KafkaListener), the framework wraps any exception thrown in a ListenerExecutionFailedException before it is thrown to the container.
I will (reluctantly) make the change, but in the meantime, you should do the same (catch and wrap in a LEFE in your listener).
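A rough sketch of the catch-and-wrap workaround, written against stand-in types so that it compiles without spring-kafka on the classpath. `ListenerFailedException`, `RecordListener`, `ThrowingRecordListener`, and `wrapping` are hypothetical names; in a real project the wrapper type would be ListenerExecutionFailedException and the interface one of the *MessageListener variants.

```java
import java.io.IOException;

final class WrappingListenerDemo {

    // Hypothetical stand-in for spring-kafka's ListenerExecutionFailedException.
    static final class ListenerFailedException extends RuntimeException {
        ListenerFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a MessageListener-style callback (no throws clause).
    interface RecordListener<T> {
        void onMessage(T record);
    }

    // Listener logic that is allowed to throw checked exceptions.
    interface ThrowingRecordListener<T> {
        void onMessage(T record) throws Exception;
    }

    // Adapts a throwing listener: checked exceptions are wrapped,
    // runtime exceptions are rethrown unchanged.
    static <T> RecordListener<T> wrapping(ThrowingRecordListener<T> delegate) {
        return record -> {
            try {
                delegate.onMessage(record);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new ListenerFailedException("Listener failed for record: " + record, e);
            }
        };
    }

    public static void main(String[] args) {
        RecordListener<String> listener = wrapping(record -> {
            throw new IOException("downstream unavailable");
        });
        try {
            listener.onMessage("record-1");
        } catch (ListenerFailedException e) {
            System.out.println(e.getCause().getClass().getName()); // prints "java.io.IOException"
        }
    }
}
```

Putting the wrapping in one adapter means each new listener implementation does not have to repeat the try/catch by hand.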
@garyrussell, are we good to close this issue now that your PR has been merged?
No; I made a note in the PR; this has to remain open.
@garyrussell Nice, thanks for the quick reaction.
you should do the same (catch and wrap in a LEFE in your listener).
Yeah, I already do that as a workaround; the main issue is having to remember to do it in new projects when using custom listener implementations.
The main reason I am speaking up about this is that I first encountered it about a year ago and spent quite some time figuring out what was going on. Recently, it happened again on a different project and, again, I wasted time figuring it out, since I had forgotten about it in the meantime. Luckily, the second time it was caught in integration tests.
Now I'm looking forward to having one less thing to worry about in the future. Thanks again.
Happy to help.
I already encountered this (like ~a year ago) and I've spent quite time figuring out what is going on.
It's unfortunate that you didn't raise the issue back then, but I am glad it is resolved now.