moleculer-db
moleculer-db copied to clipboard
Sequelize Transactions - not easy-peasy
I'm thinking over adding transaction support to my service leveraging Sequelize Db Adapter. After going through the source code of moleculer-db and moleculer-db-adapter-sequelize I came to conclusion that the only option is to add a patch to moleculer-db (for transation argument to every method). Have enyone ever wonder of that?
@maciekLeks How you have added patch. Please share i want to add transaction in my moleculer services. Please provice patch code
It's really hard for me right now because I've moved from moleculerjs to totally different area. So let me put my code here which worked for me at that time:
src/extends/TransactionalSequelizeDbAdapter.js
const SequelizeDbAdapter = require("moleculer-db-adapter-sequelize");
class TransactionalSequelizeDbAdapter extends SequelizeDbAdapter {
constructor(...opts) {
super(...opts);
}
insert(entity) {
console.log("MLK TransactionalSequelizeDbAdapter");
return super.insert(entity);
}
}
module.exports = TransactionalSequelizeDbAdapter;
src/mixins/db.js
"use strict";
const DbService = require("moleculer-db");
//const SqlAdapter = require("moleculer-db-adapter-sequelize");
const TransactionalSequelizeDbAdapter = require("../extends/TransactionalSequelizeDbAdapter");
const auth = require("../../auth");
const isDocker = require("is-docker");
const { log } = require("../utils/helpers");
module.exports = {
mixins: [DbService],
/*adapter: new SqlAdapter(
`postgres://${auth.pgUser}:${auth.pgPassword}@${
isDocker() ? "postgres" : "localhost:5458"
}/${auth.pgDB}`
), */
adapter: new TransactionalSequelizeDbAdapter(
`${auth.pgDB}`,
`${auth.pgUser}`,
`${auth.pgPassword}`,
{
host: isDocker() ? "postgres" : "localhost",
port: isDocker() ? undefined : 5458,
dialect: "postgres",
pool: {
max: 5,
min: 0,
idle: 10000,
},
}
),
actions: {
begin(ctx) {
//TODO
this.logger.info("begin - TODO - no implementation!!!");
},
commit(ctx) {
//TODO
this.logger.info("commit - TODO - no implementation!!!");
},
rollback(ctx) {
//TODO
this.logger.info("rollback - TODO - no implementation!!!");
},
execute: {
params: {
_exec: "object",
},
async handler(ctx) {
const exec = ctx.params._exec;
let res;
try {
const callType = exec._type;
switch (callType) {
case "create": {
res = await this._create(ctx, exec._data);
this.logger.info("db res:" + log(res));
return res;
}
case "find": {
const query = {
limit: 1,
query: {
...exec._data,
},
};
//this.logger.info("MY QUERY:" + log(query));
res = await this._find(ctx, query);
this.logger.info("db res:" + log(res));
if (res) return res[0];
}
}
// this.logger.info(
// "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!DB RESULT:" + log(res)
// );
} catch (e) {
this.logger.info("e: " + log(e));
}
return null;
},
},
},
};
src/mixins/oracle.js
"use strict";
const oracledb = require("oracledb");
const { log } = require("../../src/utils/helpers");
/**
* Service mixin to use oracledb
*
* TODO: stable version move out to the module moleculer-db-adapter-oracle
*
* @param opt.auth.oraUser
* @param opt.auth.oraPassword
* @param opt.auth.oraConnectionString
* @param opt.pool use pool or singleton connection
* @param opt.pool.min pool min
* @param opt.pool.max pool max
*
* @name moleculer-db-adapter-oracle
* @module Service
*/
module.exports = (opt) => ({
name: "db-adapter-oracle",
//mixins: [UUID],
/* version: "v1", */
meta: {
scalable: true,
},
//dependencies: ["auth", "users"],
methods: {
async _execute(ctx) {
const { _exec: exec, _uuid: uuid } = ctx.params;
this.logger.info("PL/SQL call _spec:" + exec._spec);
this.logger.info("\n\n\nUUIDs=" + this._uuids);
const conn = opt.pool ? this._uuids[uuid] : this._conn; //always run begin() first
//this.logger.info("CONN=" + log(conn) + " on:" + log(this._uuids));
try {
const result = await conn.execute(
eval("`" + exec._spec + "`"), //TODO check if it's needed
exec._data
);
console.log(result.outBinds);
if (result.outBinds && result.outBinds[exec._out._error] == 0)
return result.outBinds;
} catch (e) {
this.logger.error("e: " + log(e));
//this._closePoolAndExit();
}
return null;
},
},
/**
* Service started lifecycle event handler
*/
async started() {
try {
oracledb.autoCommit = false; //imperative way of commiting
const attrs = {
user: opt.auth.oraUser,
password: opt.auth.oraPassword,
connectString: opt.auth.oraConnectionString,
...(opt.pool && { poolMin: opt.pool.min }),
...(opt.pool && { poolMax: opt.pool.max }),
};
if (opt.pool) await oracledb.createPool(attrs);
else this._conn = await oracledb.getConnection(attrs);
} catch (e) {
return Promise.reject(e);
}
},
/**
* Service stopped lifecycle event handler
*/
async stopped() {
if (opt.pool) {
this.logger.info("this._uuids:" + log(this._uuids));
if (this._uuids) {
for (const [uuid, conn] of Object.keys(this._uuids)) {
try {
await conn.close();
} catch (err) {
this.logger.error(`Error while closing the conn ${uuid}`);
}
}
}
try {
this.logger.info("Pool closing...");
await oracledb.getPool().close(10);
this.logger.info("Pool closed.");
} catch (err) {
this.logger.error(err.message);
Promise.reject(err);
}
} else {
try {
this.logger.info("Connection closing...");
if (this._conn) await this._conn.close();
this.logger.info("Connection closed.");
} catch (err) {
this.logger.error("Error while closing this._conn: ${err}");
Promise.reject(err);
}
}
},
actions: {
execute(ctx) {
return this._execute(ctx);
},
/**
Begin transaction using external uuid (given by the orchestrator srv).
*/
async begin(ctx) {
console.log("begin started...");
if (opt.pool) {
const { _uuid: uuid } = ctx.params;
const uuids = this._uuids || (this._uuids = {}); //TODO Redis!!!
if (!uuids[uuid]) uuids[uuid] = await oracledb.getConnection(); //start a new one only if not exists
//console.log("begin:" + log(this._uuids));
}
},
rollback(ctx) {
this.logger.error("ROLLBAK");
if (opt.pool) return this._uuids[ctx.params._uuid].rollback();
else return this._conn.rollback();
},
commit(ctx) {
this.logger.info("COMMIT!!!");
if (opt.pool) return this._uuids[ctx.params._uuid].commit();
else return this._conn.commit();
},
},
events: {
"$node.connected"({ node }) {
this.logger.info(`Node '${node.id}' is connected!`);
},
"remote.order.create"(msg) {
this.logger.info("remote.order.create msg:" + log(msg));
},
},
});
I don't remember anything but you probably can find here something usefull.