Implement application-level retries
The switch from driver 3.x to driver 4.x in DSBulk 1.5.0 had one unexpected consequence: client-side timeouts now apply to the whole statement execution. The driver docs say:
> Unlike 3.x, the request timeout now spans the entire request. In other words, it's the maximum amount of time that `session.execute` will take, including any retry, speculative execution, etc.
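What the docs don't say is that, because the timeout is global to the `session.execute` call, timeouts are no longer retried: once it fires, the whole time budget is spent and there is no room left for another attempt. Within the driver, the only remaining workaround is to use speculative executions.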
DSBulk users can already enable speculative executions if they wish (they are disabled by default). But speculative executions are hard to tune, and they don't work with rate limiters (see #447). I think it would therefore be nice to implement a form of application-level retry when a statement execution fails.
Thanks to the Reactor framework, such a feature should be easy to implement with the `retryWhen` operator, e.g. in `LoadWorkflow`:
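```java
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.Statement;
import java.time.Duration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

private Flux<WriteResult> executeStatements(Flux<? extends Statement<?>> stmts) {
  // Retry a statement up to maxRetries times, but only if it failed with a client-side timeout.
  Retry spec =
      new Retry() {
        private final long maxRetries = 3;
        // Optional pause before each retry; null means retry immediately.
        private final Duration delay = null;

        @Override
        public Publisher<Boolean> generateCompanion(Flux<RetrySignal> retrySignals) {
          return retrySignals.flatMap(
              signal -> {
                Mono<Boolean> retryDecision;
                if (signal.totalRetries() < maxRetries
                    && signal.failure() instanceof DriverTimeoutException) {
                  // Emitting a value makes retryWhen resubscribe, i.e. execute the statement again.
                  retryDecision = Mono.just(true);
                  if (delay != null) {
                    retryDecision = retryDecision.delayElement(delay);
                  }
                } else {
                  // Emitting an error stops retrying and propagates the original failure.
                  retryDecision = Mono.error(signal.failure());
                }
                return retryDecision;
              });
        }
      };
  return dryRun
      ? stmts.map(EmptyWriteResult::new)
      : stmts.flatMap(
          statement -> Flux.from(executor.writeReactive(statement)).retryWhen(spec),
          writeConcurrency);
}
```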
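For code paths that call the driver's async API directly instead of going through Reactor, the same policy could be expressed with plain `CompletionStage` chaining. The following is a minimal JDK-only sketch, not existing DSBulk code: `AsyncRetries` and `retryAsync` are hypothetical names. The retry predicate is passed in so that a caller can match `DriverTimeoutException` without the helper depending on the driver. As in the Reactor version, each retry issues a brand-new request, so each attempt gets its own full request timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: application-level retries around an async call. */
public final class AsyncRetries {

  private AsyncRetries() {}

  /**
   * Invokes {@code attempt}; if the returned stage fails with an error accepted by
   * {@code retryable}, invokes it again, at most {@code maxRetries} additional times.
   * Other errors, or a retryable error once retries are exhausted, fail the returned future.
   */
  public static <T> CompletableFuture<T> retryAsync(
      Supplier<? extends CompletionStage<T>> attempt,
      Predicate<? super Throwable> retryable,
      long maxRetries) {
    CompletableFuture<T> result = new CompletableFuture<>();
    runAttempt(attempt, retryable, maxRetries, 0, result);
    return result;
  }

  private static <T> void runAttempt(
      Supplier<? extends CompletionStage<T>> attempt,
      Predicate<? super Throwable> retryable,
      long maxRetries,
      long retriesSoFar,
      CompletableFuture<T> result) {
    CompletionStage<T> stage;
    try {
      // Each call starts a new request, so the request timeout applies per attempt.
      stage = attempt.get();
    } catch (RuntimeException e) {
      result.completeExceptionally(e);
      return;
    }
    stage.whenComplete(
        (value, error) -> {
          if (error == null) {
            result.complete(value);
            return;
          }
          Throwable cause = unwrap(error);
          if (retriesSoFar < maxRetries && retryable.test(cause)) {
            runAttempt(attempt, retryable, maxRetries, retriesSoFar + 1, result);
          } else {
            // Not retryable, or retries exhausted: propagate the original failure.
            result.completeExceptionally(cause);
          }
        });
  }

  // Callbacks on dependent stages may see the failure wrapped in a CompletionException.
  private static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }
}
```

With driver 4.x, where `CqlSession.executeAsync` returns a `CompletionStage<AsyncResultSet>`, a call might look like `AsyncRetries.retryAsync(() -> session.executeAsync(statement), e -> e instanceof DriverTimeoutException, 3)`.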
Duplicate of #443, sorry for that.