smallrye-mutiny-vertx-bindings icon indicating copy to clipboard operation
smallrye-mutiny-vertx-bindings copied to clipboard

Uni failures lost during chained invocations of io.vertx.mutiny.sqlclient.Pool#withTransaction() with TransactionPropagation

Open Ultranium opened this issue 1 year ago • 1 comments

I'm having trouble using the io.vertx.mutiny.sqlclient.Pool#withTransaction(TransactionPropagation, Function<...>) method calls in a chain.

There are two versions of this method: one with the TransactionPropagation argument, and one without it. Chaining invocations of the version without TransactionPropagation actually works fine, but when I use the method with this argument, if there is a failure in the upstream, it is not propagated down the stream.

Propagation of items works fine though.

Here is a minimal reproducer:
@Path("/test")
public class MainResource {
    @Inject
    PgPool client;

    @GET
    public void get() {
        client.withTransaction(
                        TransactionPropagation.CONTEXT,
                        (conn1) -> client.withTransaction(

                                TransactionPropagation.CONTEXT,
                                (conn2) -> client.withTransaction(

                                        TransactionPropagation.CONTEXT,
                                        (conn3) -> conn3.preparedQuery("SELECT bad_proc()") // This query is guaranteed to fail
                                                .execute()
                                                .log("conn3")
                                )
                        )
                )
                .subscribe().with(
                        (i -> Log.info("withTransaction: success")),
                        (t -> Log.error("withTransaction: error", t))
                );
    }
}

The code may look kind of stupid, but what I'm trying to achieve is to imitate the JEE @Transactional annotation behavior, propagating a transaction through the chain of various beans' methods' calls (here 3 invocations of the Pool#withTransaction were put in a single method just in the sake of conciseness).

The output of which is:

[io.qua.mut.run.MutinyInfrastructure] (vert.x-eventloop-thread-0) conn3.0 | onSubscribe()
[io.qua.mut.run.MutinyInfrastructure] (vert.x-eventloop-thread-0) conn3.0 | onFailure(class io.vertx.pgclient.PgException(ERROR: function bad_proc() does not exist (42883)))

That's it, onItem and onFailure handlers in subscribe().with() are not executed at all.

I took a look at the generated implementations of both versions of the io.vertx.mutiny.sqlclient.Pool#withTransaction() method and found an interesting difference: the overload without the TransactionPropagation argument is decorated with io.smallrye.mutiny.vertx.AsyncResultUni#toUni:

public <T> io.smallrye.mutiny.Uni<T> withTransaction(Function<io.vertx.mutiny.sqlclient.SqlConnection, io.smallrye.mutiny.Uni<T>> function) { 
    return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(handler -> {
        delegate.withTransaction(new java.util.function.Function<io.vertx.sqlclient.SqlConnection,io.vertx.core.Future<T>>() {
      public io.vertx.core.Future<T> apply(io.vertx.sqlclient.SqlConnection arg) {
            return io.smallrye.mutiny.vertx.UniHelper.toFuture(
                 function.apply(io.vertx.mutiny.sqlclient.SqlConnection.newInstance((io.vertx.sqlclient.SqlConnection)arg))
            );
         }
     }, handler);
}); 

while the version with it is wrapped in a io.smallrye.mutiny.vertx.UniHelper#toUni:

public <T> io.smallrye.mutiny.Uni<T> withTransaction(io.vertx.sqlclient.TransactionPropagation txPropagation, Function<io.vertx.mutiny.sqlclient.SqlConnection, io.smallrye.mutiny.Uni<T>> function) { 
    return io.smallrye.mutiny.vertx.UniHelper.toUni(delegate.withTransaction(txPropagation, 
	 arg -> io.smallrye.mutiny.vertx.UniHelper.toFuture(function.apply(io.vertx.mutiny.sqlclient.SqlConnection.newInstance(arg)))
));}

Here is the implementation of the UniHelper#toUni:

public static <T> Uni<T> toUni(Future<T> future) {
    return Uni.createFrom().completionStage(future.toCompletionStage());
}

Javadoc for the Uni.createFrom().completionStage() method says: If the CompletionStage never completes (or failed), the produced Uni would not emit the item or failure events., so if function block within withTransaction emits a failure, it will be lost.

I'm no Mutiny expert, but this like a code generation bug to me. Original Vertx's methods' signatures looks almost identical to each other (despite the presence of the TransactionPropagation-typed argument of course):

default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> function, Handler<AsyncResult<@Nullable T>> handler)
default <T> void withTransaction(TransactionPropagation txPropagation, Function<SqlConnection, Future<@Nullable T>> function, Handler<AsyncResult<@Nullable T>> handler)

yet they are wrapped differently in Mutiny.

Ultranium avatar Jun 07 '23 16:06 Ultranium

I will have a look. It might be an issue in the mutiny generator (the code that transform the vertx api to mutiny api)

cescoffier avatar Jun 07 '23 16:06 cescoffier