spring-cloud-function
spring-cloud-function copied to clipboard
Exception caused by suspend lambda of kotlin that implements java.util.function.Consumer
Describe the bug
According to the docs, a lambda can be used to implement java.utl.function.Cusumer
.
However, if calls a suspend method in the body, the lambda must add suspend
keywords which casuses an exception of java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported
.
Sample
Suppose I have a DataRepository
interface which extends CoroutineCrudRepository
, hence the DataRepository#save
method is a suspend
method (In the CoroutineCrudRepository
interface, it is suspend fun <S : T> save(entity: S): T
).
And the function class is
@Configuration
class DataFunction {
@Bean
fun consumerData(dataRepository: DataRepository): suspend (Data) -> Unit =
{
val result = dataRepository.save(it)
}
}
and it cause the exception
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Multi argument Kotlin functions are not currently supported
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:325)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1409)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:545)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:187)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:119)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:127)
at io.kotest.extensions.spring.SpringAutowireConstructorExtension.instantiate(SpringAutowireConstructorExtension.kt:27)
at io.kotest.engine.spec.InstantiateSpecKt.createAndInitializeSpec(instantiateSpec.kt:30)
at io.kotest.engine.spec.InstantiateSpecKt.instantiate(instantiateSpec.kt:11)
at io.kotest.engine.spec.SpecRefKt.instance(SpecRef.kt:14)
at io.kotest.engine.spec.SpecExecutor.createInstance-gIAlu-s(SpecExecutor.kt:63)
at io.kotest.engine.spec.SpecExecutor.access$createInstance-gIAlu-s(SpecExecutor.kt:24)
at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invokeSuspend(SpecExecutor.kt:40)
at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
at io.kotest.engine.spec.interceptor.ref.FinalizeSpecInterceptor.intercept-0E7RQCE(FinalizeSpecInterceptor.kt:24)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.BeforeSpecStateInterceptor.intercept-0E7RQCE(BeforeSpecStateInterceptor.kt:25)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.PrepareSpecInterceptor.intercept-0E7RQCE(PrepareSpecInterceptor.kt:25)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.ApplyExtensionsInterceptor.intercept-0E7RQCE(ApplyExtensionsInterceptor.kt:36)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.SpecFinishedInterceptor.intercept-0E7RQCE(SpecFinishedInterceptor.kt:22)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.SpecStartedInterceptor.intercept-0E7RQCE(SpecStartedInterceptor.kt:20)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invokeSuspend(SpecRefExtensionInterceptor.kt:28)
at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor.intercept-0E7RQCE(SpecRefExtensionInterceptor.kt:31)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.RequiresTagInterceptor.intercept-0E7RQCE(RequiresTagInterceptor.kt:35)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.TagsInterceptor.intercept-0E7RQCE(TagsInterceptor.kt:38)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.SystemPropertySpecFilterInterceptor.intercept-0E7RQCE(SystemPropertySpecFilterInterceptor.kt:48)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.SpecFilterInterceptor.intercept-0E7RQCE(SpecFilterInterceptor.kt:38)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.IgnoredSpecInterceptor.intercept-0E7RQCE(IgnoredSpecInterceptor.kt:51)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.EnabledIfInterceptor.intercept-0E7RQCE(EnabledIfInterceptor.kt:41)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.ref.RequiresPlatformInterceptor.intercept-0E7RQCE(RequiresPlatformInterceptor.kt:32)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline.execute-0E7RQCE(SpecRefInterceptorPipeline.kt:50)
at io.kotest.engine.spec.SpecExecutor.execute(SpecExecutor.kt:42)
at io.kotest.engine.ConcurrentTestSuiteScheduler$schedule$8$1$1$2.invokeSuspend(ConcurrentTestSuiteScheduler.kt:72)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at io.kotest.common.RunBlockingKt.runBlocking(runBlocking.kt:3)
at io.kotest.engine.launcher.MainKt.main(main.kt:34)
Caused by: java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.getFunctionRegistration(KotlinLambdaToFunctionAutoConfiguration.java:200)
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:165)
at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:39)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet(FunctionConfiguration.java:877)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1817)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
... 103 common frames omitted
Other tries
If using kotlinx.coroutines.flow.Flow
, a non-suspend method CoroutineCrudRepository#saveAll
can be used. Therefore, the lambda signature can be changed to (Flow<Data>) -> Unit
which causes no exceptions in startup.
However, if call the function with a message, another exception occurs:
Caused by: java.lang.ClassCastException: class Data cannot be cast to class kotlinx.coroutines.flow.Flow (Data and kotlinx.coroutines.flow.Flow are in unnamed module of loader 'app')
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:124)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:99)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.accept(KotlinLambdaToFunctionAutoConfiguration.java:146)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1029)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:731)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
ugly workaound
Calling suspend
method in a runBlocking
block that makes the Consumer itself be not in a suspend one.
The version of spring-cloud-function is 4.0.5 (spring-cloud 2022.0.4).