micronaut-core icon indicating copy to clipboard operation
micronaut-core copied to clipboard

COROUTINE_SUSPENDED response when calling coroutine in a suspending controller function which returns Flow

Open vekonypeter opened this issue 1 year ago • 9 comments

Expected Behavior

I have a class annotated with @Controller and my method return Flow<...> as response. Inside the method I have to call another suspending function, because e.g. I want to fetch data from the database or call another API which all works via coroutines for me. Due to this my controller method is also a suspending.

When a client calls this method it should get the data sent via the Flow.

Actual Behaviour

status code is 200 , but the response is the following:

"COROUTINE_SUSPENDED"

Steps To Reproduce

/**
 * This returns "COROUTINE_SUSPENDED" as body
 */
@Get("foo1")
suspend fun foo1(): Flow<Char> {
    // get data from db or from other API
    val res = callDbOrOtherApi().toList()

    // want to return a flow that needs the result as an input
    return doSomeOtherStuff(res)
}

fun callDbOrOtherApi(): Flow<String> = flow {
    delay(1000) // other party needs some time to think

    // then it starts to stream the data
    listOf("val1", "val2")
        .forEach { emit(it) }
}

fun doSomeOtherStuff(input: List<String>): Flow<Char> =
    input.joinToString()
        .toList()
        .asFlow()

Environment Information

  • OS: MacOS 12.7.4 (21H1123)
  • Architecture: ARM64
  • Kotlin version: 1.9.23
  • JVM: OpenJDK Temurin-17.0.5+8 (build 17.0.5+8)

Example Application

No response

Version

4.4.2

vekonypeter avatar Aug 29 '24 13:08 vekonypeter

Workaround:

    @Get("foo2")
    fun foo2(): Flow<Char> {

        return flow {
            // get data from db or from other API
            emit(callDbOrOtherApi().toList())
        }.flatMapConcat { res ->

            // want to return a flow that needs the result as an input
            doSomeOtherStuff(res)
        }
    }

vekonypeter avatar Aug 29 '24 13:08 vekonypeter

Please create a sample app that reproduces the problem. Make sure you have added Micronaut Kotlin dependency.

dstepanov avatar Aug 30 '24 09:08 dstepanov

combining Flow and suspend seems a bit weird to me

yawkat avatar Aug 30 '24 09:08 yawkat

I have added a sample project with Flow and it's passing. https://github.com/micronaut-projects/micronaut-core/pull/11135 Please modify it to reproduce your problem

dstepanov avatar Aug 30 '24 09:08 dstepanov

@dstepanov sure, example application here based on your's: https://github.com/vekonypeter/miconaut-core-issue-11131/tree/main

I modified your example, because that way it is surely works. The problem is when you have a suspending function, which returns a Flow, but also contains some other suspending coroutine interaction before. See example here: https://github.com/vekonypeter/miconaut-core-issue-11131/blob/main/src/main/kotlin/com/example/HelloController.kt#L14-L18

Test result here is:

Expected :Hello World
Actual   :"COROUTINE_SUSPENDED"
<Click to see difference>

org.opentest4j.AssertionFailedError: expected: <Hello World> but was: <"COROUTINE_SUSPENDED">
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
	at com.example.HelloControllerTest.testHelloWorld1(HelloControllerTest.kt:21)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Dependencies seem to be fine, micronaut-kotlin-runtime is added.

Is this a very exotic use-case? For us, it seems pretty common, because all of our database interactions and calls towards other APIs are done using suspending functions.

vekonypeter avatar Aug 30 '24 11:08 vekonypeter

I see, but as Jonas wrote it doesn’t make sense to have suspended Flow.

dstepanov avatar Aug 30 '24 12:08 dstepanov

@vekonypeter the reason it's weird is that you're combining two reactive programming styles (coroutines and flows). In our framework view it becomes a Publisher<Publisher<String>>. You can "unwrap" this in one of the following ways:

Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt	(revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt	(date 1725020074789)
@@ -6,15 +6,16 @@
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.emitAll
 import kotlinx.coroutines.flow.flow
 
 @Controller("/hello")
 class HelloController {
 
     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    fun world1(): Flow<String> = flow {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        emitAll(listOf("Hello World").asFlow())
     }
 
     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])
Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt	(revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt	(date 1725020169453)
@@ -7,14 +7,15 @@
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.single
 
 @Controller("/hello")
 class HelloController {
 
     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    suspend fun world1(): String {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        return listOf("Hello World").asFlow().single()
     }
 
     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])

yawkat avatar Aug 30 '24 12:08 yawkat

I don't fully understand what you mean by "combining two reactive programming styles (coroutines and flows)". Flows are an integral part of Kotlin coroutines. Maybe the example is just too simple, but imagine the following use-case:

  • I have a controller method
  • first, I need to fetch some data from a remote API. I'm using Ktor HTTP client for that which uses coroutines, therefore returns data through suspending functions.
  • then I go to my database and want to read data from it using the result of the previous API call. The database call returns a Flow or entities and I want to return that from the controller method.

in code:

suspend fun test(): Flow<Any> {
    val res = httpClientMethodCall() // this is a suspending function therefore test() must be also suspending
    return readSomeStuffFromDb(res)
}

I don't think that this is something super weird, pretty usual when you have everything implemented with coroutines and suspending functions. But this will return the "COROUTINE_SUSPENDED" response for sure.

I can make it work somehow like this and probably in a lot of other ways too, but it feels odd:

fun test(): Flow<Any> = flow {
    emit(httpClientMethodCall())
}.map { res ->
    readSomeStuffFromDb(res)
}

vekonypeter avatar Aug 30 '24 13:08 vekonypeter

Both a suspend method and a Flow represent the same thing: A result that will be produced asynchronously in the future (in reactive streams terms, both are a flow). Flow has some added features like multiple items and multiple consumers, but it is conceptually the same thing.

When you have a suspend method that returns a Flow, the framework first has to wait for the suspend method to complete, and then also for the flow to complete. This form of double flow is not supported.

FWIW, this is not exactly idiomatic in general kotlin code either, because it can be annoying to work with even without micronaut involved. This SO answer describes it well: https://stackoverflow.com/a/76031024

yawkat avatar Aug 30 '24 14:08 yawkat