moleculer-db icon indicating copy to clipboard operation
moleculer-db copied to clipboard

Sequelize Transactions - not easy-peasy

Open MaciekLeks opened this issue 4 years ago • 2 comments

I'm thinking about adding transaction support to my service, which uses the Sequelize DB adapter. After going through the source code of moleculer-db and moleculer-db-adapter-sequelize, I came to the conclusion that the only option is to patch moleculer-db (adding a transaction argument to every method). Has anyone else thought about this?

MaciekLeks avatar May 07 '20 15:05 MaciekLeks

@MaciekLeks How did you add the patch? Please share it; I want to add transactions to my Moleculer services. Please provide the patch code.

sunnygaikwad65 avatar Jun 21 '21 08:06 sunnygaikwad65

It's really hard for me right now because I've moved from moleculerjs to a totally different area. So let me put here the code that worked for me at that time:

src/extends/TransactionalSequelizeDbAdapter.js

const SequelizeDbAdapter = require("moleculer-db-adapter-sequelize");

/**
 * Sequelize adapter subclass meant as the extension point for transaction
 * support. For now it only traces `insert` calls and then defers to the
 * base adapter.
 */
class TransactionalSequelizeDbAdapter extends SequelizeDbAdapter {
	// No explicit constructor: the implicit derived-class constructor already
	// forwards every argument to SequelizeDbAdapter unchanged.

	/**
	 * Insert an entity via the base adapter, printing a trace line first.
	 *
	 * @param {Object} entity - handed unchanged to the base adapter's insert
	 * @returns {*} whatever the base adapter's insert returns
	 */
	insert(entity) {
		console.log("MLK TransactionalSequelizeDbAdapter");
		const inserted = super.insert(entity);
		return inserted;
	}
}

module.exports = TransactionalSequelizeDbAdapter;

src/mixins/db.js

"use strict";

const DbService = require("moleculer-db");
//const SqlAdapter = require("moleculer-db-adapter-sequelize");
const TransactionalSequelizeDbAdapter = require("../extends/TransactionalSequelizeDbAdapter");
const auth = require("../../auth");
const isDocker = require("is-docker");
const { log } = require("../utils/helpers");

module.exports = {
	mixins: [DbService],
	/*adapter: new SqlAdapter(
		`postgres://${auth.pgUser}:${auth.pgPassword}@${
			isDocker() ? "postgres" : "localhost:5458"
		}/${auth.pgDB}`
	), */
	adapter: new TransactionalSequelizeDbAdapter(
		`${auth.pgDB}`,
		`${auth.pgUser}`,
		`${auth.pgPassword}`,
		{
			host: isDocker() ? "postgres" : "localhost",
			port: isDocker() ? undefined : 5458,
			dialect: "postgres",
			pool: {
				max: 5,
				min: 0,
				idle: 10000,
			},
		}
	),
	actions: {
		begin(ctx) {
			//TODO
			this.logger.info("begin - TODO - no implementation!!!");
		},

		commit(ctx) {
			//TODO
			this.logger.info("commit - TODO - no implementation!!!");
		},

		rollback(ctx) {
			//TODO
			this.logger.info("rollback - TODO - no implementation!!!");
		},

		execute: {
			params: {
				_exec: "object",
			},
			async handler(ctx) {
				const exec = ctx.params._exec;
				let res;
				try {
					const callType = exec._type;
					switch (callType) {
						case "create": {
							res = await this._create(ctx, exec._data);
							this.logger.info("db res:" + log(res));
							return res;
						}
						case "find": {
							const query = {
								limit: 1,
								query: {
									...exec._data,
								},
							};
							//this.logger.info("MY QUERY:" + log(query));
							res = await this._find(ctx, query);
							this.logger.info("db res:" + log(res));
							if (res) return res[0];
						}
					}
					// this.logger.info(
					// 	"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!DB RESULT:" + log(res)
					// );
				} catch (e) {
					this.logger.info("e: " + log(e));
				}

				return null;
			},
		},
	},
};

src/mixins/oracle.js

"use strict";

const oracledb = require("oracledb");
const { log } = require("../../src/utils/helpers");

/**
 * Service mixin to use oracledb
 *
 * TODO: stable version move out to the module moleculer-db-adapter-oracle
 *
 * @param opt.auth.oraUser
 * @param opt.auth.oraPassword
 * @param opt.auth.oraConnectionString
 * @param opt.pool use pool or singleton connection
 * @param opt.pool.min pool min
 * @param opt.pool.max pool max
 *
 * @name moleculer-db-adapter-oracle
 * @module Service
 */
module.exports = (opt) => ({
	name: "db-adapter-oracle",

	//mixins: [UUID],

	/* version: "v1", */
	meta: {
		scalable: true,
	},
	//dependencies: ["auth", "users"],

	methods: {
		async _execute(ctx) {
			const { _exec: exec, _uuid: uuid } = ctx.params;

			this.logger.info("PL/SQL call _spec:" + exec._spec);
			this.logger.info("\n\n\nUUIDs=" + this._uuids);

			const conn = opt.pool ? this._uuids[uuid] : this._conn; //always run begin() first

			//this.logger.info("CONN=" + log(conn) + " on:" + log(this._uuids));

			try {
				const result = await conn.execute(
					eval("`" + exec._spec + "`"), //TODO check if it's needed
					exec._data
				);

				console.log(result.outBinds);

				if (result.outBinds && result.outBinds[exec._out._error] == 0)
					return result.outBinds;
			} catch (e) {
				this.logger.error("e: " + log(e));
				//this._closePoolAndExit();
			}

			return null;
		},
	},

	/**
	 * Service started lifecycle event handler
	 */
	async started() {
		try {
			oracledb.autoCommit = false; //imperative way of commiting

			const attrs = {
				user: opt.auth.oraUser,
				password: opt.auth.oraPassword,
				connectString: opt.auth.oraConnectionString,
				...(opt.pool && { poolMin: opt.pool.min }),
				...(opt.pool && { poolMax: opt.pool.max }),
			};

			if (opt.pool) await oracledb.createPool(attrs);
			else this._conn = await oracledb.getConnection(attrs);
		} catch (e) {
			return Promise.reject(e);
		}
	},

	/**
	 * Service stopped lifecycle event handler
	 */
	async stopped() {
		if (opt.pool) {
			this.logger.info("this._uuids:" + log(this._uuids));
			if (this._uuids) {
				for (const [uuid, conn] of Object.keys(this._uuids)) {
					try {
						await conn.close();
					} catch (err) {
						this.logger.error(`Error while closing the conn ${uuid}`);
					}
				}
			}
			try {
				this.logger.info("Pool closing...");
				await oracledb.getPool().close(10);
				this.logger.info("Pool closed.");
			} catch (err) {
				this.logger.error(err.message);
				Promise.reject(err);
			}
		} else {
			try {
				this.logger.info("Connection closing...");
				if (this._conn) await this._conn.close();
				this.logger.info("Connection closed.");
			} catch (err) {
				this.logger.error("Error while closing this._conn: ${err}");
				Promise.reject(err);
			}
		}
	},

	actions: {
		execute(ctx) {
			return this._execute(ctx);
		},
		/**
			Begin transaction using external uuid (given by the orchestrator srv).
		 */
		async begin(ctx) {
			console.log("begin started...");
			if (opt.pool) {
				const { _uuid: uuid } = ctx.params;
				const uuids = this._uuids || (this._uuids = {}); //TODO Redis!!!
				if (!uuids[uuid]) uuids[uuid] = await oracledb.getConnection(); //start a new one only if not exists

				//console.log("begin:" + log(this._uuids));
			}
		},
		rollback(ctx) {
			this.logger.error("ROLLBAK");

			if (opt.pool) return this._uuids[ctx.params._uuid].rollback();
			else return this._conn.rollback();
		},
		commit(ctx) {
			this.logger.info("COMMIT!!!");
			if (opt.pool) return this._uuids[ctx.params._uuid].commit();
			else return this._conn.commit();
		},
	},

	events: {
		"$node.connected"({ node }) {
			this.logger.info(`Node '${node.id}' is connected!`);
		},
		"remote.order.create"(msg) {
			this.logger.info("remote.order.create msg:" + log(msg));
		},
	},
});

I don't remember the details, but you can probably find something useful here.

MaciekLeks avatar Jun 21 '21 08:06 MaciekLeks