reactor-core
Flux of Mono from WebClient ResponseSpec (.retrieve()): .onErrorResume is useless for substituting a valid Mono when Mono.error() is returned
    @Override
    public Flux<ContentFragmentMainEntity> getRatesFlux(String logHash, String apiUrl) {
        return ratesReactiveRepository
            .findAll()
            .flatMap(rate -> {
                log.info("{} rate read from MongoDb: {}", logHash, rate);
                String url = apiUrl + "/" + rate.getRateName();
                log.debug("{}, START {} URI: {}",
                    logHash,
                    "controller.ratesServiceImpl.getRatesFlux",
                    url);
                return webClient
                    .post()
                    .uri(url)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    // Mono.error needs a Throwable, not a String
                    .onStatus(HttpStatusCode::is5xxServerError,
                        clientResponse -> Mono.error(
                            new RuntimeException("INTERNAL SERVER ERROR for rate " + rate)))
                    .onStatus(HttpStatusCode::is4xxClientError,
                        clientResponse -> Mono.error(
                            new RuntimeException("EMPTY for rate " + rate)))
                    .bodyToMono(ContentFragmentMainEntity.class);
            });
    }
The service above is called from the controller:
    return aemRatesService
        .getRatesFlux(logHash, apiUrl)
        .onErrorContinue((err, i) -> log.error("ERROR LOAD SOME RATE VERIFY LOG"))
        .flatMap(response -> {
            log.info("OK LOAD " + response.getCodice());
            return Mono.just("SUCCESS LOAD RATE " + response.getCodice());
        })
        .collectList();
I tried to use .onErrorResume, but it ends the flow and returns a single value instead of continuing. The current implementation with .onErrorContinue does not continue either: it stops after a 5xx INTERNAL SERVER ERROR.
How can I handle a Mono.error inside a Flux so that it is replaced with a Mono.just object and the asynchronous flatMap continues with the remaining elements?
Thanks a lot for your support.
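For reference, one commonly suggested pattern for this situation is to apply onErrorResume to the inner Mono, inside the flatMap lambda, rather than to the outer Flux. Then only the failing element is replaced and the remaining elements are still processed. The following is a minimal sketch under stated assumptions, not a drop-in fix: fetchRate is a hypothetical stand-in for the WebClient call, the class name and the "FAILED LOAD RATE" fallback value are invented for illustration, and block() is used only so the sketch can run on its own.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RateFallbackSketch {

    // Hypothetical stand-in for the WebClient call:
    // it fails for any rate name starting with "bad".
    static Mono<String> fetchRate(String rateName) {
        if (rateName.startsWith("bad")) {
            return Mono.error(
                new IllegalStateException("INTERNAL SERVER ERROR for rate " + rateName));
        }
        return Mono.just("SUCCESS LOAD RATE " + rateName);
    }

    static List<String> loadAll(List<String> rateNames) {
        return Flux.fromIterable(rateNames)
            .flatMap(name -> fetchRate(name)
                // Recover inside flatMap: only this inner Mono is replaced,
                // so the outer Flux never sees the error and keeps going.
                .onErrorResume(err -> Mono.just("FAILED LOAD RATE " + name)))
            .collectList()
            .block(); // blocking only to keep the sketch self-contained
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("eur", "bad-usd", "gbp")));
    }
}
```

In the service from the question, the equivalent change would presumably be to chain .onErrorResume(...) directly after .bodyToMono(ContentFragmentMainEntity.class), returning a placeholder entity or Mono.empty() to skip the failed rate.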