quarkus icon indicating copy to clipboard operation
quarkus copied to clipboard

Kotlin Coroutine suspend function Integration uses single thread dispatcher

Open Legion2 opened this issue 1 year ago • 9 comments

Describe the bug

When working with kotlin coroutines and suspend functions in quarkus, the code is executed sequentially by a single thread by default. One has to explicitly change the kotlin coroutine dispatcher to get the expected behavior of the Dispatchers.Default which uses a Thread pool.

Expected behavior

By default when using suspend function with quarkus the Dispatchers.Default or Vertx.vertx().dispatcher() should be used as the dispatchers. Or it should be possible to configure the used Dispatcher.

Actual behavior

Some single threaded Dispatcher is used (maybe runBlocking is used) to run the suspend functions.

How to Reproduce?

Add the following code to an RestEasy Resource and call the endpoint

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch {
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 10 seconds

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch(Vertx.vertx().dispatcher()) {
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 1 second

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch(Dispatchers.Default) { //Dispatchers.IO
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 1 second

Output of uname -a or ver

Linux leon-pc 6.5.0-35-generic #35~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue May 7 09:00:52 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

openjdk version "17.0.11" 2024-04-16 OpenJDK Runtime Environment (build 17.0.11+9-Ubuntu-122.04.1) OpenJDK 64-Bit Server VM (build 17.0.11+9-Ubuntu-122.04.1, mixed mode, sharing)

Quarkus version or git rev

3.7.3

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.5

Additional information

No response

Legion2 avatar Jun 18 '24 13:06 Legion2

/cc @geoand (kotlin)

quarkus-bot[bot] avatar Jun 18 '24 13:06 quarkus-bot[bot]

Can someone point me to the source code of the coroutine dispatcher implementation, then I can take a look and try to debug the problem.

Legion2 avatar Jun 24 '24 20:06 Legion2

I found this dispatcher implementation which is used by the resteasy reactive client.

https://github.com/quarkusio/quarkus/blob/32d0c2dd4d935149c71f6e81524fb3a46fe89515/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/ApplicationCoroutineScope.kt#L34-L51

There is also a similar implementation in the vertx extension https://github.com/quarkusio/quarkus/blob/ea2c6a4090d1eccb4b39250198601f7da9454833/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/VertxDispatcher.kt

Both implementations do not use the existing vertx Dispatcher and implement a very basic execution of coroutine on the vertx event loop.

Legion2 avatar Jun 24 '24 21:06 Legion2

I am also interested in the outcome of the analysis of this issue.

mschorsch avatar Jun 25 '24 03:06 mschorsch

Thanks for opening this!

Just so I understand this, you are saying that when are manually handling the launch of coroutines, the dispatcher is single threader?

Also, some comments on things mentioned above:

I found this dispatcher implementation which is used by the resteasy reactive client.

The code you have linked to is for the Quarkus REST server part, not the client.

Both implementations do not use the existing vertx Dispatcher

Which one are you refering to as the existing vertx Dispatcher?

geoand avatar Jun 25 '24 06:06 geoand

Which one are you refering to as the existing vertx Dispatcher?

The vertx.dispatcher() from vertx-lang-kotlin-coroutines (https://vertx.io/docs/vertx-lang-kotlin-coroutines/kotlin/#_running_a_coroutine_from_a_vert_x_context)

mschorsch avatar Jun 25 '24 17:06 mschorsch

Oh, I was not aware of that one at all...

~~I wonder if we should start using it elsewhere too~~ Never mind, that would not work in Quarkus

geoand avatar Jun 25 '24 18:06 geoand

@geoand why can the Dispatcher from vertx-lang-kotlin-coroutines not be used in quarkus. If it can not reused, what changes are needed in quarkus to port the Dispatcher from vertx-lang-kotlin-coroutines.

Legion2 avatar Jun 26 '24 20:06 Legion2

See the code in RESTEasy Reactive that uses its own dispatcher

geoand avatar Jun 27 '24 04:06 geoand

@geoand you mean this code?

            requestScope.activate()
            CurrentRequestManager.set(rrContext)
            try {
                block.run()
            } finally {
                requestScope.deactivate()
            }

I don't know what this code is used for, but it looks like it wraps the actual request code. I think there are two options:

  1. instead of having this in the dispatcher, move it CoroutineInvocationHandler
  2. or, create a wrapper for the Dispatcher of vertx-lang-kotlin-coroutines

I see you also implemented coroutine canceling for canceled requests, which is realy cool. Can't wait having this in my projects.

Legion2 avatar Jul 07 '24 12:07 Legion2

I don't know what this code is used for

You essentially need to propagate a lot of context from the thread were the request initiates to whatever thread is going to be used for running each stage of the suspend(able) function

I see you also implemented coroutine canceling for canceled requests, which is realy cool. Can't wait having this in my projects.

Thanks. I was actually surprised we didn't have this already...

geoand avatar Jul 08 '24 11:07 geoand

You essentially need to propagate a lot of context from the thread were the request initiates to whatever thread is going to be used for running each stage of the suspend(able) function

Have a look at ThreadContextElement

Legion2 avatar Jul 08 '24 12:07 Legion2

I was not aware of that, thanks

geoand avatar Jul 08 '24 12:07 geoand

@geoand have you looked into using ThreadContextElement?

Legion2 avatar Feb 07 '25 18:02 Legion2

I have not

geoand avatar Feb 07 '25 18:02 geoand