kotlinx.coroutines
Provide an API to race coroutines ("losers" are cancelled)
Hello,
I keep seeing this use case, racing suspend functions, come up on Kotlin's Slack, and I personally need it quite often, as it suits many needs in "coroutines-heavy" code.
I've been using my own solution for a long time now, which I also published as part of the Splitties library (available on Maven Central), and I haven't found any better API than raceOf({ … }, { … }, …) and race { launchRacer { … } }.
Here is an example snippet of raceOf in action taken straight from the README of Splitties Coroutines:
suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}
and here's a permalink to the code of these 2 APIs.
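A minimal sketch of how a raceOf-like function can be built on top of select and async. This is an assumption for illustration, not the Splitties source: each racer runs in its own async child, the first onAwait clause to fire provides the result, and the remaining children are cancelled before returning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Minimal raceOf sketch (assumed; not the Splitties implementation).
// Returns the result of the first racer to complete and cancels the others.
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> {
        // Each racer runs in its own child coroutine; its completion is a select clause.
        racers.forEach { racer -> async { racer() }.onAwait { it } }
    }.also {
        // The winner is known: cancel the losers so coroutineScope can return promptly.
        coroutineContext.cancelChildren()
    }
}
```

Since the racers are ordinary children of coroutineScope, a racer that throws fails the whole race rather than being silently ignored.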
Can this API or a better alternative be considered for addition inside kotlinx.coroutines?
Hi,
Could you please show a few examples of raceOf from a real-world project? I remember discussing this at some point, but AFAIR we haven't found any compelling use cases for it.
FYI, the main project at my day job has close to 100 usages of that function.
I'll share a curated summary of the most interesting use cases a bit later.
A similar race API has already been discussed in #424.
@qwwdfsad are you looking for this? https://github.com/Kotlin/kotlinx.coroutines/issues/424#issuecomment-403068240
The first simple and compelling use case is timeouts:
With raceOf, you can replace withTimeoutOrNull without introducing ambiguity as to why the result of the enclosing expression is null, and you can replace withTimeout without having to surround it with a try/catch block:
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

suspend fun whatever(
    timeout: Duration = 6.seconds
): SomeResult = raceOf({
    SomeResult.Yay(value = doStuffAndGetSomethingQuiteQuickly())
}, {
    delay(timeout)
    SomeResult.TimedOut(after = timeout)
})
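To make the null-ambiguity point concrete, here is a small runnable sketch. slowLookup(), LookupResult and its cases are hypothetical stand-ins, and raceOf is a minimal select-based version assumed for illustration (not the Splitties code). Because slowLookup() can itself return null, withTimeoutOrNull yields null both on timeout and on a legitimately empty result, whereas the raced version says which one happened.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Minimal select-based raceOf, assumed for illustration.
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> { racers.forEach { racer -> async { racer() }.onAwait { it } } }
        .also { coroutineContext.cancelChildren() }
}

// Hypothetical result type: "found nothing" and "timed out" stay distinct.
sealed interface LookupResult {
    data class Found(val value: String?) : LookupResult // value may legitimately be null
    data class TimedOut(val after: Duration) : LookupResult
}

// Hypothetical slow operation whose own result can be null.
suspend fun slowLookup(): String? {
    delay(500.milliseconds)
    return null
}

suspend fun lookup(timeout: Duration): LookupResult = raceOf({
    LookupResult.Found(slowLookup())
}, {
    delay(timeout)
    LookupResult.TimedOut(after = timeout)
})
```

With a 50 ms timeout, lookup() returns TimedOut; with a 2 s timeout it returns Found(null). The equivalent withTimeoutOrNull calls return null in both cases.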
It also works well to surface cancellability in a way that keeps everything sequential:
suspend fun tryDoingStuff(awaitCancelRequest: suspend () -> Unit) = raceOf({
    doStuff() // When complete, awaitCancelRequest() will be cancelled.
}, {
    awaitCancelRequest() // doStuff() would be cancelled automatically.
})
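A runnable sketch of that pattern, assuming the same kind of minimal select-based raceOf (not the Splitties code) and a hypothetical doStuff(). Here the cancel request is modeled with a CompletableDeferred; completing it makes the second racer win, which cancels doStuff().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Minimal select-based raceOf, assumed for illustration.
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> { racers.forEach { racer -> async { racer() }.onAwait { it } } }
        .also { coroutineContext.cancelChildren() }
}

// Hypothetical long-running work.
suspend fun doStuff(): String {
    delay(10_000)
    return "done"
}

// Returns doStuff()'s result, or null if a cancel request arrives first.
suspend fun tryDoingStuff(awaitCancelRequest: suspend () -> Unit): String? = raceOf({
    doStuff() // If this completes first, awaitCancelRequest() is cancelled.
}, {
    awaitCancelRequest() // If this completes first, doStuff() is cancelled.
    null
})
```

Requesting cancellation after 50 ms makes tryDoingStuff return null well before the 10 s of work would finish.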
It also works well when you have multiple ways to provide some input data:
suspend fun getToken(ui: RequiresLoginUi) = raceOf({
    getTokenFromStorage()?.takeUnless { it.isStale() } ?: awaitCancellation()
}, login@{
    repeatWhileActive {
        val credentials = raceOf({
            getSavedCredentialsFromAutoFill()
        }, {
            ui.awaitCredentialsSubmitting()
        })
        when (val loginResult = attemptLogin(credentials)) {
            is Attempt.Success -> return@login loginResult.token
            is Attempt.Failure -> ui.awaitLoginFailureAcknowledgement(loginResult)
        }
    }
})

suspend inline fun repeatWhileActive(block: () -> Unit): Nothing {
    while (true) {
        coroutineContext.ensureActive()
        block()
    }
}
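The "?: awaitCancellation()" trick in the first racer is worth isolating: a racer that has nothing useful to return can suspend until cancelled, so it never wins and simply gets cancelled with the other losers. A stripped-down sketch, assuming a hypothetical storage reader, a hypothetical login() stand-in for the UI-driven loop, and a minimal select-based raceOf (not the Splitties code):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Minimal select-based raceOf, assumed for illustration.
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> { racers.forEach { racer -> async { racer() }.onAwait { it } } }
        .also { coroutineContext.cancelChildren() }
}

// Hypothetical login flow standing in for the interactive loop.
suspend fun login(): String {
    delay(100)
    return "fresh-token"
}

suspend fun getToken(readStorage: suspend () -> String?): String = raceOf({
    // No stored token: suspend until cancelled, so this racer can never win.
    readStorage() ?: awaitCancellation()
}, {
    login()
})
```

With an empty storage the login racer wins; with a cached token the storage racer wins immediately and the login racer is cancelled.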
It works well when you want to enforce a certain UX process where there is ambiguity, regardless of how the UI is implemented:
suspend fun awaitTrigger() = raceOf({
    awaitAutomaticTrigger()
}, {
    awaitManualTrigger()
})

suspend fun awaitDoorLockRequest(
    voiceTrigger: VoiceTrigger,
    awaitLockButtonClick: suspend () -> Unit
) = raceOf({
    voiceTrigger.awaitHotPhrase()
}, {
    awaitLockButtonClick()
})

suspend fun runSomeOnboardingStep(ui: SomeUi) = raceOf({
    ui.awaitExistingUserConfirmation()
}, {
    ui.awaitRequestToWatchQuickIntro()
    raceOf({
        ui.showQuickIntroVideoUntilComplete()
    }, {
        ui.awaitSkipRequest()
    })
})
The following are just some thoughts on the matter.
In general, this can be thought of as a non-atomic select. In fact, I see that this library implements it via select in a non-atomic manner: https://github.com/LouisCAD/Splitties/blob/7539eaa9fa59fa92720c7ba5abd559ae4fc9172e/modules/coroutines/src/commonMain/kotlin/splitties/coroutines/Racing.kt
So, select vs raceOf (or nonAtomicSelect):
- select is atomic, so, by design, if one branch finishes successfully, the others shouldn't have an effect. With nonAtomicSelect, one has to consider the implications of several operations finishing successfully.
- nonAtomicSelect works for everything that's cancellable. Specific operations don't require any special treatment.
- As a consequence, nonAtomicSelect is somewhat easier to use: just run the operations, without having to guess which onZZZ clause to use.
- select is somewhat prettier, with its builder syntax.
- nonAtomicSelect doesn't separate the asynchronous operation from the post-processing of its result. This may simplify using it, but at the cost of the concurrent operations running for longer than they should if the post-processing is long.
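The atomicity difference in the first bullet can be observed with two buffered channels that both already hold an element. In this sketch (function names are hypothetical), select consumes exactly one element, while a raceOf-style version built from async { receive() } racers can let both receive() calls succeed before the loser is cancelled, so one element may be consumed and then dropped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select

// Atomic: exactly one onReceive clause consumes an element.
suspend fun atomicPick(a: Channel<Int>, b: Channel<Int>): Int = select<Int> {
    a.onReceive { it }
    b.onReceive { it }
}

// Non-atomic, raceOf-style: each receive() runs in its own racer, so the
// "loser" may already have taken its element by the time it is cancelled.
suspend fun nonAtomicPick(a: Channel<Int>, b: Channel<Int>): Int = coroutineScope {
    select<Int> {
        async { a.receive() }.onAwait { it }
        async { b.receive() }.onAwait { it }
    }.also { coroutineContext.cancelChildren() }
}
```

Since select is biased to its first clause, atomicPick returns 1 and leaves 2 in the second channel; nonAtomicPick offers no such guarantee about the losing channel.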
The post-processing here is only cancelling the "losers", unless we want to consider an API that allows a given set of winners, which would be interesting, albeit beyond the scope of this specific issue, which already has clear use cases IMHO.
Or could the risk you're mentioning cause cross-cancellation, where nobody wins because of a tie? I think such behavior would not work well for application use cases; it's best to pick the first one, or to pick at random in this rare case.
By post-processing I mean the part after the asynchronous part of the operation is complete. In select, it's the things that are in the code blocks:
select<Int> {
    channel1.onReceive { v ->
        Thread.sleep(1000) // post-processing
        v
    }
    channel2.onReceive { v ->
        Thread.sleep(1000) // post-processing
        v
    }
}
In select, after the asynchronous operation is itself completed, all the other operations stop. In your implementation, the various branches only cancel the other operations when the value is ready to be returned.
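A small sketch of this difference, with hypothetical timings: the first branch's asynchronous part finishes first (after 10 ms) but then post-processes for 200 ms, while the second branch finishes its only step after 100 ms. With select, post-processing lives in the clause block, so the first branch wins as soon as its value is ready; when post-processing lives inside the racer, as with raceOf, the second branch overtakes it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// select style: the clause is chosen as soon as the async part completes;
// post-processing runs afterwards, in the clause block.
suspend fun selectStyle(): String = coroutineScope {
    select<String> {
        async { delay(10); "a" }.onAwait { v ->
            delay(200) // post-processing
            v.uppercase()
        }
        async { delay(100); "b" }.onAwait { it.uppercase() }
    }.also { coroutineContext.cancelChildren() }
}

// raceOf style: post-processing is part of the racer, so a racer only wins
// once its post-processing is done, while the others keep running meanwhile.
suspend fun raceStyle(): String = coroutineScope {
    select<String> {
        async {
            delay(10)  // asynchronous part done...
            delay(200) // ...but post-processing still happens inside the racer
            "A"
        }.onAwait { it }
        async { delay(100); "B" }.onAwait { it }
    }.also { coroutineContext.cancelChildren() }
}
```

selectStyle() returns "A", while raceStyle() returns "B".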
A middle ground between raceOf and select is something like what Rust does (https://docs.rs/futures/0.3.4/futures/macro.select.html): it has an interface similar to our select, but it's implemented similarly to raceOf.
Note that you can already use select to race coroutines, but it takes a bit of additional boilerplate:
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

suspend fun main() {
    val result = coroutineScope {
        select<String> {
            async {
                delay(300)
                println("slow")
                "Slow Result"
            }.onAwait { it }
            async {
                delay(200)
                println("fast")
                "Fast Result"
            }.onAwait { it }
        }.also { coroutineContext.cancelChildren() } // cancel the losing racers
    }
    println("Result=$result")
}
I'd suggest starting directly from here and splitting it into two features.
The first one is the combination of coroutineScope { select { ... }.also { coroutineContext.cancelChildren() } } which introduces a scope that extends SelectBuilder with an additional DSL (see below). Let's call it __selectAndCancelRest__ for now.
The other is a combination of async { ... }.onAwait { it }. Let's call it __async__ for now. With those two pieces, you can now write:
suspend fun main() {
    val result = __selectAndCancelRest__<String> {
        __async__ {
            delay(300)
            println("slow")
            "Slow Result"
        }
        __async__ {
            delay(200)
            println("fast")
            "Fast Result"
        }
    }
    println("Result=$result")
}
This construct is more composable than a plain raceOf, as you can also use other onXxx clauses inside. For example, you can easily throw a receive from a channel into the mix, which, if it wins, cancels all the ongoing __async__ operations.
P.S. Naming needs a careful design here.
This looks like #1065
It does not have to involve #1065 at all. For the race DSL to be clear, it is critical not to expose the internal CoroutineScope (which gets its children cancelled after select) to the public. It would be very error-prone if you could accidentally launch inside this scope and have your freshly launched coroutine cancelled on scope exit, violating the usual expectations about how children work (e.g. channel.onReceive { it -> launch { doSomething(it) } } suddenly compiles, but does not work).
It is quite important here to clearly name all __xxx__ pieces in this DSL to carry this "implicit cancellation" meaning and avoid misunderstanding.
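One way to keep the internal scope private is to hide it behind a dedicated builder type. The sketch below is hypothetical (RaceScope, racer and selectAndCancelRest are placeholder names, not a kotlinx.coroutines API) and, for simplicity, does not forward the other onXxx clauses.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

// Hypothetical DSL receiver: the CoroutineScope is private, so callers cannot
// accidentally launch coroutines that would be cancelled when the race ends.
class RaceScope<R>(
    private val scope: CoroutineScope,
    private val builder: SelectBuilder<R>,
) {
    // Starts a racer and registers its completion as a select clause.
    fun racer(block: suspend () -> R) {
        with(builder) {
            scope.async { block() }.onAwait { it }
        }
    }
}

// Hypothetical name: runs the racers, returns the first result, cancels the rest.
suspend fun <R> selectAndCancelRest(build: RaceScope<R>.() -> Unit): R = coroutineScope {
    select<R> { RaceScope(this@coroutineScope, this).build() }
        .also { coroutineContext.cancelChildren() }
}
```

Inside the selectAndCancelRest block, only racer is available, so "launch" does not resolve against the hidden scope.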
@elizarov I think this use case is a more specific case of #1065, a recurring problem like #424.
This issue proposes a DSL to fix this specific use case, but it doesn't resolve similar use cases (it still allows a leak in a channel's receive clause).
Using the #1065 proposal, it is possible to implement your idea:
val result = coroutineScope {
    select<String> {
        async(onScopeTermination = ScopeTermination.CANCEL) {
            delay(300)
            println("slow")
            "Slow Result"
        }.onAwait { it }
        async(onScopeTermination = ScopeTermination.CANCEL) {
            delay(200)
            println("fast")
            "Fast Result"
        }.onAwait { it }
    }
}
println("Result=$result")
I understand that this version is more verbose because the syntax is not specific to this issue.
The wrapping coroutineScope looks like boilerplate and unwanted indent in this snippet posted just above. Is it required?
I reused the #1065 idea, similar to elizarov's first example, so an extra coroutineScope is required.
I am not convinced that __selectAndCancelRest__ is a good idea; I prefer an API similar to onTimeout (a clause that either fires or gets cancelled), something like:
suspend fun main() {
    val result = select<String> {
        __onAwait__ {
            delay(300)
            println("slow")
            "Slow Result"
        }
        __onAwait__ {
            delay(200)
            println("fast")
            "Fast Result"
        }
    }
    println("Result=$result")
}
Usually, I find this to be the most convenient idiom:
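import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch

suspend fun <R> race(vararg races: suspend () -> R): R {
    return channelFlow {
        for (race in races) {
            launch { send(race()) }
        }
    }.first() // first() cancels the flow, and with it the other racers
}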
...instead of bothering with any select shenanigans at all. This doesn't need any extra work to handle cancellation.
I'm wondering whether that interesting solution leveraging channelFlow is more or less efficient than the solution using select. Maybe @elizarov knows?