jackrabbit
Sugar syntax for worker queues with promises
I'd like to suggest two additional methods for RPC calls (when global.Promise exists)...
...
var queue = exchange.queue({ name: 'rpc_queue', prefetch: 1, durable: false });
// queue.work returns a promise -- arguments serialized as array
var request = queue.work(...arguments);
...
// worker accepts a method that returns a promise
queue.worker(doWork);
function doWork(defer, ...arguments) { //arguments spread back
return Promise...
}
If the promise resolves or rejects with the defer object that is passed in, then the message should not be acknowledged, which will in effect re-run the message on timeout.
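One possible reading of this rule, as a minimal sketch: the worker settles its promise with the defer object itself to signal "leave this message unacknowledged". The helper name `settleMessage` and the `ack` flag are hypothetical and stand in for whatever the library would actually do with the channel; nothing here is existing jackrabbit API.

```javascript
// Hypothetical helper, not jackrabbit API: runs a worker function and
// reports whether the message should be acknowledged afterwards.
function settleMessage(handler, args) {
  const defer = {}; // unique sentinel object handed to the worker
  return Promise.resolve()
    .then(() => handler(defer, ...args))
    .then(
      (value) => ({ settled: 'resolve', value }),
      (value) => ({ settled: 'reject', value })
    )
    .then((outcome) => ({
      // Settling with the sentinel means "do not ack", so the message
      // would be left for the broker to deliver again.
      ack: outcome.value !== defer,
      outcome: outcome.value === defer ? null : outcome,
    }));
}
```

With this shape, a worker that returns `defer` (or rejects with it) leaves the message in the queue, while any other result or error is acknowledged and reported back to the caller.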
It would work best if the input arguments were all passed through: serialized, then deserialized, as an array. That way, local methods that return a promise can be replaced with RPC calls that keep the same interface. A Promise-based workflow would look like:
var rpc = exchange.queue({ name: 'rpc_queue', prefetch: 1, durable: false });
...
//client
rpc.work('a', {'some':'object'}).then(
function(result) {
//successful result
},
function(err) {
//error either in call, or from remote
}
);
...
//server
rpc.worker(function(defer, strInput, objinput) {
//do some async work, etc
//return the chain directly; wrapping it in new Promise without calling resolve would never settle
return someAsyncWork(strInput)
.then(moreAsyncWork.bind(null, objinput))
.then(function(){
return 'success';
});
})
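The "serialize as an array, then spread back" idea from above can be sketched without any broker at all. The names `encodeArgs`, `invokeHandler`, and `joinInputs` are hypothetical and only illustrate the round trip; plain `JSON.stringify`/`JSON.parse` is assumed as the wire format.

```javascript
// Hypothetical helpers, not part of jackrabbit.

// Client side: pack all call arguments into a single JSON array payload.
function encodeArgs(...args) {
  return JSON.stringify(args);
}

// Worker side: unpack the payload and spread it back after `defer`.
// Always returns a promise, even if the handler returns a plain value.
function invokeHandler(handler, defer, payload) {
  const args = JSON.parse(payload);
  if (!Array.isArray(args)) {
    throw new TypeError('expected a JSON array of arguments');
  }
  return Promise.resolve().then(() => handler(defer, ...args));
}

// A function with the worker signature: defer first, then the call arguments.
function joinInputs(defer, strInput, objInput) {
  return strInput + ':' + Object.keys(objInput).join(',');
}

const payload = encodeArgs('a', { some: 'object' });
invokeHandler(joinInputs, {}, payload).then((result) => {
  console.log(result);
});
```

Because the handler receives its arguments in the original order, the same function body could serve as a local promise-returning method or as a remote worker. Values that JSON cannot represent (functions, `undefined`, class instances) would not survive the trip.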
With the benefit of async and await support via Babel or TypeScript:
var result = await rpc.work('a', {'some':'object'});
...
rpc.worker(async function(defer, strinput, objinput) {
await someAsyncWork(strinput);
await moreAsyncWork(objinput);
return 'success';
})
As you can see, the final async example simplifies things a LOT.
For returning errors, I would suggest ensuring that the message and stack properties are included.
// ES6 syntax...
import stringify from 'json-stringify-safe';
...
function successHandler(result) {
return stringify({response:'resolve',resolve:result});
}
function errorHandler(err) {
//err.message and err.stack are not enumerable, so they don't serialize normally
if (err instanceof Error) err = {...err, message:err.message, stack:err.stack};
//serialize rejection
return stringify({response:'reject',reject:err});
}
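For completeness, the calling side would need to turn that envelope back into a resolved or rejected promise. The following is a sketch under assumptions: `decodeReply`, `encodeResult`, and `encodeError` are hypothetical names, and plain `JSON.stringify` stands in for json-stringify-safe so the example runs on its own.

```javascript
// Hypothetical counterparts to successHandler/errorHandler above.
function encodeResult(result) {
  return JSON.stringify({ response: 'resolve', resolve: result });
}

function encodeError(err) {
  // message and stack are not enumerable, so copy them explicitly
  const plain = err instanceof Error
    ? { ...err, message: err.message, stack: err.stack }
    : err;
  return JSON.stringify({ response: 'reject', reject: plain });
}

// Client side: settle a promise from the reply body.
async function decodeReply(body) {
  const reply = JSON.parse(body);
  if (reply.response === 'resolve') return reply.resolve;
  if (reply.response === 'reject') {
    const data = reply.reject;
    if (data && typeof data === 'object' && typeof data.message === 'string') {
      // Rebuild an Error so callers still see the remote message and stack.
      throw Object.assign(new Error(data.message), data);
    }
    throw data;
  }
  throw new Error('unknown reply type: ' + reply.response);
}
```

A caller could then write `decodeReply(body).then(onResult, onError)` and receive a real `Error` instance carrying the remote stack. Whether the remote stack should overwrite the local one is a design choice this sketch does not settle.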
@hunterloftis I could make this into a PR if you'd be interested... At the end of the queue object constructor function, it could pass itself on to a module method after a test: 'if (global && global.Promise) return promiseExtensiond(this)' or something like that.
This seems interesting and seems in line with the philosophy of super simplicity.
Did you manage to create this as a PR or are you still interested in doing it?