
Unstable context preservation invariant in Flow

Open: lowasser opened this issue 3 years ago • 4 comments

Consider https://pl.kotl.in/g6pikDNtt:

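    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    val topFlow = flow {
        emit(1)
        delay(100L)
        emit(2)
        emit(3)
        delay(300L)
        emit(4)
    }

    @OptIn(FlowPreview::class) // sample() is a preview API
    val bottomFlow = flow {
        topFlow
            .onEach { emit(it) } // emits into bottomFlow's collector
            .sample(periodMillis = 200)
            .collect { println(it) }
    }

    fun main() = runBlocking {
        bottomFlow.collect { println("$it") }
    }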

This fails with the error

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Emission from another coroutine is detected.
		Child of "coroutine#2":ProducerCoroutine{Active}@1810399e, expected child of "coroutine#1":BlockingCoroutine{Active}@32d992b2.
		FlowCollector is not thread-safe and concurrent emissions are prohibited.
		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'

I would not have expected (certainly not from the documentation) that sample could cause this issue. I'm still trying to work out exactly what it does that violates the invariant required by the flow builder, but it certainly seems to.
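The error message suggests channelFlow as a mitigation. Below is a sketch of that suggestion applied to the snippet above. It is not a fix confirmed by the maintainers. The ProducerCoroutine named in the error suggests that sample collects its upstream in a separate producer coroutine. channelFlow's send may be called from any coroutine, so building bottomFlow with channelFlow and calling send instead of emit should avoid the invariant check. The "sampled"/"bottom" labels are illustrative only.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val topFlow = flow {
    emit(1)
    delay(100L)
    emit(2)
    emit(3)
    delay(300L)
    emit(4)
}

// channelFlow instead of flow: send() goes through a thread-safe channel,
// so it may be called from the producer coroutine that runs the upstream.
@OptIn(FlowPreview::class)
val bottomFlow: Flow<Int> = channelFlow {
    topFlow
        .onEach { send(it) } // every upstream value reaches bottomFlow, in order
        .sample(periodMillis = 200)
        .collect { println("sampled: $it") } // which values are sampled depends on timing
}

fun main() = runBlocking {
    bottomFlow.collect { println("bottom: $it") }
}
```

Only the values forwarded through onEach are deterministic here. The sampled ones depend on timing.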

lowasser, Oct 11 '22 19:10

Unfortunately, this is not a sample problem but a much deeper one.

We have a flow invariant that explicitly prohibits emissions from context different from the context target flow is being collected, but we do explicitly allow collection of the flow in a different context.

The sample bug exposes the fatal flaw of this approach: it is not composable. Here is a self-contained reproducer without sample:


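    import kotlinx.coroutines.channels.produce
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.runBlocking
    import kotlin.test.Test

    // Collects the upstream in a separate producer coroutine and re-emits the
    // values in the collector's coroutine. On its own this passes the check.
    fun <T> Flow<T>.allowed(): Flow<T> = flow {
        coroutineScope {
            val channel = produce {
                collect { send(it) }
            }

            for (e in channel) {
                emit(e)
            }
        }
    }

    @Test
    fun repro() = runBlocking<Unit> {
        val flow = flowOf(1, 2, 3, 4)
        flow.allowed().collect()
        println("Collected #1")

        // onEach { emit(it) } now runs inside allowed()'s producer coroutine,
        // so it emits into the outer flow from another coroutine and fails.
        val withNesting = flow {
            flow.onEach { emit(it) }.allowed().collect()
        }
        withNesting.collect()
        println("Collected #2")
    }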
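For comparison, here is a sketch of the two sides of the rule as the Flow documentation describes them. flowOn is the documented way to run upstream code in another context. Switching the context around emit inside the flow builder itself is rejected. The function names are hypothetical and chosen for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Allowed: flowOn runs the flow above it on Dispatchers.Default and hands
// the values back to the collector's context.
fun upstreamOnDefault(): Flow<String> =
    flow { emit(Thread.currentThread().name) }
        .flowOn(Dispatchers.Default)

// Prohibited: the flow builder itself emits from a different context.
fun emitFromOtherContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

fun main() = runBlocking {
    // Prints the name of a Default dispatcher worker thread.
    println(upstreamOnDefault().single())
    // The invariant check throws IllegalStateException("Flow invariant is violated: ...").
    println(runCatching { emitFromOtherContext().collect() }.exceptionOrNull())
}
```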

qwwdfsad, Oct 12 '22 11:10

The problem is even worse: it seems we cannot simply track the Job and treat such context changes as legitimate, because the emissions can potentially race with each other.
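The Job part of the check can be seen in isolation in the sketch below (function names are hypothetical). Emitting inside coroutineScope passes, because the scope is treated as part of the collecting coroutine. Emitting from a coroutine started with launch should fail with the same "Emission from another coroutine is detected" error as in the original report, even though the dispatcher is inherited unchanged.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Accepted: coroutineScope creates a new Job, but it is a scoping job of the
// collecting coroutine, so emit() still counts as coming from that coroutine.
fun emitInsideScope(): Flow<Int> = flow {
    coroutineScope { emit(1) }
}

// Rejected: launch starts a separate child coroutine with its own Job.
fun emitFromLaunchedChild(): Flow<Int> = flow {
    coroutineScope { launch { emit(1) } }
}

fun main() = runBlocking {
    println(emitInsideScope().toList())
    println(runCatching { emitFromLaunchedChild().collect() }.exceptionOrNull())
}
```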

Racy example (using unsafeFlow to disable the invariant check):

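    // unsafeFlow is internal to kotlinx.coroutines and skips the invariant check,
    // so this snippet only compiles inside the library's own test sources.
    fun <T> Flow<T>.allowed(): Flow<T> = unsafeFlow {
        coroutineScope {
            val channel = produce(Dispatchers.Default) {
                collect { send(it) }
            }

            for (e in channel) {
                emit(e)
            }
        }
    }

    @Test
    fun repro() = runBlocking<Unit> {
        repeat(100) {
            val flow = (0..99).asFlow()
            val withNesting = unsafeFlow {
                flow.onEach {
                    emit(it) // emitted from a Default dispatcher thread...
                }.allowed().collect {
                    emit(it) // ...concurrently with these from the test thread
                }
            }
            val list = withNesting.toList()
            assertEquals(200, list.size, "Iteration #$it")
        }
    }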

In fact, we can check this neither dynamically nor statically. The only thing we can do here is provide better error diagnostics and IDE assistance.
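In user code, the error message's advice still applies: use channelFlow for an outer builder whose emissions may come from another coroutine. The sketch below assumes the allowed() helper from the reproducers above, redeclared with the public flow builder and produce(Dispatchers.Default) so that it compiles outside the library. It also uses a hypothetical nested() wrapper. Both sends go through channelFlow's thread-safe channel, so each iteration should collect all 200 elements.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The allowed() helper from the reproducers, using the checked `flow` builder.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.allowed(): Flow<T> = flow {
    coroutineScope {
        val channel = produce<T>(Dispatchers.Default) {
            collect { send(it) } // upstream is collected on a Default thread
        }
        for (e in channel) emit(e) // re-emitted in the collector's coroutine
    }
}

// Hypothetical wrapper: channelFlow replaces the outer unsafeFlow.
fun nested(source: Flow<Int>): Flow<Int> = channelFlow {
    source
        .onEach { send(it) }  // called from allowed()'s producer coroutine
        .allowed()
        .collect { send(it) } // called from channelFlow's own coroutine
}

fun main() = runBlocking {
    repeat(100) { iteration ->
        val size = nested((0..99).asFlow()).toList().size
        check(size == 200) { "Iteration #$iteration: got $size elements" }
    }
    println("all iterations collected 200 elements")
}
```

The relative order of the two kinds of sends is not deterministic. Only the total count is.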

qwwdfsad, Oct 18 '22 09:10

Note that the complexity of the problem is aggravated by the fact that we currently cannot statically detect such erroneous code during compilation to issue a warning. See https://youtrack.jetbrains.com/issue/KT-15514 for details.

elizarov, Oct 18 '22 10:10

@qwwdfsad https://github.com/Kotlin/kotlinx.coroutines/issues/3480#issuecomment-1276032371 reads:

Unfortunately, this is not a sample problem but a much deeper one.

We have a flow invariant that explicitly prohibits emissions from context different from the context target flow is being collected, but we do explicitly allow collection of the flow in a different context.

I've stumbled on "is being collected". Is that intended to mean "started to emit from"?

Resulting in something like:

We have a flow invariant that explicitly prohibits emissions from a context different from the context the flow started to emit from, but we do explicitly allow collection of the flow in a different context.

EDIT:

My analysis of the context preservation invariant in SafeCollector<*>.checkContext is as follows:

  • A flow is allowed to emit from its collection context or any child coroutine, as long as all context elements except the Job element are identical.

My question regarding the above then becomes: what exactly is meant by the following wording?

but we do explicitly allow collection of the flow in a different context.

Does that refer to flowOn?

OliverO2, Sep 05 '23 08:09