failsafe
Support sync CompletionStage execution
See https://github.com/jhalterman/failsafe/issues/145#issuecomment-456217684
One challenge I recall with sync CompletionStage execution is how to implement sync retries without spawning another thread or risking a call stack blowup, which could happen if we implemented sync retries with recursive calls rather than a while loop.
Given the valid comment here mentioning that a Supplier<CompletionStage> never really needs to be async, a few options:
1. Implement getStage using sync retries with recursion
2. Make the initial Supplier<CompletionStage>.get() call sync, and let subsequent retries be async
2b. (Optional) Rename getStageAsync to getStage, though this may be confusing if retries are still async and CompletionStage is still an async construct
3. Leave all as is
Not wanting to risk recursion with sync retries, I'm leaning towards options 2 and maybe 2b.
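To make option 2 concrete, here is a minimal sketch of what "first call sync, retries async" could look like. It is not Failsafe's implementation: the class FirstCallSyncSketch, the helper attempt, and the maxRetries parameter are hypothetical names chosen for illustration, and the retries simply go through CompletableFuture.runAsync with its default executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names are Failsafe API.
final class FirstCallSyncSketch {

    // The first supplier.get() runs on the caller's thread; retries run asynchronously.
    static <T> CompletableFuture<T> getStage(Supplier<CompletionStage<T>> supplier, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(supplier, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletionStage<T>> supplier, int retriesLeft,
                                    CompletableFuture<T> result) {
        CompletionStage<T> stage;
        try {
            stage = supplier.get();
        } catch (RuntimeException e) {
            // Treat a supplier that throws like a stage that failed.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            stage = failed;
        }
        stage.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft <= 0) {
                result.completeExceptionally(error);
            } else {
                // Each retry is submitted as a new task, so it starts from a fresh
                // stack instead of nesting inside the previous attempt.
                CompletableFuture.runAsync(() -> attempt(supplier, retriesLeft - 1, result));
            }
        });
    }
}
```

Because every retry is handed to runAsync, the depth of any one call stack stays constant regardless of how many retries occur, which is the property the recursion concern is about.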
/cc @Tembrel
I don't understand what "sync CompletionStage execution" is.
Also, why is it important to implement sync retries "without spawning another thread"? If you're handed a CompletionStage, there's nothing you can do to control what threads it will use.
I do see that blowing up the stack is bad! But the apparent recursion in the following does not blow up the call stack and doesn't even necessarily result in new threads being spawned, since it uses the common pool. It's synchronous in that it blocks the caller of nextPrime.
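public static long nextPrime(long n) throws InterruptedException, ExecutionException {
    // Blocks the caller until the async search completes.
    return nextPrimeAsync(n).get();
}

private static CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(10))
        return CompletableFuture.completedFuture(n);
    else
        // Apparent recursion: the next candidate is computed via supplyAsync
        // (common pool by default), then composed with the next step.
        return CompletableFuture.supplyAsync(() -> n + 1)
            .thenCompose(k -> nextPrimeAsync(k));
}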
Is that the kind of recursion you were concerned about?
> I don't understand what "sync CompletionStage execution" is.
By that I meant one that does not wrap the initial supplier.get in a scheduled thread, and likewise, does not wrap any retries in scheduled threads.
> Is that the kind of recursion you were concerned about?
Yea, though I was forgetting that CompletableFuture usually has a supplyAsync which breaks the potential for recursion.
Thanks @Tembrel for the idea - will play with this. I'm wondering if this goes out the window if they have a delay between retries though, as we're left with either a sleep or using a scheduler.
Is there something undesirable about using sleep to implement the delay? It does tie up a thread, but there should only ever be one sleep happening at a time per Failsafe execution, so it's unlikely to be a thread resource problem.
Adapting the toy example above:
// Assumed value, matching the literal 10 used in the first example.
private static final int CERTAINTY = 10;

public static long nextPrime(long n)
        throws InterruptedException, ExecutionException {
    return nextPrimeAsync(n).get();
}

private static CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(CERTAINTY))
        return CompletableFuture.completedFuture(n);
    else
        // One task sleeps; a second task runs the next attempt.
        return CompletableFuture
            .runAsync(() -> sleep(500, TimeUnit.MILLISECONDS))
            .thenComposeAsync(__ -> nextPrimeAsync(n + 1));
}

private static void sleep(long time, TimeUnit unit) {
    try {
        unit.sleep(time);
    } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
Using thenComposeAsync instead of thenCompose separates things into two tasks, one to sleep, and the other to run the stage returned by nextPrimeAsync(n+1).
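The same sleep-then-compose pattern can be carried from the toy example over to an actual retry of a Supplier<CompletionStage>. The following is only a sketch under assumptions: SleepRetrySketch, retryWithSleep, retriesLeft and delayMillis are hypothetical names, not Failsafe API, and a supplier that throws instead of returning a failed stage is not handled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; the names here are illustrative, not Failsafe API.
final class SleepRetrySketch {

    static <T> CompletableFuture<T> retryWithSleep(Supplier<CompletionStage<T>> supplier,
                                                   int retriesLeft, long delayMillis) {
        return supplier.get()
            .<CompletionStage<T>>handle((value, error) -> {
                if (error == null)
                    return CompletableFuture.completedFuture(value);
                if (retriesLeft <= 0) {
                    // Out of retries: propagate the last failure.
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(error);
                    return failed;
                }
                // One task sleeps for the delay; a separate task runs the next attempt.
                return CompletableFuture
                    .runAsync(() -> sleep(delayMillis, TimeUnit.MILLISECONDS))
                    .thenComposeAsync(ignored -> retryWithSleep(supplier, retriesLeft - 1, delayMillis));
            })
            .thenCompose(stage -> stage) // flatten the nested stage
            .toCompletableFuture();
    }

    private static void sleep(long time, TimeUnit unit) {
        try {
            unit.sleep(time);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

At most one sleep is in flight per call chain, because the next attempt is only composed after the previous sleep task has finished.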
And here's a way to do it with a scheduler, again adapting the toy example:
private final Scheduler scheduler;

public long nextPrime(long n) throws InterruptedException, ExecutionException {
    return nextPrimeAsync(n).get();
}

private CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(CERTAINTY))
        return CompletableFuture.completedFuture(n);
    else {
        CompletableFuture<Long> delayed = new CompletableFuture<>();
        scheduler.schedule(() -> delayed.complete(n + 1), 500, TimeUnit.MILLISECONDS);
        return delayed.thenComposeAsync(this::nextPrimeAsync);
    }
}
This avoids blocking any thread other than the caller of nextPrime(n), which blocks only because it calls CompletableFuture.get().
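For anyone who wants to try the scheduler approach outside of Failsafe, the delay step can be isolated into a small helper. This sketch assumes a plain JDK ScheduledExecutorService in place of the Scheduler field above; ScheduledDelaySketch, delay and callAfter are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch built on the JDK scheduler rather than Failsafe's Scheduler.
final class ScheduledDelaySketch {

    // Returns a future that the scheduler completes once the delay has elapsed;
    // no task sits in Thread.sleep while waiting.
    static CompletableFuture<Void> delay(ScheduledExecutorService scheduler, long time, TimeUnit unit) {
        CompletableFuture<Void> delayed = new CompletableFuture<>();
        scheduler.schedule(() -> { delayed.complete(null); }, time, unit);
        return delayed;
    }

    // Calls the supplier after the delay. thenComposeAsync moves the call off the
    // scheduler thread and onto CompletableFuture's default async executor.
    static <T> CompletableFuture<T> callAfter(ScheduledExecutorService scheduler, long time, TimeUnit unit,
                                              Supplier<CompletionStage<T>> supplier) {
        return delay(scheduler, time, unit).thenComposeAsync(ignored -> supplier.get());
    }
}
```

A retry with a delay then amounts to calling callAfter from the failure branch of the previous attempt. The caller is responsible for shutting the scheduler down when it is no longer needed.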