kotlinx.coroutines
kotlinx.coroutines copied to clipboard
Publisher.asFlow() sometimes fails to propagate thread context elements
trafficstars
This issue was discovered analytically, by reading the source code; no one has reported it yet.
/**
 * Thread-local used by the reproduction below.
 *
 * Any thread that has not been given an explicit value observes `"default"`, so reading
 * `"default"` inside the publisher means no other value was installed on that thread.
 */
// Type arguments cannot be placed on the class qualifier of a static call, so the element
// type is declared on the property and `withInitial` infers it from there.
val threadLocal: ThreadLocal<String> = ThreadLocal.withInitial { "default" }
/**
 * Reproduction: collects the same [Publisher] through `asFlow().flowOn(...)` twice, once with
 * a context that contains a dispatcher plus the thread-local element, and once with a context
 * that contains only the thread-local element.
 *
 * The publisher asserts, inside `request`, that [threadLocal] holds `"value"` on the thread
 * that calls it.
 */
@Test
fun testFlowOnThreadContext() = runBlocking {
    val publisher = Publisher<Int> { subscriber ->
        val subscription = object : Subscription {
            override fun request(n: Long) {
                // The thread-local element is expected to be installed on the requesting thread.
                assertEquals("value", threadLocal.get())
                subscriber.onNext(1)
                subscriber.onComplete()
            }

            override fun cancel() {}
        }
        subscriber.onSubscribe(subscription)
    }

    val dispatcherWithElement = Dispatchers.IO + threadLocal.asContextElement("value")
    val elementOnly = threadLocal.asContextElement("value")

    // Reported as passing: the context carries a dispatcher alongside the element.
    publisher.asFlow().flowOn(dispatcherWithElement).collect { }

    // Reported as failing with
    // org.junit.ComparisonFailure: expected:<[value]> but was:<[default]>
    // when the context carries only the thread-local element.
    publisher.asFlow().flowOn(elementOnly).collect { }
}