vertx-jooq
vertx-jooq copied to clipboard
Support for Vert.x Transactions
Based on this, I've created the following helper and extended the async executor to support transactions.
It's not nice (one part lives in the query executor (Java) and the other in the calling service code (Kotlin)), but I hope you get the idea and can make something out of it. I'm not sure it's a good idea to expose the connection, but I support the saying: "We are consenting adults".
I don't have the time to polish it and provide a PR, but maybe this helps somebody out there.
/**
 * Runs [code] against a single connection with a transaction started on it.
 *
 * Steps, in order:
 * 1. obtain a connection from `queryExecutor.rawConnection`;
 * 2. call `startTx` on it, then invoke [code] with the executor and that connection;
 * 3. if step 2 fails, log and delegate to `rollbackCloseFail`, passing the failure cause;
 * 4. otherwise finish with `endTxAndClose`.
 *
 * The value produced by [code] is discarded; only success or failure is propagated.
 * A failure of the final `endTxAndClose` step is not routed through the recover branch.
 *
 * @param code the work to perform; receives the query executor and the shared connection
 * @return a future that completes once `endTxAndClose` has succeeded, or fails otherwise
 */
private fun <T> executeInTransaction(code: (FixAsyncClassicGenericQueryExecutor, SQLConnection) -> Future<T>): Future<Void> =
    queryExecutor.rawConnection.compose { connection ->
        queryExecutor.startTx(connection)
            .compose {
                // Drop the caller's result type; downstream steps only need completion.
                code(queryExecutor, connection).compose { Future.succeededFuture<Void>() }
            }
            .recover { cause ->
                logger.error { "failed! rolling back" }
                queryExecutor.rollbackCloseFail(connection, cause)
            }
            .compose { queryExecutor.endTxAndClose(connection) }
    }
/**
 * Inserts one FOO record and one BAR record inside a single transaction.
 *
 * Both inserts run on the connection supplied by `executeInTransaction`; the BAR
 * insert is only issued after the FOO insert has completed successfully.
 *
 * @param foo JSON used to build the [FooRecord] to insert
 * @param bar JSON used to build the [BarRecord] to insert
 * @param resultHandler notified with the outcome of the whole transaction
 */
fun createSomething(
    foo: JsonObject,
    bar: JsonObject,
    resultHandler: Handler<AsyncResult<Void>>
) {
    executeInTransaction { executor, conn ->
        val insertFoo = DSL.using(configuration).insertQuery(FOO)
        insertFoo.addRecord(FooRecord(foo))
        executor.execute(conn, insertFoo).compose {
            val insertBar = DSL.using(configuration).insertQuery(BAR)
            insertBar.addRecord(BarRecord(bar))
            // Execute the BAR insert here, not the FOO insert a second time.
            executor.execute(conn, insertBar)
        }
    }.setHandler(resultHandler)
}
/**
 * Public accessor that hands out whatever {@code getConnection()} returns.
 *
 * Nothing in this method closes the connection, so the caller is expected to
 * release it (presumably via {@link #endTxAndClose} or {@link #rollbackCloseFail}).
 *
 * @return the future produced by {@code getConnection()}
 */
public Future<SQLConnection> getRawConnection() {
    final Future<SQLConnection> connection = getConnection();
    return connection;
}
/**
 * Executes the given query as a parameterized update on a caller-supplied connection.
 *
 * The query is logged, rendered to SQL and sent through
 * {@code conn.updateWithParams} together with its bind values.
 *
 * @param conn  the connection to run the statement on; it is not closed here
 * @param query the query to render and execute
 * @return a future carrying the {@code UpdateResult}; anything thrown while
 *         preparing or dispatching the statement is returned as a failed future
 */
public Future<UpdateResult> execute(SQLConnection conn, Query query) {
    try {
        log(query);
        final Future<UpdateResult> updateOutcome = Future.future();
        final String sql = query.getSQL();
        conn.updateWithParams(sql, getBindValues(query), updateOutcome.completer());
        return updateOutcome;
    } catch (Throwable failure) {
        // Report synchronous errors through the future instead of throwing.
        return Future.failedFuture(failure);
    }
}
/**
 * Begins a transaction on the connection by switching auto-commit off.
 *
 * @param conn the connection to start the transaction on
 * @return a future completed with the result of {@code setAutoCommit(false, ...)}
 */
public Future<Void> startTx(SQLConnection conn) {
    final Future<Void> autoCommitDisabled = Future.future();
    conn.setAutoCommit(false, autoCommitDisabled.completer());
    return autoCommitDisabled;
}
/**
 * Commits the current transaction and then closes the connection.
 *
 * The connection is closed whether or not the commit succeeded, so a failed
 * commit no longer leaves the connection open.
 *
 * @param conn the connection whose transaction should be committed
 * @return a future that succeeds when both commit and close succeeded; it fails
 *         with the commit error if the commit failed, otherwise with the close
 *         error if the close failed
 */
public Future<Void> endTxAndClose(SQLConnection conn) {
    Future<Void> commitFuture = Future.future();
    conn.commit(commitFuture.completer());

    Future<Void> result = Future.future();
    commitFuture.setHandler(commitResult ->
        // Always release the connection, even when the commit failed.
        conn.close(closeResult -> {
            if (commitResult.failed()) {
                // The commit failure is the more relevant error for the caller.
                result.fail(commitResult.cause());
            } else if (closeResult.failed()) {
                result.fail(closeResult.cause());
            } else {
                result.complete();
            }
        }));
    return result;
}
/**
 * Rolls back the current transaction on the connection.
 *
 * The connection is left open; closing it is up to the caller.
 *
 * @param conn the connection whose transaction should be rolled back
 * @return a future completed with the result of {@code conn.rollback(...)}
 */
public Future<Void> rollback(SQLConnection conn) {
    final Future<Void> rolledBack = Future.future();
    conn.rollback(rolledBack.completer());
    return rolledBack;
}
/**
 * Rolls back the transaction, closes the connection and reports the original error.
 *
 * The connection is closed even when the rollback itself fails, and the returned
 * future always fails with {@code t}, so the error that triggered the rollback is
 * never replaced by a cleanup error. Cleanup errors are attached to {@code t} as
 * suppressed exceptions. No commit is issued after the rollback.
 *
 * @param conn the connection to roll back and close
 * @param t    the error that caused the rollback
 * @return a future that always fails with {@code t} once cleanup has finished
 */
public Future<Void> rollbackCloseFail(SQLConnection conn, Throwable t) {
    Future<Void> result = Future.future();
    rollback(conn).setHandler(rollbackResult -> {
        if (rollbackResult.failed()) {
            attachSuppressed(t, rollbackResult.cause());
        }
        // Close regardless of the rollback outcome so the connection is not leaked.
        conn.close(closeResult -> {
            if (closeResult.failed()) {
                attachSuppressed(t, closeResult.cause());
            }
            result.fail(t);
        });
    });
    return result;
}

/** Records a cleanup error on the primary error without ever throwing itself. */
private static void attachSuppressed(Throwable primary, Throwable cleanupError) {
    // addSuppressed rejects null and self-suppression, so guard against both.
    if (primary != null && cleanupError != null && cleanupError != primary) {
        primary.addSuppressed(cleanupError);
    }
}