kotlinx.coroutines

Flow with resources

Open whyoleg opened this issue 3 years ago • 11 comments

At the moment, there seems to be no way to safely pass resources such as files or buffers through a Flow: if cancellation happens somewhere during emit, the resource is simply leaked. It's possible to handle this at the user level by catching exceptions from emit.

Example code that simply emulates cancellation (it can of course be done in other ways):

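import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import java.io.Closeable

class A : Closeable {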
    init {
        println("Create $this")
    }
    override fun close() {
        println("Close  $this")
    }
}

suspend fun main() {
    flow<A> {
        repeat(10) {
            emit(A())
        }
    }.onEach { /*some logic here*/ it.close() }.launchIn(GlobalScope).cancelAndJoin()
}

It prints only one line, for the creation of the object, and none for closing it. This can be fixed by using an extension function like this instead of emit:

suspend fun <T: Closeable> FlowCollector<T>.safeEmit(value: T) {
    try {
        emit(value)
    } catch(cause: Throwable) {
        value.close()
        throw cause
    }
}

But it would be nice to be able to handle this with an operator applied to the upstream, such as onDrop:

suspend fun main() {
    flow<A> {
        repeat(10) {
            emit(A())
        }
    }.onDrop { it.close() } // safely close the resource if it is dropped
     .onEach { /*some logic here*/ it.close() }.launchIn(GlobalScope).cancelAndJoin()
}

So far, I have not been able to implement it on my own... This would also be helpful for flows with a buffer (with a channel under the hood, using onUndeliveredElement).

I haven't found any similar issue in the repository to track. By the way, such an operator would reduce the complexity of using libraries such as rsocket-kotlin, where flows of resources are used heavily.

whyoleg, Apr 12 '21 10:04

The problem with such an operator is that it cannot know about the various suspension points, coroutine switches, and so on, both upstream and downstream.

For example, let's take a look at the following snippet:

(1..100).asFlow()
    .buffer(1000) // Buffer elements, basically separating collector and emitter in two coroutines
    .anyCustomOperator() // Hide the buffering flow 
    .onElementDrop { ... } 
    .collect {
        throw CancellationException()
    }

The upstream flow will successfully emit 100 elements [to the channel buffer] and will complete, but the collector will reject all the elements as soon as the first arrives.

If we have an explicit buffer (or even worse -- an operator with implicit buffering), then a successful emit from the upstream does not guarantee that the element is actually received, nor does the upstream have a mechanism to intercept such a call.
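A minimal sketch of this effect, assuming kotlinx.coroutines on the JVM. The names Res and closedAfterRejectingCollector are hypothetical; the try/catch around emit mirrors the safeEmit extension from the issue description. Because emit only hands the element to the buffer, the catch branch can fire for at most one element, and everything already sitting in the buffer is never closed.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical resource that counts how many instances were closed.
class Res(private val closedCounter: AtomicInteger) : Closeable {
    override fun close() {
        closedCounter.incrementAndGet()
    }
}

// Emits `total` resources through a buffer to a collector that rejects the
// first element. Returns how many resources the upstream try/catch closed.
suspend fun closedAfterRejectingCollector(total: Int): Int {
    val closed = AtomicInteger()
    try {
        flow {
            repeat(total) {
                val res = Res(closed)
                try {
                    emit(res) // only hands the element over to the buffer
                } catch (cause: Throwable) {
                    res.close() // reached only if this particular emit fails
                    throw cause
                }
            }
        }
            .buffer(1000)
            .collect { throw CancellationException("rejected") }
    } catch (e: CancellationException) {
        // expected: the collector rejected the first element it received
    }
    return closed.get()
}

fun main() = runBlocking {
    val total = 100
    println("created up to $total, closed ${closedAfterRejectingCollector(total)}")
}
```

How many elements reach the buffer before the collector fails depends on scheduling, so the exact count is not fixed, but the upstream handler never closes more than the single element whose emit failed.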

qwwdfsad, Apr 12 '21 17:04

In Reactor, it's Flux#using

He-Pin, Apr 21 '22 04:04

It seems like using is prone to the same issue, though further analysis is required.

qwwdfsad, Apr 22 '22 09:04

I think Flux#using doesn't solve this particular issue: it creates and destroys one resource per subscriber, and it doesn't allow one to treat elements as closeable resources. Basically, it's a thing for doing something like "if subscribed to, open a file, then, using that file, output the flow of its lines, and close the file when finished", not "here's a list of filenames, open them as files, do something, and don't forget to close them".

dkhalanskyjb, Apr 22 '22 12:04

@dkhalanskyjb You can compose it with Flux.flatMap.

Flux.from(...a group of file names)
       .flatMap(file -> Flux.using(...open it, ...handle it, ...close it))
       ..... 

He-Pin, Apr 25 '22 03:04

Sure, but this pattern doesn't give one the ability to safely pass open resources, which is what's being discussed.

Flux#using is logically equivalent to something like this with Flow:

flow {
    openResource().use {
        emitAll(it.getFlow())
    }
}

dkhalanskyjb, Apr 25 '22 07:04

Hey, I've tried to do something about this issue here: https://github.com/whyoleg/kotlinx.coroutines/commit/e8bfe317a984d4b6ecb09e46ac53edfdb180a95c If possible, could someone take a look and say whether it's even something worth exploring? It's a super simple POC and works only in some cases, but it could turn into something that works. So far I've tested simple cases, but here is what worries me:

  • when using take(n) after on*ElementDrop, it will still drop the last successfully collected element, because it's hard to distinguish successful termination from termination with an exception
  • because we provide the onDrop action in the context, it's propagated inside flow builders, and if someone runs flow.collect in such a builder, it will implicitly receive this action - not sure if that's good behavior

P.S. Reactor's Flux has Flux.doOnDiscard and Hooks.onNextDropped

whyoleg, Sep 01 '22 18:09

One more reason why I came back here: I wanted to use channelFlow with a closeable resource, but couldn't, because I'm not able to install an onUndeliveredElement handler in it. Then I decided to use produceIn, and hit the same issue: there is no way to install the handler there. So I need to fall back to a custom channel-to-flow conversion, even though it's almost the same apart from this handler. And because of that, I can no longer rely on fusion, so a subsequent call like myflow.buffer will create an additional channel, which is of course not needed in my case.
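For illustration, a custom channel-to-flow conversion of this kind might look roughly like the sketch below. It assumes kotlinx.coroutines on the JVM; closeableChannelFlow and Res are hypothetical names, not part of the library. The only library pieces it relies on are the Channel factory function with its onUndeliveredElement parameter and consumeAsFlow.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.Closeable

// Hypothetical helper: a channel-backed flow that closes undelivered elements.
fun <T : Closeable> CoroutineScope.closeableChannelFlow(
    capacity: Int,
    block: suspend SendChannel<T>.() -> Unit
): Flow<T> {
    // Elements that are buffered or being sent when the channel is cancelled
    // are passed to onUndeliveredElement instead of being silently dropped.
    val channel = Channel<T>(capacity, onUndeliveredElement = { it.close() })
    launch {
        try {
            channel.block()
            channel.close()
        } catch (e: Throwable) {
            channel.close(e) // no effect if the consumer already cancelled the channel
            throw e
        }
    }
    // consumeAsFlow cancels the channel once the single collector is done.
    return channel.consumeAsFlow()
}

// Hypothetical resource that remembers whether it was closed.
class Res(val id: Int) : Closeable {
    @Volatile
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

fun main() = runBlocking {
    val created = mutableListOf<Res>()
    val first = closeableChannelFlow<Res>(capacity = 16) {
        repeat(5) {
            val res = Res(it)
            created += res
            send(res)
        }
    }.first()
    first.close() // a delivered element is the collector's responsibility
    println(created.map { "${it.id}: closed=${it.closed}" })
}
```

As noted above, a flow built this way does not take part in operator fusion, so a later buffer call adds a second channel that has no such handler.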

whyoleg, Sep 01 '22 18:09

I'm not sure I follow why it's not plausible to write, for example,

flow<A> {
  repeat(10) {
    A().use { emit(it) }
  }
}

or

fun <T : Closeable> Flow<T>.closing(): Flow<T> = transform {
  it.use { emit(it) }
}

which ends up wrapping every emit with a try/finally, which ought to accomplish the appropriate closure?

lowasser, Sep 01 '22 18:09

The main issue is that this will not work if there is a buffer call, as it will put the resource into a channel. Also, it should not be use, but rather close only on error, e.g. in case the terminal operator is toList.
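To illustrate the toList case, here is a small sketch assuming kotlinx.coroutines on the JVM. Tracked and collectClosing are hypothetical names, and closing() is the operator suggested above with its lambda parameters named. Since use closes each element as soon as the downstream emit returns, the list returned by toList contains only resources that are already closed.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking
import java.io.Closeable

// Hypothetical resource that remembers whether it was closed.
class Tracked(val id: Int) : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Wraps every emit in use, i.e. try/finally with close.
fun <T : Closeable> Flow<T>.closing(): Flow<T> = transform { resource ->
    resource.use { emit(it) }
}

// Collects `count` resources into a list through closing().
suspend fun collectClosing(count: Int): List<Tracked> =
    flow { repeat(count) { emit(Tracked(it)) } }
        .closing()
        .toList()

fun main() = runBlocking {
    val resources = collectClosing(3)
    println(resources.map { it.closed }) // prints [true, true, true]
}
```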

whyoleg, Sep 01 '22 18:09

This reminds me of https://github.com/reactor/reactor-core/issues/999

He-Pin, Sep 04 '22 11:09