micronaut-data icon indicating copy to clipboard operation
micronaut-data copied to clipboard

Micronaut data fails when trying to convert results to Flow

Open zsiegel opened this issue 1 year ago • 9 comments
trafficstars

Expected Behavior

When using a return type of Flow<MyEntity> and when the given query only returns one result I expect the conversion to work to return a Flow that emits a single item.

Actual Behaviour

When the query returns a single item. Micronaut fails to convert that entity to a Flow.

Steps To Reproduce

  1. Create a CoroutineCrudRepository<MyEntity, Long> repository and entity MyEntity.
  2. Run the .findAll() query after inserting a single row.
  3. The query will fail with an error message below.
io.micronaut.core.convert.exceptions.ConversionErrorException: Cannot convert type [class com.example.MyEntity] to target type: interface kotlinx.coroutines.flow.Flow. Considering defining a TypeConverter bean to handle this case.
	at app//io.micronaut.core.convert.ConversionService.lambda$newConversionError$2(ConversionService.java:182)
	at [email protected]/java.util.Optional.orElseGet(Optional.java:364)
	at app//io.micronaut.core.convert.ConversionService.newConversionError(ConversionService.java:182)
	at app//io.micronaut.core.convert.ConversionService.lambda$convertRequired$0(ConversionService.java:177)
	at [email protected]/java.util.Optional.orElseThrow(Optional.java:403)
	at app//io.micronaut.core.convert.ConversionService.convertRequired(ConversionService.java:177)
	at app//io.micronaut.core.convert.ConversionService.convertRequired(ConversionService.java:159)
	at app//io.micronaut.data.runtime.intercept.AbstractQueryInterceptor.convertOne(AbstractQueryInterceptor.java:213)
	at app//io.micronaut.data.runtime.intercept.AbstractQueryInterceptor.convertOne(AbstractQueryInterceptor.java:202)
	at app//io.micronaut.data.runtime.intercept.async.AbstractConvertCompletionStageInterceptor.lambda$intercept$1(AbstractConvertCompletionStageInterceptor.java:61)
	at [email protected]/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
	at [email protected]/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at [email protected]/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at app//reactor.core.publisher.MonoToCompletableFuture.onNext(MonoToCompletableFuture.java:64)
	at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at app//reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:269)
	at app//reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:528)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:155)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
	at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
	at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at app//reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
	at app//reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:537)
	at app//reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:767)
	at app//reactor.core.publisher.Operators.complete(Operators.java:137)
	at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
	at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at app//reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:879)
	at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
	at app//reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at app//reactor.core.publisher.Operators.complete(Operators.java:137)
	at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264)
	at app//reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
	at app//reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
	at app//reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
	at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260)
	at app//reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
	at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164)
	at app//reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264)
	at app//reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
	at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
	at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at app//reactor.core.publisher.Operators.complete(Operators.java:137)
	at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
	at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
	at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
	at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at app//reactor.core.publisher.Operators.complete(Operators.java:137)
	at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
	at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at app//reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:385)
	at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:850)
	at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
	at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:898)
	at app//reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:1001)
	at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at app//reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
	at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
	at app//reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
	at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at app//io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
	at app//reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
	at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at app//reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465)
	at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871)
	at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819)
	at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249)
	at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215)
	at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206)
	at app//io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668)
	at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934)
	at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
	at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
	at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at app//reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at app//reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	at app//reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	at app//reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at app//io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at app//io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at app//io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at app//io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at app//io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at app//io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at app//io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at [email protected]/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Cannot convert type [class com.example.MyEntity] to target type: interface kotlinx.coroutines.flow.Flow. Considering defining a TypeConverter bean to handle this case.
	... 147 more

Environment Information

MacOS 14.4

openjdk 17.0.1 2021-10-19 LTS OpenJDK Runtime Environment Zulu17.30+15-CA (build 17.0.1+12-LTS) OpenJDK 64-Bit Server VM Zulu17.30+15-CA (build 17.0.1+12-LTS, mixed mode, sharing)

Example Application

No response

Version

4.3.7

zsiegel avatar Mar 23 '24 01:03 zsiegel

Note that I am currently working around this right now by using a TypeConverter. But I have a number of entities that may return a single item from time to time and this is a bit cumbersome.

I was looking around to see if I could submit a PR myself but I am a little lost figuring out where the root of the issue might be.

@Factory
class TypeConverters {
    @Singleton
    fun myEntityToFlow(): TypeConverter<MyEntity, Flow<MyEntity>> {
        return TypeConverter<MyEntity, Flow<MyEntity>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}

zsiegel avatar Mar 23 '24 01:03 zsiegel

I am also seeing some interesting and unexpected behavior when the return is empty. I would expect to get back an empty Flow but instead I get a null object even though the return type is Flow.

zsiegel avatar Mar 23 '24 22:03 zsiegel

Do you have micronaut-kotlin dependency included?

dstepanov avatar Mar 25 '24 10:03 dstepanov

I have the following dependencies related to kotlin specified.

implementation("io.micronaut.kotlin:micronaut-kotlin-extension-functions")
implementation("io.micronaut.kotlin:micronaut-kotlin-runtime")

zsiegel avatar Mar 25 '24 12:03 zsiegel

Try also to add: org.jetbrains.kotlinx:kotlinx-coroutines-reactive or maybe org.jetbrains.kotlinx:kotlinx-coroutines-reactor

dstepanov avatar Mar 25 '24 12:03 dstepanov

Yes I have the first one. I tried adding org.jetbrains.kotlinx:kotlinx-coroutines-reactor and have the same issue.

zsiegel avatar Mar 26 '24 01:03 zsiegel

Testing this out a bit more I think I was able to determine that this only happens with interface methods that use the @Query annotation. The findAll() and other built-in methods seem to return reactor types, but the custom @Query annotation does not?

I have solved this problem with the following factory that handles the conversion for any single object and any list of objects.

@Factory
class TypeConverters {
    @Singleton
    fun listToFlow(): TypeConverter<List<Any>, Flow<Any>> {
        return TypeConverter<List<Any>, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }

    @Singleton
    fun anyToFlow(): TypeConverter<Any, Flow<Any>> {
        return TypeConverter<Any, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}

I am not sure if this makes sense to contribute or if I have properly diagnosed things. I am not sure what the different is between the built-in default methods and using @Query

zsiegel avatar Mar 26 '24 01:03 zsiegel

My implementation above was in correct. I am now trying.

@Factory
class TypeConverters {
    @Singleton
    fun listToFlow(): TypeConverter<List<Any>, Flow<Any>> {
        return TypeConverter<List<Any>, Flow<Any>> { `object`, targetType, context ->
            Optional.of(`object`.asFlow())
        }
    }

    @Singleton
    fun anyToFlow(): TypeConverter<Any, Flow<Any>> {
        return TypeConverter<Any, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}

In addition I am not able to get more than 1 result from the flow. This might be user error but I am struggling a bit to see what might be going wrong here.

zsiegel avatar Apr 16 '24 20:04 zsiegel

We would need a sample app that is reproducing it

dstepanov avatar Sep 09 '24 09:09 dstepanov