reactive-pg-client
reactive-pg-client copied to clipboard
Incomplete implementation for PubSub using Mutiny / Kotlin
Hello there,
I was trying to implement a PgSubscriber mechanism in a quarkus project leveraging the Mutiny framework.
Assuming the following "reduced" code should work ...
package com.wftest
import javax.enterprise.context.ApplicationScoped
import io.quarkus.runtime.ShutdownEvent
import javax.enterprise.event.Observes
import io.quarkus.runtime.StartupEvent
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber
import io.vertx.pgclient.PgConnectOptions
import java.time.Duration
@ApplicationScoped
class PgChannelObserver {
fun onStart(@Observes ev: StartupEvent?, vertx: Vertx) {
val subscriber = PgSubscriber.subscriber(vertx, connectOptions())
subscriber.connect().await().atMost(Duration.ofSeconds(60))
val channel = subscriber.channel("testchannel") // I trigger the pg_notify directly in an pgsql session for that channel
channel.toMulti().subscribe().with { n -> print(n) }
}
private fun connectOptions() = PgConnectOptions()
.setPort(5432)
.setHost("localhost")
.setDatabase("rainbow_database")
.setUser("unicorn_user")
.setPassword("magical_password")
}
I get an UnsupportedOperationException on startup and digging further into the stack trace stumbling over the reason:
PgSubscriberImpl:
// Since Vert.x 3.6.0 : todo
public ReadStream<String> fetch(long amount) {
throw new UnsupportedOperationException();
}
When I don't use Mutiny and instead use:
channel.handler { notif -> print("Notification received: $notif") }
everything works as expected.
I was wondering why the implementation for fetch is left out and if it is a Mutiny issue (needs to be addresses in smallrye) or related to an incomplete implementation of the PgSubscriber interface?
Unfortunately, this issue is still up-to-date :(