failsafe icon indicating copy to clipboard operation
failsafe copied to clipboard

Support sync CompletionStage execution

Open jhalterman opened this issue 6 years ago • 4 comments

See https://github.com/jhalterman/failsafe/issues/145#issuecomment-456217684

One of the challenges I am recalling from sync CompletionStage execution is how to implement sync retries without spawning another thread or potentially blowing up a call stack, such as if we implemented sync retries using recursive calls rather than a while loop.

Given the valid comment here mentioning that a Supplier<CompletionStage> never really needs to be async, a few options:

  1. Implement getStage using sync retries with recursion
  2. Make the initial Supplier<CompletionStage>.get() call sync, and let subsequent retries be async 2b. (Optional) Rename getStageAsync to getStage, though this may be confusing if retries are still async and CompletionStage is still an async construct
  3. Leave all as is

Not wanting to risk recursion with sync retries, I'm leaning towards options 2 and maybe 2b.

/cc @Tembrel

jhalterman avatar Jan 22 '19 04:01 jhalterman

I don't understand what "sync CompletionStage execution" is.

Also, why is it important to implement sync retries "without spawning another thread"? If you're handed a CompletionStage, there's nothing you can do to control what threads it will use.

I do see that blowing up the stack is bad! But the apparent recursion in the following does not blow up the call stack and doesn't even necessarily result in new threads being spawned, since it uses the common pool. It's synchronous in that it blocks the caller of nextPrime.

public static long nextPrime(long n) throws InterruptedException, ExecutionException {
    return nextPrimeAsync(n).get();
}

private static CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(10))
	return CompletableFuture.completedFuture(n);
    else
	return CompletableFuture.supplyAsync(() -> n + 1)
	    .thenCompose(k -> nextPrimeAsync(k));
}

Is that the kind of recursion you were concerned about?

Tembrel avatar Jan 23 '19 17:01 Tembrel

I don't understand what "sync CompletionStage execution" is.

By that I meant one that does not wrap the initial supplier.get in a scheduled thread, and likewise, does not wrap any retries in scheduled threads.

Is that the kind of recursion you were concerned about?

Yea, though I was forgetting that CompletableFuture usually has a supplyAsync which breaks the potential for recursion.

Thanks @Tembrel for the idea - will play with this. I'm wondering if this goes out the window if they have a delay between retries though, as we're left with either a sleep or using a scheduler.

jhalterman avatar Jan 24 '19 01:01 jhalterman

Is there something undesirable about using sleep to implement the delay? It does tie up a thread, but there should only ever be one sleep happening at a time per Failsafe execution, so it's unlikely to be a thread resource problem.

Adapting the toy example above:

public static long nextPrime(long n)
	throws InterruptedException, ExecutionException{
    return nextPrimeAsync(n).get();
}

private static CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(CERTAINTY))
	return CompletableFuture.completedFuture(n);
    else
	return CompletableFuture
	    .runAsync(() -> sleep(500, TimeUnit.MILLISECONDS))
	    .thenComposeAsync(__ -> nextPrimeAsync(n + 1));
}

private static void sleep(long time, TimeUnit unit) {
    try {
	unit.sleep(time);
    } catch (InterruptedException ex) {
	Thread.currentThread().interrupt();
    }
}

Using thenComposeAsync instead of thenCompose separates things into two tasks, one to sleep, and the other to run the stage returned by nextPrimeAsync(n+1).

Tembrel avatar Jan 24 '19 03:01 Tembrel

And here's a way to do it with a scheduler, again adapting the toy example:

private final Scheduler scheduler;

public long nextPrime(long n) throws InterruptedException, ExecutionException {
    return nextPrimeAsync(n).get();
}

private CompletableFuture<Long> nextPrimeAsync(Long n) {
    if (BigInteger.valueOf(n).isProbablePrime(CERTAINTY))
	return CompletableFuture.completedFuture(n);
    else {
	CompletableFuture<Long> delayed = new CompletableFuture<>();
	scheduler.schedule(() -> delayed.complete(n + 1), 500, TimeUnit.MILLISECONDS);
	return delayed.thenComposeAsync(this::nextPrimeAsync);
    }
}

This avoids blocking any threads other than the caller of nextPrime(n) (because it calls CompletableFuture.get()).

Tembrel avatar Jan 24 '19 19:01 Tembrel