micronaut-kafka icon indicating copy to clipboard operation
micronaut-kafka copied to clipboard

Support for setting uncaught exception handler

Open pqab opened this issue 3 years ago • 5 comments

Feature description

There is a new handler StreamsUncaughtExceptionHandler added in Kafka 2.8.0

We would like to use REPLACE_THREAD instead of the default value SHUTDOWN_CLIENT when there is exception throw, but the handler needs to be added before the Kafka Stream started.

It would be nice if we can set a custom handler for it

pqab avatar Oct 14 '21 09:10 pqab

I will second this.

sumant-turlapati avatar May 03 '22 15:05 sumant-turlapati

@graemerocher actually can we listen to the BeforeKafkaStreamStart application event to add custom UncaughtException handlers?

for example:

public class SequencingRunStreamProcessor implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {
   
    @Singleton
    @Named(PROCESSOR_NAME)
    public KStream<String, SequencingRun> sequencingRunSequencedSampleKStream(ConfiguredStreamBuilder builder) {
        // build stream here
    }

    @Override
    public void onApplicationEvent(BeforeKafkaStreamStart event) {
        event.getKafkaStreams().setUncaughtExceptionHandler(this);
    }

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}

sumant-turlapati avatar May 03 '22 18:05 sumant-turlapati

not tried it personally, did it work for you?

graemerocher avatar May 03 '22 22:05 graemerocher

Yes this worked for me.

sumant-turlapati avatar May 03 '22 23:05 sumant-turlapati

thanks, I tried the workaround, it worked

pqab avatar Jun 07 '22 22:06 pqab