vertx-lang-kotlin
vertx-lang-kotlin copied to clipboard
Receiver class ChannelReadStream does not define or inherit an implementation
I am trying to take a RowStream
from the postgres client and convert it to a Flow
. One way that is possible is via stream.toChannel(vertx).consumeAsFlow()
consumeAsFlow
comes from import kotlinx.coroutines.flow.consumeAsFlow
and I am using version 1.3.1 of the coroutines package.
Then I am getting the following error:
java.lang.AbstractMethodError: Receiver class io.vertx.kotlin.coroutines.ChannelReadStream does not define or inherit an implementation of the resolved method 'abstract java.lang.Object receiveOrClosed(kotlin.coroutines.Continuation)' of interface kotlinx.coroutines.channels.ReceiveChannel.
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:45)
at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source)
at kotlinx.coroutines.flow.ConsumeAsFlow.collect(Channels.kt:124)
at shp.games.name.name.name.Name$$special$$inlined$map$1.collect(SafeCollector.kt:127)
at shp.games.name.name.name.Name.getEvents(Name.kt:244)
at shp.games.name.name.name.Name$getEvents$1.invokeSuspend(Name.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:270)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:79)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:36)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at shp.games.name.name.name.NameTest$1.invokeSuspend(NameTest.kt:66)
at shp.games.name.name.name.NameTest$1.invoke(NameTest.kt)
And I can indeed not find this method being implemented.
I got the same error with Vert.x 4.1.0 and Kotlin 1.4.32:
kotlinx/coroutines/channels/ReceiveChannel.receiveOrClosed-WVj179g(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
java.lang.AbstractMethodError: kotlinx/coroutines/channels/ReceiveChannel.receiveOrClosed-WVj179g(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:50)
at kotlinx.coroutines.flow.ChannelAsFlow.collect(Channels.kt:158)
at test.DummyName$dummyFunAsFlow$$inlined$map$1.collect(SafeCollector.common.kt:114)
at kotlinx.coroutines.flow.FlowKt__CollectionKt.toCollection(Collection.kt:32)
at kotlinx.coroutines.flow.FlowKt.toCollection(Unknown Source)
at kotlinx.coroutines.flow.FlowKt__CollectionKt.toList(Collection.kt:15)
at kotlinx.coroutines.flow.FlowKt.toList(Unknown Source)
at kotlinx.coroutines.flow.FlowKt__CollectionKt.toList$default(Collection.kt:15)
at kotlinx.coroutines.flow.FlowKt.toList$default(Unknown Source)
at test.DummyNameTest$testDummy$1.invokeSuspend(DummyNameTest.kt:66)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
I came up with the following workaround to convert a RowStream
to a Flowable
then call asFlow() to get a Flow
:
fun rxFlowableToKotlinFlow(
connection: io.vertx.reactivex.sqlclient.SqlConnection,
sql: String,
args: io.vertx.reactivex.sqlclient.Tuple,
bufferCapacity: Int
): Flow<Row> {
return connection.rxPrepare(sql)
.flatMapPublisher { ps: PreparedStatement ->
ps.createStream(bufferCapacity, args).toFlowable()
}.asFlow()
}
Please note that it must be called inside a transaction otherwise RowStream
/Cursor
is not supported.
can you provide a generic reproducer without the SQL client ? ie a mock stream
@vietj do you mind taking a look at this one? And perhaps backport to 4.3?
Maybe outdated, this test does not fail on 4.x
branch:
https://github.com/vert-x3/vertx-lang-kotlin/blob/7177fe6c114ace66ed70dfb66b171f12bcf24e0e/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/ReceiveChannelHandlerTest.kt#L249-L264