
Difficult to use transactions with blocking code and reactive SQL drivers

Open kenkania opened this issue 2 years ago • 4 comments

Description

We are using the reactive drivers, but in a blocking fashion, i.e. pgPool.query("SELECT * FROM mytable").executeAndAwait(). IIUC, this should be very performant when using virtual threads (since only the JDBC/non-reactive drivers pin the carrier thread).

With non-reactive drivers, you can easily add a JTA annotation and safely begin/end a transaction within a call stack. With reactive drivers, it appears there are at least two common transaction options (below), but neither supports using blocking code within the transaction. Can this be improved, or am I perhaps missing some other easy alternative?

1) PgPool#withTransaction

(https://quarkus.io/guides/reactive-sql-clients#transactions)

The main problems here are: a) you have to pass the allocated SqlConnection through the entire stack; b) the passed-in function has to return a Uni (or future) and cannot block.

vertx-sql-client has recently added the ability to pass TransactionPropagation.CONTEXT to this method, which stores and looks up a SqlConnection in the Vert.x context. This somewhat improves a), although it is difficult (or at least non-obvious) to test code that uses this in Quarkus unit tests (it fails with java.lang.NullPointerException: Cannot invoke "io.vertx.core.impl.ContextInternal.getLocal(Object)" because "context" is null). Regardless, even if that is resolved, b) is still an issue.
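
To make the store/lookup idea concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Vert.x API: ContextSketch, Conn, POOL, withTransaction and currentConnection are hypothetical stand-ins, and a ThreadLocal plays the role of the Vert.x context local. It only illustrates why code deep in the stack no longer needs the connection passed in, and why the lookup has to cope with nothing being bound (as in the unit-test case above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these types belong to Vert.x or Quarkus.
class ContextSketch {

    // Stand-in for a SQL connection; it only records the statements run on it.
    static final class Conn {
        final String name;
        final List<String> log = new ArrayList<>();

        Conn(String name) {
            this.name = name;
        }

        void query(String sql) {
            log.add(sql);
        }
    }

    // Assumption: a ThreadLocal stands in for the Vert.x context local
    // that holds the propagated connection.
    private static final ThreadLocal<Conn> CURRENT = new ThreadLocal<>();

    // Stand-in for a plain pooled connection, used when nothing is bound.
    static final Conn POOL = new Conn("pool");

    // Looks up the connection bound by withTransaction, if any.
    static Optional<Conn> currentConnection() {
        return Optional.ofNullable(CURRENT.get());
    }

    // Binds a connection for the duration of the work, then unbinds it.
    static <T> T withTransaction(Conn tx, Supplier<T> work) {
        CURRENT.set(tx);
        try {
            return work.get();
        } finally {
            CURRENT.remove();
        }
    }

    // Code deep in the call stack: no connection parameter is needed.
    static String runQuery(String sql) {
        Conn conn = currentConnection().orElse(POOL);
        conn.query(sql);
        return conn.name;
    }

    public static void main(String[] args) {
        Conn tx = new Conn("tx");
        System.out.println(withTransaction(tx, () -> runQuery("SELECT 1"))); // prints "tx"
        System.out.println(runQuery("SELECT 2")); // prints "pool"
    }
}
```

This sketch addresses a) only. The work passed to the real withTransaction still has to return a Uni (or future), so b) is unchanged.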

2) @Transactional on Uni-returning methods

(https://quarkus.io/guides/transaction#reactive-extensions) (https://quarkus.io/guides/context-propagation#usage-example-with-mutiny)

Implementation ideas

No response

kenkania avatar Jun 16 '23 19:06 kenkania

/cc @tsegismont (reactive-sql-clients), @vietj (reactive-sql-clients)

quarkus-bot[bot] avatar Jun 16 '23 19:06 quarkus-bot[bot]

Perhaps related to https://github.com/quarkusio/quarkus/issues/34101

kenkania avatar Jun 16 '23 19:06 kenkania

> this should be very performant when using virtual threads (since only the JDBC/non-reactive drivers pin the carrier thread)

cc @cescoffier

geoand avatar Jun 19 '23 08:06 geoand

any news?

RobertoUa avatar Jun 26 '24 14:06 RobertoUa

@kenkania @RobertoUa as an alternative, you could get a connection from the pool and manage the transaction manually:

    @Inject
    PgPool client;

    @RunOnVirtualThread
    void doSomethingInTransaction() {
        SqlConnection conn = null;
        try {
            conn = client.getConnectionAndAwait();
            Transaction transaction = conn.beginAndAwait();

            doStuffTheBlockingWayOnVirtualThread(conn);

            // Everything went well
            transaction.commitAndAwait();

        } catch (Throwable t) {
            // Something went wrong; the uncommitted transaction is
            // rolled back when the connection is closed below
        } finally {
            // Return the connection to the pool on success as well as on failure
            if (conn != null) {
                conn.closeAndForget();
            }
        }
    }
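
The manual pattern above can also be wrapped in a small helper so that callers only supply the blocking work. The following is a minimal sketch in plain Java, not the Quarkus or Vert.x API: Pool, Conn, Tx, RecordingConn and inTransaction are hypothetical stand-ins that model only the blocking ...AndAwait calls, and the explicit rollback on failure is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that model only the blocking calls used above;
// they are not the Quarkus or Vert.x types.
class TxHelperSketch {

    interface Tx {
        void commitAndAwait();
        void rollbackAndAwait();
    }

    interface Conn {
        Tx beginAndAwait();
        void closeAndAwait();
    }

    interface Pool {
        Conn getConnectionAndAwait();
    }

    // Runs blocking work in a transaction: commit on success,
    // roll back on failure, and always release the connection.
    static <T> T inTransaction(Pool pool, Function<Conn, T> work) {
        Conn conn = pool.getConnectionAndAwait();
        try {
            Tx tx = conn.beginAndAwait();
            try {
                T result = work.apply(conn);
                tx.commitAndAwait();
                return result;
            } catch (RuntimeException | Error e) {
                try {
                    tx.rollbackAndAwait();
                } catch (RuntimeException rollbackFailure) {
                    // Keep the original failure as the primary exception
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }
        } finally {
            conn.closeAndAwait();
        }
    }

    // In-memory fake that records the calls it receives.
    static final class RecordingConn implements Conn, Tx {
        final List<String> calls = new ArrayList<>();

        public Tx beginAndAwait() { calls.add("begin"); return this; }
        public void commitAndAwait() { calls.add("commit"); }
        public void rollbackAndAwait() { calls.add("rollback"); }
        public void closeAndAwait() { calls.add("close"); }
    }

    public static void main(String[] args) {
        RecordingConn ok = new RecordingConn();
        inTransaction(() -> ok, c -> "done");
        System.out.println(ok.calls); // [begin, commit, close]

        RecordingConn failing = new RecordingConn();
        try {
            inTransaction(() -> failing, c -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println(failing.calls); // [begin, rollback, close]
        }
    }
}
```

The connection still has to be handed to the work function, so this only removes the begin/commit/close boilerplate, not the need to pass the connection down the stack.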

tsegismont avatar Aug 21 '24 08:08 tsegismont

I solved this issue in Kotlin using propagated connections:

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T> PgPool.transaction(block: suspend SqlConnection.() -> T): T = coroutineScope {
    if (io.vertx.core.Vertx.currentContext() == null) { // usually there's no context when called directly from tests
        Log.info("Transaction called not from a vertx thread. Creating new context...")
        val vertx = VertxCoreRecorder.getVertx().get()
        val context = VertxContext.getOrCreateDuplicatedContext(vertx)
        context.dispatcher().invoke {
            this@transaction.withTransaction(TransactionPropagation.CONTEXT) { con ->
                async(context.dispatcher()) { con.block() }.asUni()
            }.awaitSuspending()
        }
    } else {
        this@transaction.withTransaction(TransactionPropagation.CONTEXT) { connection ->
            async { connection.block() }.asUni()
        }.awaitSuspending()
    }
}

fun getPropagatedConnection(): SqlConnection? =
    io.vertx.core.Vertx.currentContext()
        ?.getLocal<io.vertx.sqlclient.SqlConnection>(PoolImpl.PROPAGATABLE_CONNECTION)
        ?.let { SqlConnection(it) }

and then to use it:

class MyPgPool(pool: io.vertx.pgclient.PgPool) : PgPool(pool) {
    override fun preparedQuery(@Language("sql") sql: String?): PreparedQuery<RowSet<Row>> {
        return getPropagatedConnection()?.preparedQuery(sql) ?: super.preparedQuery(sql)
    }
}

Now I don't even need to pass a connection around, and I can use suspend functions inside. I can just do:

pgPool.transaction {
    query1() // a Kotlin suspend function that calls 'pgPool.preparedQuery()...awaitSuspending()'
    query2()
}

RobertoUa avatar Aug 21 '24 10:08 RobertoUa

See @tsegismont's example.

cescoffier avatar Aug 21 '24 13:08 cescoffier

@RobertoUa thanks for sharing!

tsegismont avatar Aug 21 '24 14:08 tsegismont