vertx-jooq icon indicating copy to clipboard operation
vertx-jooq copied to clipboard

Support for Vert.x Transactions

Open m1schka opened this issue 6 years ago • 0 comments

Based on this, I've created the following helper and extended the async executor to support transactions.

It's not nice (one part lives in the query executor (Java) and the other in the calling service code (Kotlin)), but I hope you get the idea and can make something out of it. I'm not sure it's a good idea to expose the connection, but I support the saying: "We are consenting adults".

I don't have the time to make it nice and provide a PR, but maybe this helps somebody out there.

/**
 * Runs [code] on a single connection with auto-commit switched off.
 *
 * A connection is taken from `queryExecutor.rawConnection`, `startTx` is called on it,
 * and [code] is invoked with the executor and that connection. The value produced by
 * [code] is discarded. If `startTx` or [code] fails, the failure is logged and handed to
 * `rollbackCloseFail`; otherwise `endTxAndClose` is called on the connection.
 *
 * @param code the work to run inside the transaction
 * @return a future that carries no value and reflects the outcome of the steps above
 */
private fun <T> executeInTransaction(code: (FixAsyncClassicGenericQueryExecutor, SQLConnection) -> Future<T>): Future<Void> =
    queryExecutor.rawConnection.compose { connection ->
        // Start the transaction, run the caller's work and drop its result.
        val work = queryExecutor.startTx(connection).compose { _ ->
            code(queryExecutor, connection).compose { _ -> Future.succeededFuture<Void>() }
        }

        // A failure in startTx or in the caller's work is routed to rollbackCloseFail.
        val guarded = work.recover { cause ->
            logger.error { "failed! rolling back" }
            queryExecutor.rollbackCloseFail(connection, cause)
        }

        // Reached only when everything above succeeded.
        guarded.compose { _ -> queryExecutor.endTxAndClose(connection) }
    }

/**
 * Inserts [foo] into `FOO` and then [bar] into `BAR` within one transaction.
 *
 * Both inserts run on the same connection via `executeInTransaction`; the second
 * insert is only issued after the first one has succeeded.
 *
 * @param foo JSON used to build the `FooRecord` to insert
 * @param bar JSON used to build the `BarRecord` to insert
 * @param resultHandler notified with the outcome of the whole transaction
 */
fun createSomething(
    foo: JsonObject,
    bar: JsonObject,
    resultHandler: Handler<AsyncResult<Void>>
) {
    executeInTransaction { executor, conn ->
        val q1 = DSL.using(configuration).insertQuery(FOO)
        // Fixed: the record was added to an undeclared `a1` instead of `q1`.
        q1.addRecord(FooRecord(foo))
        executor.execute(conn, q1).compose {
            val q2 = DSL.using(configuration).insertQuery(BAR)
            q2.addRecord(BarRecord(bar))
            // Fixed: execute the BAR insert (`q2`) rather than re-running `q1`.
            executor.execute(conn, q2)
        }
    }.setHandler(resultHandler)
}
/**
 * Exposes the connection future produced by {@code getConnection()} to callers.
 *
 * @return whatever {@code getConnection()} returns
 */
public Future<SQLConnection> getRawConnection() {
    return this.getConnection();
}

    /**
     * Executes {@code query} as a parameterised update on the supplied connection.
     * <p>
     * The query is logged, then its SQL and bind values are passed to
     * {@code conn.updateWithParams}. Anything thrown synchronously along the way is
     * returned as a failed future instead of propagating to the caller.
     *
     * @param conn  connection to run the statement on
     * @param query query providing the SQL text and bind values
     * @return future completed by the connection with the update result, or a failed
     *         future if a {@link Throwable} was thrown while issuing the statement
     */
    public Future<UpdateResult> execute(SQLConnection conn, Query query) {
        Future<UpdateResult> outcome = Future.future();
        try {
            log(query);
            conn.updateWithParams(query.getSQL(), getBindValues(query), outcome.completer());
        } catch (Throwable e) {
            return Future.failedFuture(e);
        }
        return outcome;
    }


    /**
     * Switches auto-commit off on {@code conn}.
     *
     * @param conn connection to reconfigure
     * @return future completed with the outcome of {@code setAutoCommit(false)}
     */
    public Future<Void> startTx(SQLConnection conn) {
        final Future<Void> autoCommitDisabled = Future.future();
        conn.setAutoCommit(false, autoCommitDisabled.completer());
        return autoCommitDisabled;
    }

    /**
     * Commits on {@code conn} and then closes the connection.
     * <p>
     * The connection is closed whether or not the commit succeeds. Previously a failed
     * commit skipped the close step, so the connection was never released.
     *
     * @param conn connection to commit and close
     * @return future that succeeds once commit and close have both succeeded; it fails
     *         with the commit error if the commit failed (after the connection has been
     *         closed), otherwise with the close error
     */
    public Future<Void> endTxAndClose(SQLConnection conn) {
        Future<Void> commitFuture = Future.future();
        conn.commit(commitFuture.completer());
        return commitFuture
                .recover(commitError -> {
                    // Commit failed: still close the connection, then report the
                    // commit error regardless of how the close turned out.
                    Future<Void> closeFuture = Future.future();
                    conn.close(closeFuture.completer());
                    Future<Void> failure = Future.future();
                    closeFuture.setHandler(closeResult -> failure.fail(commitError));
                    return failure;
                })
                .compose(v -> {
                    // Commit succeeded: close and report the close outcome.
                    Future<Void> closeFuture = Future.future();
                    conn.close(closeFuture.completer());
                    return closeFuture;
                });
    }

    /**
     * Issues a rollback on {@code conn}.
     *
     * @param conn connection to roll back
     * @return future completed with the outcome of {@code conn.rollback}
     */
    public Future<Void> rollback(SQLConnection conn) {
        final Future<Void> rolledBack = Future.future();
        conn.rollback(rolledBack.completer());
        return rolledBack;
    }

    /**
     * Rolls back on {@code conn}, closes the connection, and then fails with {@code t}.
     * <p>
     * The connection is closed even when the rollback itself fails; previously a failed
     * rollback skipped the close and replaced {@code t} with the rollback error. Errors
     * raised by the rollback or the close are attached to {@code t} as suppressed
     * exceptions so the original cause stays the primary failure.
     * <p>
     * NOTE(review): the earlier version also issued a commit after the rollback. It is
     * dropped here on the assumption that nothing is left to commit at that point —
     * confirm against the SQL client in use.
     *
     * @param conn connection to roll back and close
     * @param t    the error that triggered the rollback
     * @return a future that always fails with {@code t} once cleanup has finished
     */
    public Future<Void> rollbackCloseFail(SQLConnection conn, Throwable t) {
        return rollback(conn)
                .recover(rollbackError -> {
                    // Keep going so the connection still gets closed.
                    if (rollbackError != t) {
                        t.addSuppressed(rollbackError);
                    }
                    return Future.<Void>succeededFuture();
                })
                .compose(v -> {
                    Future<Void> closeFuture = Future.future();
                    conn.close(closeFuture.completer());
                    return closeFuture;
                })
                .recover(closeError -> {
                    if (closeError != t) {
                        t.addSuppressed(closeError);
                    }
                    return Future.<Void>succeededFuture();
                })
                .compose(v -> Future.<Void>failedFuture(t));
    }

m1schka avatar Jul 02 '18 15:07 m1schka