kotlinx.coroutines
Flow with resources
At the moment, there seems to be no way to safely pass resources, e.g. files or buffers, through a Flow: if cancellation happens somewhere during emit, the resource is simply leaked. It's possible to handle this at the user level by catching exceptions thrown from emit.
Example code that simply emulates cancellation (this can of course be done in other ways):
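```kotlin
import java.io.Closeable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A resource that logs when it is created and when it is closed.
class A : Closeable {
    init {
        println("Create $this")
    }
    override fun close() {
        println("Close $this")
    }
}

@OptIn(DelicateCoroutinesApi::class) // GlobalScope is a delicate API
suspend fun main() {
    flow<A> {
        repeat(10) {
            emit(A())
        }
    }
        .onEach { /* some logic here */ it.close() }
        .launchIn(GlobalScope)
        .cancelAndJoin() // cancel right away to emulate cancellation during emit
}
```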
It prints just one line about the creation of an object, and none about closing it.
This can be fixed by using an extension function like this instead of emit:
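```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.FlowCollector

// Emits a resource; if emit throws (e.g. because the collector was cancelled),
// closes the resource and rethrows so the failure still propagates.
suspend fun <T : Closeable> FlowCollector<T>.safeEmit(value: T) {
    try {
        emit(value)
    } catch (cause: Throwable) {
        value.close()
        throw cause
    }
}
```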
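To check that safeEmit really closes a resource whose emit is interrupted, one can suspend the collector and cancel it while it still holds the first element. This is a minimal sketch, assuming kotlinx.coroutines 1.4 or newer (for `awaitCancellation`); `Res` and `demoSafeEmitOnCancel` are hypothetical names, and `safeEmit` is repeated so the block compiles on its own.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Same helper as in the post: close the value if emit throws, then rethrow.
suspend fun <T : Closeable> FlowCollector<T>.safeEmit(value: T) {
    try {
        emit(value)
    } catch (cause: Throwable) {
        value.close()
        throw cause
    }
}

// Returns every resource the flow created during a collection that is
// cancelled while the collector is still holding the first element.
fun demoSafeEmitOnCancel(): List<Res> = runBlocking {
    val created = mutableListOf<Res>()
    val job = launch {
        flow<Res> { repeat(10) { i -> safeEmit(Res(i).also { created += it }) } }
            .collect { awaitCancellation() } // the collector never finishes on its own
    }
    yield()             // let the collector receive the first resource
    job.cancelAndJoin() // the cancellation is thrown out of emit inside safeEmit
    created
}
```

Calling `demoSafeEmitOnCancel()` should return a single `Res` with `closed == true`: the cancellation travels back out of `emit`, and the catch block in `safeEmit` closes the value before rethrowing.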
But it would be great to be able to handle this with an operator applied to the upstream, something like onDrop:
```kotlin
suspend fun main() {
    flow<A> {
        repeat(10) {
            emit(A())
        }
    }
        .onDrop { it.close() } // proposed operator, not in kotlinx.coroutines: safely close the resource if it is dropped
        .onEach { /* some logic here */ it.close() }
        .launchIn(GlobalScope)
        .cancelAndJoin()
}
```
At the moment, I haven't been able to implement it on my own. This would also be helpful for flows with a buffer (which uses a channel under the hood, with `onUndeliveredElement`).
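For the non-buffered case, an onDrop-like operator can be approximated by moving the try/catch from safeEmit into a `transform` placed on the upstream. This is a rough sketch rather than the requested operator: `closeOnDownstreamFailure`, `Res` and `demoDownstreamFailure` are hypothetical names, and the operator only sees failures that travel back through `emit`, so it cannot see elements lost inside a buffer further downstream.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Hypothetical operator: closes an element if the downstream throws while
// that element is being emitted, then rethrows. Failures that never pass
// back through this emit (e.g. inside a later buffer) are invisible to it.
fun <T : Closeable> Flow<T>.closeOnDownstreamFailure(): Flow<T> = transform { value ->
    try {
        emit(value)
    } catch (cause: Throwable) {
        value.close()
        throw cause
    }
}

// Collects with a collector that fails on the first element and returns
// every resource the upstream created.
fun demoDownstreamFailure(): List<Res> = runBlocking {
    val created = mutableListOf<Res>()
    runCatching {
        flow<Res> { repeat(3) { i -> emit(Res(i).also { created += it }) } }
            .closeOnDownstreamFailure()
            .collect { error("downstream failed on ${it.id}") }
    }
    created
}
```

`demoDownstreamFailure()` returns only the first resource, already closed, because the collector fails on it before the upstream creates the next one.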
I haven't found any similar issue in the repository to track.
BTW, such an operator would reduce the complexity of using libraries such as rsocket-kotlin, where flows of resources are used heavily.
The problem with such an operator is that it cannot know about all the suspension points, coroutine switches, etc., both upstream and downstream.
For example, let's take a look at the following snippet:
```kotlin
(1..100).asFlow()
    .buffer(1000) // Buffer elements, basically separating collector and emitter in two coroutines
    .anyCustomOperator() // Hide the buffering flow
    .onElementDrop { ... }
    .collect {
        throw CancellationException()
    }
```
The upstream flow will successfully emit 100 elements [to the channel buffer] and will complete, but the collector will reject all the elements as soon as the first arrives.
If there is an explicit buffer (or, even worse, an operator with implicit buffering), then a successful emit from the upstream does not guarantee that the element is actually received, and the operator has no mechanism to intercept such a call.
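The buffering problem can be reproduced with the user-level safeEmit from the original post. In this sketch (hypothetical names `Res` and `demoBufferLeak`), `buffer(10)` sits between the upstream and a collector that rejects the first element it sees; by then the upstream's emit has already returned, so the try/catch in safeEmit never runs for that element.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Same helper as in the post: close the value if emit throws, then rethrow.
suspend fun <T : Closeable> FlowCollector<T>.safeEmit(value: T) {
    try {
        emit(value)
    } catch (cause: Throwable) {
        value.close()
        throw cause
    }
}

// Upstream guards every emit with safeEmit, but buffer() puts a channel in
// between: emit returns as soon as the element is in the channel, so the
// later downstream failure never reaches safeEmit's catch block.
fun demoBufferLeak(): List<Res> = runBlocking {
    val created = mutableListOf<Res>()
    runCatching {
        flow<Res> { repeat(5) { i -> safeEmit(Res(i).also { created += it }) } }
            .buffer(10)
            .collect { error("collector rejects ${it.id}") }
    }
    created
}
```

The first element created is still open after the collection fails, even though every upstream emit went through safeEmit.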
In Reactor, this is Flux#using.
It seems like Flux#using is prone to the same issue, though further analysis is required.
I think Flux#using doesn't solve this particular issue, as it creates and destroys one resource per subscriber, it doesn't allow one to treat elements as closeable resources. Basically, it's a thing for doing something like "if subscribed to, open a file, then, using that file, output the flow of its lines, and close the file when finished", not "here's a list of filenames, open them as files, do something, and don't forget to close them".
@dkhalanskyjb You can compose it with Flux#flatMap:
```java
Flux.from(/* ...a group of file names */)
    .flatMap(file -> Flux.using(/* ...open it */, /* ...handle it */, /* ...close it */))
    // ...
```
Sure, but this pattern doesn't give one the ability to safely pass open resources, which is what's being discussed.
Flux#using is logically equivalent to something like this with Flow:
```kotlin
flow {
    openResource().use {
        emitAll(it.getFlow())
    }
}
```
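A self-contained version of this pattern, composed with `flatMapConcat` in the same way as the Flux#flatMap example, might look like the following. `usingFlow`, `FakeFile` and `demoUsing` are hypothetical names. Only plain values (lines) leave the `use` block, never the resource itself, which is why this pattern does not help with passing open resources downstream.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical resource standing in for an open file.
class FakeFile(val name: String) : Closeable {
    var closed = false
        private set

    fun lines(): Flow<String> = flowOf("$name:1", "$name:2")

    override fun close() {
        closed = true
    }
}

// Flux#using-style helper (hypothetical): opens the resource when collected,
// emits everything derived from it, and closes it when collection ends for
// any reason (completion, error, cancellation or an early stop like take()).
fun <R : Closeable, T> usingFlow(open: () -> R, body: (R) -> Flow<T>): Flow<T> = flow {
    open().use { resource -> emitAll(body(resource)) }
}

// flatMapConcat is opt-in (the exact marker depends on the library version).
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun demoUsing(): Pair<List<String>, List<FakeFile>> = runBlocking {
    val opened = mutableListOf<FakeFile>()
    val lines = flowOf("a", "b")
        .flatMapConcat { name ->
            usingFlow({ FakeFile(name).also { opened += it } }) { it.lines() }
        }
        .toList()
    lines to opened
}
```

`demoUsing()` yields the four lines `a:1`, `a:2`, `b:1`, `b:2`, and both `FakeFile` instances are closed afterwards.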
Hey, I've tried to do something about this issue here: https://github.com/whyoleg/kotlinx.coroutines/commit/e8bfe317a984d4b6ecb09e46ac53edfdb180a95c. If possible, could someone take a look and tell me whether it's even something worth exploring? It's a super simple POC and works only in some cases, but it could grow into something that works. So far I've tested simple cases, but here is what worries me:
- when using `take(n)` after `on*ElementDrop`, it will still drop the last element, even though it was collected successfully, because it's hard to distinguish successful termination from termination with an exception;
- because the onDrop action is provided in the context, it is propagated into flow builders, and if someone runs `flow.collect` inside such a builder, it will implicitly receive this action; I'm not sure that's good behavior.
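The first concern is visible even with the user-level safeEmit from the original post: `take` stops its upstream by throwing an internal exception out of `emit` after the element has already been passed downstream, so a try/catch around emit cannot tell a completed hand-off from a real failure. A minimal sketch, with `Res` and `demoTakeClosesDelivered` as hypothetical names:

```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Same helper as in the post: close the value if emit throws, then rethrow.
suspend fun <T : Closeable> FlowCollector<T>.safeEmit(value: T) {
    try {
        emit(value)
    } catch (cause: Throwable) {
        value.close()
        throw cause
    }
}

// take(1) receives Res(0), then aborts the upstream by throwing out of the
// very emit call that delivered it, so safeEmit closes a delivered element.
fun demoTakeClosesDelivered(): List<Res> = runBlocking {
    flow<Res> { repeat(3) { i -> safeEmit(Res(i)) } }
        .take(1)
        .toList()
}
```

The returned list holds one element, and that element is already closed, even though `take(1)` and `toList()` received it normally.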
P.S. Reactor's Flux has Flux#doOnDiscard, and there is also Hooks.onNextDropped.
One more reason I came back here: I wanted to use channelFlow with a closeable resource, but couldn't, because there is no way to install an onUndeliveredElement handler in it. Then I decided to use produceIn and hit the same issue: there is no way to install the handler there either. So I have to fall back to a custom channel-to-flow conversion, even though it is almost the same apart from this handler. Because of that, I can no longer rely on fusion, and a subsequent call like myflow.buffer will create an additional channel, which of course isn't needed in my case.
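A custom channel-to-flow conversion of the kind described here might look like the following sketch, which installs `onUndeliveredElement` on a plain `Channel`. It is an illustration under assumptions, not a kotlinx.coroutines API: `bufferClosing`, `Res` and `demoBufferClosing` are hypothetical names, and because the operator is an ordinary `flow {}`, a later `buffer()` call cannot fuse with it and will add a second channel.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Hypothetical buffering operator for closeable elements.
fun <T : Closeable> Flow<T>.bufferClosing(capacity: Int = Channel.BUFFERED): Flow<T> = flow {
    coroutineScope {
        // Invoked for elements the channel drops: still buffered when the
        // channel is cancelled, or lost because a send failed.
        val channel = Channel<T>(capacity, onUndeliveredElement = { it.close() })
        launch {
            this@bufferClosing.collect { channel.send(it) }
        }.invokeOnCompletion { cause -> channel.close(cause) }
        try {
            for (item in channel) {
                try {
                    emit(item)
                } catch (cause: Throwable) {
                    item.close() // received from the channel but rejected downstream
                    throw cause
                }
            }
        } finally {
            channel.cancel() // drop (and therefore close) anything still buffered
        }
    }
}

// Returns (created, received) for a collector that either fails on the
// first element or consumes everything.
fun demoBufferClosing(failDownstream: Boolean): Pair<List<Res>, List<Res>> = runBlocking {
    val created = mutableListOf<Res>()
    val received = mutableListOf<Res>()
    runCatching {
        flow<Res> { repeat(5) { i -> emit(Res(i).also { created += it }) } }
            .bufferClosing(capacity = 10)
            .collect {
                received += it
                if (failDownstream) error("collector rejects ${it.id}")
            }
    }
    created to received
}
```

On failure, every created element should end up closed: the rejected one by the inner catch block and the still-buffered ones by `onUndeliveredElement` when the channel is cancelled. On success nothing is closed, so a terminal operator like `toList` still gets open resources.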
I'm not sure I follow why it's not plausible to write, for example,
```kotlin
flow<A> {
    repeat(10) {
        A().use { emit(it) }
    }
}
```
or
```kotlin
fun <T : Closeable> Flow<T>.closing(): Flow<T> = transform {
    it.use { emit(it) }
}
```
Both end up wrapping every emit in a try/finally; shouldn't that accomplish the appropriate closure?
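The main issue is that this doesn't work if there is a buffer call: the resource is put into the channel, so `use` closes it as soon as emit returns, possibly before the downstream has even received it.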
Also, it should not be `use` (which always closes) but close only on error, e.g. when the terminal operator is toList and the elements must stay open after collection.
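Both objections can be reproduced with a small sketch that reuses the `closing()` operator from the reply (with the lambda parameter renamed to avoid shadowing). `Res`, `demoClosingToList` and `demoClosingBuffer` are hypothetical names, and everything runs on the single thread of `runBlocking`.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical resource that records whether it has been closed.
class Res(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// The operator from the reply: closes every element once emit returns.
fun <T : Closeable> Flow<T>.closing(): Flow<T> = transform { value ->
    value.use { emit(it) }
}

// closing() + toList(): each element is closed as soon as emit returns,
// so the resulting list holds only closed resources.
fun demoClosingToList(): List<Res> = runBlocking {
    flow<Res> { repeat(3) { emit(Res(it)) } }.closing().toList()
}

// closing() + buffer(): emit returns once the element is in the channel,
// so the collector receives resources that are already closed.
fun demoClosingBuffer(): List<Boolean> = runBlocking {
    val closedOnArrival = mutableListOf<Boolean>()
    flow<Res> { repeat(3) { emit(Res(it)) } }
        .closing()
        .buffer(10)
        .collect { closedOnArrival += it.closed }
    closedOnArrival
}
```

`demoClosingToList()` returns three resources that are all closed, and `demoClosingBuffer()` reports that every element was already closed when the collector received it.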
This reminds me of https://github.com/reactor/reactor-core/issues/999