micronaut-kafka
Support for setting uncaught exception handler
Feature description
Kafka 2.8.0 added a new handler, StreamsUncaughtExceptionHandler.
We would like to use REPLACE_THREAD instead of the default value SHUTDOWN_CLIENT when an exception is thrown, but the handler needs to be set before the Kafka Streams instance is started.
It would be nice if we could set a custom handler for it.
I will second this.
@graemerocher actually, can we listen to the BeforeKafkaStreamStart application event to add a custom uncaught exception handler?
For example:
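// Assumes this class is registered as a Micronaut bean (for example via @Factory),
// so that both the stream bean and the event listener are picked up.
public class SequencingRunStreamProcessor
        implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {

    @Singleton
    @Named(PROCESSOR_NAME)
    public KStream<String, SequencingRun> sequencingRunSequencedSampleKStream(ConfiguredStreamBuilder builder) {
        // build stream here
    }

    // Fired before the KafkaStreams instance starts, which is when the handler has to be set.
    @Override
    public void onApplicationEvent(BeforeKafkaStreamStart event) {
        event.getKafkaStreams().setUncaughtExceptionHandler(this);
    }

    // Replace the failed stream thread instead of shutting the client down.
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}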
I have not tried it personally. Did it work for you?
Yes, this worked for me.
Thanks, I tried the workaround and it worked.
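The workaround above always returns REPLACE_THREAD. If that turns out to be too broad, the decision inside handle(Throwable) could depend on the exception. The following is only a dependency-free sketch of such a policy: the Response enum is a local stand-in that mirrors the constant names of Kafka's StreamThreadExceptionResponse, and the class name, the decide method, and the rule itself (replace the thread on I/O errors, shut the client down otherwise) are assumptions made for illustration, not part of micronaut-kafka or Kafka.

```java
import java.io.IOException;

// Hypothetical helper, not part of Kafka or micronaut-kafka.
final class HandlerPolicySketch {

    // Local stand-in (assumption) mirroring the constant names of
    // StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse,
    // so the sketch runs without Kafka on the classpath.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Assumed policy: an IOException anywhere in the cause chain is treated
    // as transient and the thread is replaced; anything else shuts down the
    // client, which the issue describes as the default.
    static Response decide(Throwable exception) {
        // Walk the cause chain because the original error may be wrapped.
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return Response.REPLACE_THREAD;
            }
        }
        return Response.SHUTDOWN_CLIENT;
    }
}
```

In a real handler, handle(Throwable) would return the StreamThreadExceptionResponse constant with the same name as the value chosen here.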