reactor-kafka
Will this re-create the Kafka consumer every time there is an error?
Hello, I'm new to Reactor, and I wanted to use it to consume from Kafka (which, on the other hand, I know well).
So the thing is, I wanted to set up a pipeline that retries on error. I read in the docs that an error in Reactor is a terminal event, so even with retry and backoff the engine will resubscribe to the Flux. I use this to consume from a topic, following the thread-per-partition example:
    Scheduler scheduler = Schedulers.newBoundedElastic(
            processingThreads,
            100,
            "KafkaProcessingPool",
            60);
    this.receiverRecordFlux
            .groupBy(r -> r.partition() % processingThreads)
            .flatMap(groupedFlux -> {
                var consumer = makeConsumer();
                return groupedFlux.publishOn(scheduler)
                        .map(r -> {
                            consumer.accept(r);
                            return r.receiverOffset();
                        })
                        .concatMap(ReceiverOffset::commit);
            })
            .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(25), Duration.ofSeconds(1))
            .doOnError(err -> LOG.warn("Error while consuming, will retry", err))
            .retry()
            .subscribe();
Does it result in the consumer being re-created every time there is an error? (That sounds crazy to me.)
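To make the concern concrete, here is a minimal plain-Java sketch of what "retry means resubscribe" would imply. It is only a model and uses neither Reactor nor Kafka: `openSource`, `consumeWithRetry`, and the `sourcesOpened` counter are hypothetical names made up for illustration, with `openSource` standing in for whatever gets created at subscription time. Whether the real receiver behaves this way is exactly what I am asking.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ResubscribeSketch {

    // Counts how often the source was opened (hypothetical stand-in for creating a consumer).
    static final AtomicInteger sourcesOpened = new AtomicInteger();

    // Hypothetical cold source: every call opens a fresh one, as a new subscription would.
    static Iterator<Integer> openSource() {
        sourcesOpened.incrementAndGet();
        return List.of(1, 2, 3).iterator();
    }

    // Model of retry: an error ends the current run, so the source is opened again from scratch.
    static <T> void consumeWithRetry(Supplier<Iterator<T>> source,
                                     Consumer<T> handler,
                                     int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            Iterator<T> it = source.get(); // "resubscribe"
            try {
                while (it.hasNext()) {
                    handler.accept(it.next());
                }
                return; // completed normally
            } catch (RuntimeException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted, propagate the error
                }
            }
        }
    }

    public static void main(String[] args) {
        // Fail twice on the value 2, then let processing succeed.
        AtomicInteger failuresLeft = new AtomicInteger(2);
        consumeWithRetry(ResubscribeSketch::openSource, value -> {
            if (value == 2 && failuresLeft.getAndDecrement() > 0) {
                throw new IllegalStateException("simulated processing error");
            }
        }, 5);
        System.out.println("sources opened: " + sourcesOpened.get());
    }
}
```

In this model two processing errors lead to three sources being opened in total (the initial one plus one per retry), which is the behaviour I would like to avoid for the Kafka consumer if it applies there too.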