kotlinx.coroutines
Suggestion for a potential new Flow's `timeout` extension
Considerations
- Did you check the latest version of the library?
Yes
- Do you actually need this feature? Maybe restructuring your code would neatly eliminate the problem the feature would be solving.
I'm not sure, I just wanted to bring this up since my teammates found this extension to be useful.
Use Case
Simplify Flow's Timeout extension fallbacks
The Shape of the API
fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}
Usage
someFlow().timeout(X.seconds) {
    emitAll(someFallbackFlow())
}
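For illustration, here is a minimal runnable sketch of the proposed overload in action. The extension is the one proposed in this issue (it is not part of kotlinx.coroutines), and `slowSource` and `collectWithFallback` are hypothetical names made up for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// The overload as proposed in this issue; NOT an existing library API.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) onTimeout(cause) else throw cause
}

// Hypothetical source: emits once, then stalls for longer than the timeout.
fun slowSource(): Flow<Int> = flow {
    emit(1)
    delay(1_000)
    emit(2)
}

// Collects the source, emitting -1 as a fallback once the timeout fires.
suspend fun collectWithFallback(): List<Int> =
    slowSource().timeout(100.milliseconds) { emit(-1) }.toList()

fun main() = runBlocking {
    println(collectWithFallback()) // [1, -1]
}
```

The second emission never arrives within 100 ms, so the fallback value takes its place and the flow completes.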
Prior Art (Why - Thought Process)
When I read the timeout documentation, I found it interesting that I would need to copy-paste the documentation sample in order to handle the fallback the way it's intended. Providing an alternative would make this a tiny bit easier for developers; otherwise we could end up with lots of timeout blocks with very similar catch logic. Ideally we want to handle exceptions as close to their source as possible, so the code sample made a lot of sense to me. Why not automate that a bit?
If we think the feature request makes sense, I'm happy to go ahead and make a code contribution. I just wanted to make sure this made sense beforehand.
Timeout Related Sources / Issues
- https://github.com/Kotlin/kotlinx.coroutines/issues/3789
- https://github.com/Kotlin/kotlinx.coroutines/issues/3716
- https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
@dkhalanskyjb Do you think it'd make sense to make a contribution for this?
@tinder-cesardiez, both core maintainers are on vacation, so it may take a while before the proposal is evaluated.
The exact suggestion has a flaw. Consider this code:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds
fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}
fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
        .timeout(Duration.INFINITE) {
            println("Should be printed an infinity later")
        }.collect {
        }
}
(runnable version: https://pl.kotl.in/_s0UsTsR4)
The timeout with the lambda catches the exception thrown by the unrelated 50-millisecond timeout, so the println runs even though the infinite timeout itself never fired. This can be a real concern when you have several independent parameterized timeouts for one flow.
On the other hand, using timeout with catch is completely transparent: timeout throws an exception on a timeout, and catch catches it, obviously.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds
fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
        .timeout(Duration.INFINITE)
        .catch { cause ->
            if (cause is TimeoutCancellationException) {
                println("One of the timeouts fired.")
            } else {
                throw cause
            }
        }.collect {
        }
}
This transparency and predictability is pretty valuable. If we introduce a timeout with a lambda, I'm pretty sure we'd want to avoid this gotcha, but then, we will have to introduce some new logic that's not just a combination of the existing operators.
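To make "new logic" concrete, here is a rough sketch of one way such an operator could avoid the gotcha: it runs its own timer with select instead of catching TimeoutCancellationException, so upstream timeouts pass through untouched. This is an assumption-laden illustration, not a proposed implementation: `timeoutOrElse`, `fallback`, and the two demo functions are hypothetical names, and the code relies on the experimental `whileSelect` and select `onTimeout` APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical operator: runs `fallback` only when ITS OWN timer fires.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<T>.timeoutOrElse(
    timeout: Duration,
    fallback: suspend FlowCollector<T>.() -> Unit
): Flow<T> = flow {
    val timedOut = coroutineScope {
        // Collect upstream in a child coroutine and hand values over a channel.
        val values = buffer(Channel.RENDEZVOUS).produceIn(this)
        var fired = false
        whileSelect {
            values.onReceiveCatching { result ->
                if (result.isClosed) {
                    // Upstream finished; rethrow its failure (if any) unchanged.
                    result.exceptionOrNull()?.let { throw it }
                    false
                } else {
                    emit(result.getOrThrow())
                    true
                }
            }
            onTimeout(timeout) {
                fired = true
                values.cancel() // stop collecting upstream
                false
            }
        }
        fired
    }
    if (timedOut) fallback()
}

// The scenario from the comment above: an unrelated upstream timeout fires.
@OptIn(FlowPreview::class)
suspend fun upstreamTimeoutDemo(): List<String> {
    val events = mutableListOf<String>()
    flow<Int> { awaitCancellation() }
        .timeout(50.milliseconds)
        .timeoutOrElse(Duration.INFINITE) { events.add("fallback") }
        .catch { cause ->
            if (cause is TimeoutCancellationException) {
                events.add("upstream timeout")
            } else {
                throw cause
            }
        }
        .collect { }
    return events
}

// The operator's own timer fires after the first value.
suspend fun ownTimeoutDemo(): List<Int> =
    flow { emit(1); awaitCancellation() }
        .timeoutOrElse(50.milliseconds) { emit(-1) }
        .toList()

fun main() = runBlocking {
    println(upstreamTimeoutDemo())
    println(ownTimeoutDemo())
}
```

In this sketch the fallback should run only in the second demo. The cost is visible too: the operator re-implements the timing logic rather than composing timeout and catch.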
This is not a huge issue if this new operator is actually useful: it has some learning curve on its own, but it's still a fairly straightforward API. Is it useful? In which scenarios? Looking for this exact pattern, I didn't find many people using it: https://grep.app/search?q=timeout%5C%28%5B%5E%29%5D%2A%5C%29%5Cs%2A%5C.catch%5Cs%2A%7B&regexp=true&case=true&filter[lang][0]=Kotlin
@tinder-cesardiez, this is similar to how I originally thought about this flow operator, until @qwwdfsad brought up that it was non-orthogonal and referenced this comment: https://github.com/Kotlin/kotlinx.coroutines/pull/2745#pullrequestreview-681969186
TL;DR - Flow operators should have one job, and they should be composable with other operators. This suggestion goes against the one-job-per-operator idea, since you would now have a timeout operator that can also catch exceptions and re-emit.
A use case from https://github.com/Kotlin/kotlinx.coroutines/issues/4250:
Get the latest value from the cache, substituting a default value if the request takes too long
A small point on that use case from #4250 (I'm the one who actually had this problem): a terminal operator latestOrNull(timeout: Duration) would be sufficient for it. I did it with a much simpler implementation:
suspend fun <T> Flow<T>.latestOrNull(timeout: Duration): T? {
    var latestValue: T? = null
    // Collect until the flow completes or the timeout elapses, whichever comes first.
    withTimeoutOrNull(timeout) {
        collect {
            latestValue = it
        }
    }
    return latestValue
}
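As a quick illustration of how this covers the "substitute a default value" use case, here is a small runnable sketch. The `slowCache` and `fastCache` flows are hypothetical stand-ins for a real cache, and the helper is repeated so the snippet is self-contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

suspend fun <T> Flow<T>.latestOrNull(timeout: Duration): T? {
    var latestValue: T? = null
    withTimeoutOrNull(timeout) {
        collect { latestValue = it }
    }
    return latestValue
}

// Hypothetical cache that answers too slowly.
fun slowCache(): Flow<String> = flow {
    delay(1_000)
    emit("cached")
}

// Hypothetical cache that answers quickly and then stays open.
fun fastCache(): Flow<String> = flow {
    emit("stale")
    emit("fresh")
    awaitCancellation()
}

fun main() = runBlocking {
    // Nothing arrives within the timeout, so the default is used.
    println(slowCache().latestOrNull(100.milliseconds) ?: "default") // default
    // The most recent value seen before the timeout wins.
    println(fastCache().latestOrNull(100.milliseconds) ?: "default") // fresh
}
```

One caveat with this shape: if T is itself nullable, a null result does not distinguish "nothing was emitted" from "the latest value was null".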