kotlinx.coroutines

Flow.catch skips upstream exceptions

Open alex-dorokhov opened this issue 1 year ago • 16 comments

It looks like, under some race conditions, the Flow.catch operator skips upstream exceptions. For example:

try {
  flow {
     // some work...
     throw RuntimeException()
  }.catch {
      // this may not be called; the RuntimeException is then passed downstream
  }.collect()
} catch (e: RuntimeException) {
  // exception is caught here instead
}

We are using the fairly old version 1.6.1, but the catch operator implementation has not changed since then. From the source code of the catch operator, it looks like this behaviour is possible when both a downstream and an upstream exception are caught, which can happen under race conditions.

We believe it's totally unclear from the documentation, and especially from the semantics of the 'catch' operator, that an upstream exception could skip the operator. For now we had to switch to the channelFlow builder (instead of flow) and use try-catch.
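
The workaround mentioned above (handling the failure inside a channelFlow builder rather than in a downstream catch) might look roughly like the following sketch. The function name `numbersWithFallback` and the fallback value `-1` are hypothetical and not taken from the issue. Cancellation is rethrown explicitly, because on the JVM CancellationException is itself a RuntimeException.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the failure is handled inside the producer,
// so nothing depends on a downstream .catch operator.
fun numbersWithFallback(): Flow<Int> = channelFlow {
    try {
        send(1)
        // some work...
        throw RuntimeException("upstream failure")
    } catch (e: CancellationException) {
        throw e // never swallow cancellation
    } catch (e: RuntimeException) {
        send(-1) // assumed fallback value, for illustration only
    }
}

fun main() = runBlocking {
    println(numbersWithFallback().toList()) // prints [1, -1]
}
```

The trade-off is that the error handling is now part of the producer and cannot be attached later by whoever consumes the flow.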

Could it be that the code here should be replaced with "return e"?

alex-dorokhov avatar Jun 18 '24 22:06 alex-dorokhov

Could you provide any reproducer for the scenario where an exception is thrown upstream but is completely ignored?

qwwdfsad avatar Jul 08 '24 10:07 qwwdfsad

try {
    flow<Int> {
        emit(1)
        yield()
        println("Upstream failure")
        throw RuntimeException("Upstream flow failing")
    }.buffer(1).flowOn(Dispatchers.Default)
        .catch { println("Catch operator: $it") }
        .collect {
            println("Downstream throw")
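            // TestException is presumably the test-only exception class from the
            // kotlinx.coroutines test sources; any exception type works here.
            throw TestException()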
        }
} catch (e: Throwable) {
    println("Catch block: $e " + e.suppressed.contentToString())
}

In this scenario, it is possible to observe the upstream exception in "Catch block" without "Catch operator: " ever being printed.

qwwdfsad avatar Jul 08 '24 10:07 qwwdfsad

suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .catch { println(".catch caught $it") }
    println(flow.first())
}

outputs

yo

whilst:

suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .catch { println(".catch caught $it") }
        .flowOn(Dispatchers.Default)
    println(flow.first())
}

outputs

.catch caught java.lang.UnsupportedOperationException
yo

whilst:

suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .flowOn(Dispatchers.Default)
        .catch { println(".catch caught $it") }
    println(flow.first())
}

outputs

Exception in thread "main" java.lang.UnsupportedOperationException
	at MainKt$main$flow$1.invokeSuspend(Main.kt:8)
	at MainKt$main$flow$1.invoke(Main.kt)
	at MainKt$main$flow$1.invoke(Main.kt)
	at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:57)
	at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:226)
	at kotlinx.coroutines.flow.internal.ChannelFlowOperatorImpl.flowCollect(ChannelFlow.kt:191)
	at kotlinx.coroutines.flow.internal.ChannelFlowOperator.collectTo$suspendImpl(ChannelFlow.kt:153)
	at kotlinx.coroutines.flow.internal.ChannelFlowOperator.collectTo(ChannelFlow.kt)
	at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:56)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:102)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:816)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
	Suppressed: kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed

Was about to create a new issue for this but came across this one, is it the same bug?

nbransby avatar Jul 15 '24 14:07 nbransby

@qwwdfsad do you have any insight into what is going on here?

nbransby avatar Jul 18 '24 13:07 nbransby

@nbransby this behaviour leaks out of the interaction of several aspects: the concurrency imposed by flowOn, the catch operator being able to emit, and the fact that first cancels the asynchronous coroutine (e.g. the behaviour won't reproduce if you replace first with toList).

It's a bit more complicated than just an omission or a trivial bug, and I have yet to make up my mind about this behaviour, its validity, and whether we can fix it without breaking even more stuff in the meantime.

qwwdfsad avatar Jul 18 '24 16:07 qwwdfsad

The root cause that introduced the problem (and fixed other problems): https://github.com/Kotlin/kotlinx.coroutines/pull/3017 (#2860)

qwwdfsad avatar Jul 18 '24 17:07 qwwdfsad

Was about to create a new issue for this but came across this one, is it the same bug?

Yes, it looks like it is the same bug.

alex-dorokhov avatar Jul 18 '24 19:07 alex-dorokhov

I have yet to make up my mind about this behaviour, its validity, and whether we can fix it without breaking even more stuff in the meantime

Have you made your mind up yet? I'm finding the catch operator effectively useless now, as it can't be trusted to actually catch upstream exceptions. It seems that the closer I move it to the throwing upstream operator, the greater the likelihood that it will catch, but obviously this fuzzy behavior is not viable. Another example:

suspend fun main() {
    val flow = flow { emit("a"); delay(1000); emit("b") }
        .flatMapLatest {
            flow<String> {
                emit("yo")
                throw UnsupportedOperationException()
            }
        }
        .catch { println(".catch caught $it") }
    println(flow.take(SOME).toList())
}

when SOME = 2 we get

.catch caught java.lang.UnsupportedOperationException
[yo]

but when SOME = 1:

Exception in thread "main" java.lang.UnsupportedOperationException
 at FileKt$main$flow$2$1.invokeSuspend (File.kt:9) 
 at FileKt$main$flow$2$1.invoke (File.kt:-1) 
 at FileKt$main$flow$2$1.invoke (File.kt:-1) 

Perhaps the documentation for catch can be amended to reflect the true behavior whilst this issue remains open? https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html

nbransby avatar Sep 29 '24 10:09 nbransby

Also, if you can shed any light on this stack trace, that would be super useful. I tried to create a minimal reproducer but didn't have any luck. It's the same issue: an upstream exception is escaping a .catch placed downstream, but this is a shared flow using WhileSubscribed() and is not using .first() or .take().

dev.gitlive.internal.service.api.TooLargeException
	at dev.gitlive.internal.model.team.LocalRepository$commits$lambda$26$$inlined$map$1$2.emit(Emitters.kt:225)
	at dev.gitlive.internal.model.CachedGitClient$execute$$inlined$map$1$2.emit(Emitters.kt:219)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:37)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.access$emitAllImpl$FlowKt__ChannelsKt(Channels.kt:1)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Channels.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith$$$capture(ContinuationImpl.kt:33)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.JobCancellationException: FlowCoroutine is cancelling; job="paradigmxyz/reth branchesNotMergedIntoHead#6000":FlowCoroutine{Cancelled}@7b9ecca5
		at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt:46)
		at kotlinx.coroutines.flow.internal.CombineKt$combineInternal$2$1$1.emit(Combine.kt:33)
		at kotlinx.coroutines.flow.DistinctFlowImpl$collect$2.emit(Distinct.kt:77)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
		at kotlinx.coroutines.flow.FlowKt__ZipKt$combine$1$1.invokeSuspend(Zip.kt:33)
		at kotlinx.coroutines.flow.internal.CombineKt$combineInternal$2.invokeSuspend(Combine.kt:76)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl(Errors.kt:156)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catch$$inlined$unsafeFlow$1.collect(Errors.kt:113)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl(Errors.kt:156)
		at kotlinx.coroutines.flow.FlowKt__ErrorsKt$retryWhen$$inlined$unsafeFlow$1.collect(Errors.kt:117)
		at kotlinx.coroutines.flow.internal.CombineKt$combineInternal$2$1.invokeSuspend(Combine.kt:31)
	Caused by: kotlinx.coroutines.JobCancellationException: FlowCoroutine is cancelling; job="paradigmxyz/reth branchesNotMergedIntoHead#6000":FlowCoroutine{Cancelled}@7b9ecca5
		at kotlinx.coroutines.JobSupport.toCancellationException(JobSupport.kt:1517)
		at kotlinx.coroutines.JobSupport.getCancellationException(JobSupport.kt:417)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:99)
		at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
		at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Caused by: [CIRCULAR REFERENCE: dev.gitlive.internal.service.api.TooLargeException]
	Suppressed: [CIRCULAR REFERENCE: kotlinx.coroutines.JobCancellationException: FlowCoroutine is cancelling; job="paradigmxyz/reth branchesNotMergedIntoHead#6000":FlowCoroutine{Cancelled}@7b9ecca5]

I don't understand the circular reference here, but I'm guessing something in catchImpl (below) is causing the exception to escape in this case.

// Return exception from upstream or null
@Suppress("NAME_SHADOWING")
internal suspend fun <T> Flow<T>.catchImpl(
    collector: FlowCollector<T>
): Throwable? {
    var fromDownstream: Throwable? = null
    try {
        collect {
            try {
                collector.emit(it)
            } catch (e: Throwable) {
                fromDownstream = e
                throw e
            }
        }
    } catch (e: Throwable) {
        // Otherwise, smartcast is impossible
        val fromDownstream = fromDownstream
        /*
         * First check ensures that we catch an original exception, not one rethrown by an operator.
         * Second check ignores cancellation causes, they cannot be caught.
         */
        if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
            throw e // Rethrow exceptions from downstream and cancellation causes
        } else {
            /*
             * The exception came from the upstream [semi-] independently.
             * For pure failures, when the downstream functions normally, we handle the exception as intended.
             * But if the downstream has failed prior to or concurrently
             * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost.
             */
            if (fromDownstream == null) {
                return e
            }
            /*
             * We consider the upstream exception as the superseding one when both upstream and downstream
             * fail, suppressing the downstream exception, and operating similarly to `finally` block with
             * the useful addition of adding the original downstream exception to suppressed ones.
             *
             * That's important for the following scenarios:
             * ```
             * flow {
             *     val resource = ...
             *     try {
             *         ... emit as well ...
             *     } finally {
             *          resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception
             *     }
             * }.catch { } // or retry
             * .collect { ... }
             * ```
             * when *the downstream* throws.
             */
            if (e is CancellationException) {
                fromDownstream.addSuppressed(e)
                throw fromDownstream
            } else {
                e.addSuppressed(fromDownstream)
                throw e
            }
        }
    }
    return null
}
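
The scenario from the comment inside catchImpl (downstream throws first, then the upstream throws during its shutdown sequence) can be made concrete with a single-coroutine sketch. It assumes the catchImpl logic quoted above; `runScenario` and the exception messages are hypothetical names chosen for illustration. Going by that logic, the catch action should not run, and the upstream exception should escape with the downstream one attached as suppressed.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the scenario and returns whether the catch
// action ran, together with the exception that escaped collect (if any).
fun runScenario(): Pair<Boolean, Throwable?> = runBlocking {
    var catchActionRan = false
    val escaped = try {
        flow {
            try {
                emit(1)
            } finally {
                // Stands in for resource.close() failing after collect has thrown.
                throw IllegalStateException("close failed")
            }
        }
            .catch { catchActionRan = true }
            .collect { throw IllegalArgumentException("downstream failed") }
        null
    } catch (e: Throwable) {
        e
    }
    catchActionRan to escaped
}

fun main() {
    val (ran, escaped) = runScenario()
    println("catch action ran: $ran")
    println("escaped: $escaped, suppressed: ${escaped?.suppressed?.contentToString()}")
}
```

No concurrency is involved here, so this case is deterministic. The reports in this thread concern the same branch being reached when the downstream failure is a cancellation that races with the upstream failure.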

nbransby avatar Sep 29 '24 22:09 nbransby

but when SOME = 1:

Note: there's some difference between suspend fun main() { and fun main() { runBlocking {, because with the latter, this problem doesn't reproduce: https://pl.kotl.in/I71G9BQHf. I haven't looked into how suspend fun main works and what difference this can make, though.

dkhalanskyjb avatar Sep 30 '24 11:09 dkhalanskyjb

As for circular references, here's a compact reproducer that demonstrates them: https://pl.kotl.in/5sUH_6v9F

dkhalanskyjb avatar Sep 30 '24 11:09 dkhalanskyjb

As for circular references, here's a compact reproducer that demonstrates them: https://pl.kotl.in/5sUH_6v9F

Thanks, so in my case it's probably the leaky catch missing the upstream exception in the producer, which goes on to crash the collector's coroutine scope, which in turn leads to the suppressed JobCancellationException.

nbransby avatar Sep 30 '24 13:09 nbransby

My workaround for now is to restrict catch operator usage to flows that are guaranteed to run in the same coroutine.

If I need the exception to propagate across a coroutine boundary introduced by operators such as flowOn, buffer, conflate, flatMapLatest, etc., I change the Flow<T> to Flow<Result<T>>, catch the exception as described above, and call emit(Result.failure(it)).
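
A minimal sketch of this Result-based workaround, assuming a hypothetical extension named `asResult` (not part of kotlinx.coroutines): catch is applied in the same coroutine as the throwing upstream, and the failure then crosses the flowOn boundary as an ordinary element.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical extension: wraps each value in Result.success and turns the
// terminal upstream failure into a final Result.failure element.
fun <T> Flow<T>.asResult(): Flow<Result<T>> =
    map { Result.success(it) }
        .catch { emit(Result.failure(it)) }

// Hypothetical example flow, modelled on the reproducers in this thread.
fun failingFlow(): Flow<String> = flow {
    emit("yo")
    throw UnsupportedOperationException()
}

fun main() = runBlocking {
    val results = failingFlow()
        .asResult()                  // catch runs before any coroutine boundary
        .flowOn(Dispatchers.Default) // the boundary now only carries Result values
        .toList()
    results.forEach { result ->
        result.fold(
            onSuccess = { println("value: $it") },
            onFailure = { println("failure: $it") }
        )
    }
}
```

The cost is that every collector has to unwrap Result explicitly, but the failure no longer depends on exception propagation across coroutines.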

nbransby avatar Oct 03 '24 09:10 nbransby

I have this problem too. Now I can't use catch in functions that return a flow, because the caller might use buffer somewhere downstream. And I can't use buffer on any flow that I get from a function, because the author might have used catch when constructing it. In my opinion this bug is as critical as it gets. I thought I would lose my mind trying to understand why emissions from catch are sometimes missing and trying to create a reproducer.

Also, it might not be, or not only be, a catch problem. For example, here:

fun main() = runBlocking {
    flow {
        emit("a")
        emit("b")
        error("boom")
    }
//        .buffer()
        .collect(::println)
}

If buffer is uncommented, there are no emissions; it goes straight to the error.

pacher avatar Oct 18 '24 17:10 pacher

Are there any updates regarding this issue? Looks like a very critical problem to me, yet it doesn't look like there is a timeline for fixing this.

DrMetallius avatar Feb 04 '25 16:02 DrMetallius

I spent 8 hours investigating strange flow behaviour in our code base, just to find out that it's a bug in Kotlin.

Legion2 avatar Jun 02 '25 21:06 Legion2