kotlinx.coroutines
kotlinx.coroutines copied to clipboard
CoroutineScheduler: oversubscription in Dispatchers.Default when switching to IO with CPU tasks in local queue
trafficstars
package kotlinx.coroutines.scheduling
import kotlinx.coroutines.*
import kotlinx.coroutines.scheduling.CoroutineScheduler.Companion.MAX_SUPPORTED_POOL_SIZE
import org.junit.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
class CoroutineSchedulerOversubscriptionStressTest : TestBase() {
private val poolSize = Runtime.getRuntime().availableProcessors()
private val inDefault = AtomicInteger(0)
fun doCheck() {
println("In default dispatcher, blocking thread #${inDefault.incrementAndGet()}, max allowed threads: $poolSize")
}
@Test
fun testOverSubscription() = runTest {
val barrier = CountDownLatch(1)
// All threads but one
repeat(CORE_POOL_SIZE - 1) {
launch(Dispatchers.Default) {
doCheck()
barrier.await()
}
}
Thread.sleep(200)
withContext(Dispatchers.Default) {
// Put a task in a local queue, it will be stolen
launch(Dispatchers.Default) {
doCheck()
barrier.await()
}
// Put one more task to trick the local queue check
launch(Dispatchers.Default) {
doCheck()
barrier.await()
}
withContext(Dispatchers.IO) {
Thread.sleep(1000) // Wait for stealing
}
}
}
}
Will print In default dispatcher, blocking thread #17, max allowed threads: 16