vertx-sql-client icon indicating copy to clipboard operation
vertx-sql-client copied to clipboard

PgSubscriberImpl.fetch() not implemented

Open andreas-eberle opened this issue 6 months ago • 3 comments

Questions

The function PgSubscriberImpl.fetch() is not implemented and throws an UnsupportedOperationException. This is a problem when using the Mutiny wrapper for the vertx-pg-client e.g. in Quarkus.

When you use PgSubscriber mechanism in a quarkus project leveraging the Mutiny framework, the following code runs into the UnsupportedOperationException.

import javax.enterprise.context.ApplicationScoped
import io.quarkus.runtime.ShutdownEvent

import javax.enterprise.event.Observes

import io.quarkus.runtime.StartupEvent
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber
import io.vertx.pgclient.PgConnectOptions
import java.time.Duration


@ApplicationScoped
class PgChannelObserver {

    fun onStart(@Observes ev: StartupEvent?, vertx: Vertx) {

        val subscriber = PgSubscriber.subscriber(vertx, connectOptions())
        subscriber.connect().await().atMost(Duration.ofSeconds(60))
        
        val channel = subscriber.channel("testchannel")        // I trigger the pg_notify directly in an pgsql session for that channel
        
        channel.toMulti().subscribe().with { n -> print(n) }
    }

    private fun connectOptions() = PgConnectOptions()
            .setPort(5432)
            .setHost("localhost")
            .setDatabase("rainbow_database")
            .setUser("unicorn_user")
            .setPassword("magical_password")
}

When I don't use Mutiny and instead use:

channel.handler { notif -> print("Notification received: $notif") }

everything works as expected.

I was wondering why the implementation for fetch is left out and if it is a Mutiny issue (needs to be addresses in smallrye) or related to an incomplete implementation of the PgSubscriber interface?

andreas-eberle avatar Feb 01 '24 16:02 andreas-eberle

it's not implemented in vertx:

    public ReadStream<String> fetch(long amount) {
      throw new UnsupportedOperationException();
    }

vietj avatar Feb 01 '24 17:02 vietj

I think it could be implemented easily given that this class does not (purposely) buffers events

vietj avatar Feb 01 '24 17:02 vietj

are you interested in contributing this @andreas-eberle

vietj avatar Feb 02 '24 13:02 vietj