vertx-lang-kotlin icon indicating copy to clipboard operation
vertx-lang-kotlin copied to clipboard

Receiver class ChannelReadStream does not define or inherit an implementation

Open Globegitter opened this issue 5 years ago • 2 comments

I am trying to take a RowStream from the postgres client and convert it to a Flow. One way that is possible is via stream.toChannel(vertx).consumeAsFlow() consumeAsFlow comes from import kotlinx.coroutines.flow.consumeAsFlow and I am using version 1.3.1 of the coroutines package.

Then I am getting the following error:

    java.lang.AbstractMethodError: Receiver class io.vertx.kotlin.coroutines.ChannelReadStream does not define or inherit an implementation of the resolved method 'abstract java.lang.Object receiveOrClosed(kotlin.coroutines.Continuation)' of interface kotlinx.coroutines.channels.ReceiveChannel.                                                                                      
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:45)
        at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source)
        at kotlinx.coroutines.flow.ConsumeAsFlow.collect(Channels.kt:124)
        at shp.games.name.name.name.Name$$special$$inlined$map$1.collect(SafeCollector.kt:127)                                                                    
        at shp.games.name.name.name.Name.getEvents(Name.kt:244)                                                                                            
        at shp.games.name.name.name.Name$getEvents$1.invokeSuspend(Name.kt)                                                                                
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)                                                                                             
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
        at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:270)
        at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:79)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)
        at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:36)
        at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
        at shp.games.name.name.name.NameTest$1.invokeSuspend(NameTest.kt:66)                                                                               
        at shp.games.name.name.name.NameTest$1.invoke(NameTest.kt) 

And I can indeed not find this method being implemented.

Globegitter avatar Sep 13 '19 13:09 Globegitter

I got the same error with Vert.x 4.1.0 and Kotlin 1.4.32:

kotlinx/coroutines/channels/ReceiveChannel.receiveOrClosed-WVj179g(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
java.lang.AbstractMethodError: kotlinx/coroutines/channels/ReceiveChannel.receiveOrClosed-WVj179g(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:50)
	at kotlinx.coroutines.flow.ChannelAsFlow.collect(Channels.kt:158)
	at test.DummyName$dummyFunAsFlow$$inlined$map$1.collect(SafeCollector.common.kt:114)
	at kotlinx.coroutines.flow.FlowKt__CollectionKt.toCollection(Collection.kt:32)
	at kotlinx.coroutines.flow.FlowKt.toCollection(Unknown Source)
	at kotlinx.coroutines.flow.FlowKt__CollectionKt.toList(Collection.kt:15)
	at kotlinx.coroutines.flow.FlowKt.toList(Unknown Source)
	at kotlinx.coroutines.flow.FlowKt__CollectionKt.toList$default(Collection.kt:15)
	at kotlinx.coroutines.flow.FlowKt.toList$default(Unknown Source)
	at test.DummyNameTest$testDummy$1.invokeSuspend(DummyNameTest.kt:66)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)

I came up with the following workaround to convert a RowStream to a Flowable then call asFlow() to get a Flow:

    fun rxFlowableToKotlinFlow(
        connection: io.vertx.reactivex.sqlclient.SqlConnection,
        sql: String,
        args: io.vertx.reactivex.sqlclient.Tuple,
        bufferCapacity: Int
    ): Flow<Row> {
        return connection.rxPrepare(sql)
            .flatMapPublisher { ps: PreparedStatement ->
                ps.createStream(bufferCapacity, args).toFlowable()
            }.asFlow()
    }

Please note that it must be called inside a transaction otherwise RowStream/Cursor is not supported.

haoming avatar Aug 20 '21 22:08 haoming

can you provide a generic reproducer without the SQL client ? ie a mock stream

vietj avatar Aug 23 '21 08:08 vietj

@vietj do you mind taking a look at this one? And perhaps backport to 4.3?

tsegismont avatar Feb 02 '23 17:02 tsegismont

Maybe outdated, this test does not fail on 4.x branch:

https://github.com/vert-x3/vertx-lang-kotlin/blob/7177fe6c114ace66ed70dfb66b171f12bcf24e0e/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/ReceiveChannelHandlerTest.kt#L249-L264

tsegismont avatar Oct 06 '23 12:10 tsegismont