reactor-kafka icon indicating copy to clipboard operation
reactor-kafka copied to clipboard

Will this re-create a new kafka consumer every time there is an error

Open dariodariodario opened this issue 6 years ago • 0 comments

Hello, I'm new to reactor... and I wanted to use it to consume from Kafka (which on the other side I know well).

So the thing is I wanted to setup a pipeline that does retry in case of error. I read in the docs that an error in reactor is a terminal operation... even when using retry and backoff the engine will resubscribe to the flux. I use this to consume from a topic, following the thread per partition example:

        Scheduler scheduler = Schedulers.newBoundedElastic(
                processingThreads,
                100,
                "KafkaProcessingPool",
                60);
        this.receiverRecordFlux
                .groupBy(r -> r.partition() % processingThreads)
                .flatMap(groupedFlux -> {
                    var consumer = makeConsumer();
                    return groupedFlux.publishOn(scheduler)

                            .map(r -> {
                                consumer.accept(r);
                                return r.receiverOffset();
                            })
                            .concatMap(ReceiverOffset::commit);
                }).retryBackoff(Long.MAX_VALUE, Duration.ofMillis(25), Duration.ofSeconds(1))
                .doOnError(err -> LOG.warn("Error while consuming, will retry", err)).retry()
                .subscribe();

does it result in the consumer being re-created every time there is an error (that sounds crazy to me)...?

dariodariodario avatar Feb 04 '20 13:02 dariodariodario