jackrabbit

Sugar syntax for worker queues with promises

Open tracker1 opened this issue 9 years ago • 3 comments

I'd like to suggest two additional methods for RPC calls (when global.Promise exists):

...
var queue = exchange.queue({ name: 'rpc_queue', prefetch: 1, durable: false });

// queue.work returns a promise -- arguments serialized as array
var request = queue.work(...arguments);
...
// worker accepts a method that returns a promise
queue.worker(doWork);

function doWork(defer, ...args) { // arguments spread back in
  return Promise...
}

If the promise resolves or rejects with the defer object that is passed in, then the message should not be acknowledged, which will in effect re-run the message on timeout.
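A minimal sketch of how that acknowledgement rule might be wired up, assuming the defer object acts as a sentinel: when the worker's promise settles with it, the message is left unacknowledged. The names `runWorker` and `ack` are hypothetical and are not part of jackrabbit; `ack` stands in for whatever acknowledges the message on the channel.

```javascript
// Hypothetical sketch: `runWorker` and `ack` are illustration names, not jackrabbit API.
function runWorker(worker, args, ack) {
  // A fresh sentinel per message; the worker hands it back to ask for a retry.
  const defer = Object.freeze({ deferred: true });

  return Promise.resolve()
    .then(function () { return worker(defer, ...args); })
    .then(
      function (value) { return { response: 'resolve', value: value }; },
      function (reason) { return { response: 'reject', value: reason }; }
    )
    .then(function (outcome) {
      if (outcome.value === defer) {
        // Assumed semantics: no ack, so the broker can redeliver the message.
        return { acked: false };
      }
      ack();
      return { acked: true, response: outcome.response, value: outcome.value };
    });
}

// Usage with stand-in workers and a counter in place of a real ack.
let acks = 0;
const countAck = function () { acks += 1; };

const done = runWorker(async function (defer, n) { return n * 2; }, [21], countAck);
const retried = runWorker(async function (defer) { throw defer; }, [], countAck);

Promise.all([done, retried]).then(function (results) {
  console.log(results[0].value, results[1].acked, acks); // prints: 42 false 1
});
```

Ordinary rejections are still acknowledged here, on the assumption that they should be reported back to the caller rather than retried.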


It would work best if the input arguments were all passed through, serialized and then deserialized as an array. In this way, local methods that return a promise can be replaced with RPC calls that expose the same interface. A Promise-based workflow would look like:
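The argument pass-through could be sketched roughly as below, assuming JSON is the wire format. `encodeArgs`, `decodeArgs`, and `spreadInto` are hypothetical helper names used only for illustration; they are not part of jackrabbit.

```javascript
// Hypothetical helpers; none of these names exist in jackrabbit.
function encodeArgs(args) {
  // Serialize the full argument list as a single JSON array.
  return JSON.stringify(Array.prototype.slice.call(args));
}

function decodeArgs(payload) {
  const args = JSON.parse(payload);
  if (!Array.isArray(args)) {
    throw new TypeError('expected a JSON array of arguments');
  }
  return args;
}

// Wrap a worker so it receives (defer, ...args), matching the proposed signature.
function spreadInto(fn, defer) {
  return function (payload) {
    return fn(defer, ...decodeArgs(payload));
  };
}

const payload = encodeArgs(['a', { some: 'object' }]);
const handler = spreadInto(function (defer, strInput, objInput) {
  return strInput + ':' + objInput.some;
}, {});

console.log(handler(payload)); // prints: a:object
```

Because the worker sees the same positional arguments the caller passed, a local function and its remote counterpart can share one signature, apart from the leading defer parameter.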

var rpc = exchange.queue({ name: 'rpc_queue', prefetch: 1, durable: false });

...
//client
rpc.work('a', {'some':'object'}).then(
    function(result) {
      //successful result
    },
    function(err) {
      //error either in call, or from remote
    }
);
...
//server
rpc.worker(function(defer, strInput, objinput) {
  // do some async work, etc.
  // the chain is already a promise, so return it directly; wrapping it in
  // new Promise() without calling resolve would never settle
  return someAsyncWork(strInput)
    .then(moreAsyncWork.bind(null, objinput))
    .then(function(){
      return 'success';
    });
});

With the benefit of async and await support via Babel or TypeScript:

var result = await rpc.work('a', {'some':'object'});
...
rpc.worker(async function(defer, strinput, objinput) {
  await someAsyncWork(strinput);
  await moreAsyncWork(objinput);
  return 'success';
})

As you can see, the final async example simplifies things a LOT.

tracker1 · Jan 22 '16 21:01

For returning errors, I would suggest ensuring that the message and stack properties are included.

// ES6 syntax...
import stringify from 'json-stringify-safe';
...
function successHandler(result) {
  return stringify({response:'resolve',resolve:result});
}

function errorHandler(err) {
  // err.message and err.stack are not enumerable, so spread and JSON serialization skip them
  if (err instanceof Error) err = {...err, message:err.message, stack:err.stack};

  //serialize rejection
  return stringify({response:'reject',reject:err});
}
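On the calling side, the envelope produced by these handlers could be unpacked along these lines. This is only a sketch: `decodeReply` and the `remoteStack` property are hypothetical names, and it assumes the `{response, resolve, reject}` shape shown above.

```javascript
// Hypothetical client-side counterpart to successHandler/errorHandler.
function decodeReply(body) {
  const msg = JSON.parse(body);

  if (msg.response === 'resolve') {
    return { ok: true, value: msg.resolve };
  }

  if (msg.response === 'reject') {
    const r = msg.reject;
    if (r && typeof r === 'object' && typeof r.message === 'string') {
      // Rebuild an Error so callers can rely on instanceof and .message.
      const { message, stack, ...rest } = r;
      const err = Object.assign(new Error(message), rest);
      // Keep the remote stack separately instead of overwriting the local one.
      if (stack) err.remoteStack = stack;
      return { ok: false, error: err };
    }
    return { ok: false, error: r };
  }

  throw new TypeError('unknown response type: ' + msg.response);
}

// Round trip using the same shape errorHandler produces.
const remote = new TypeError('bad input');
remote.code = 'E_INPUT';
const wire = JSON.stringify({
  response: 'reject',
  reject: { ...remote, message: remote.message, stack: remote.stack }
});

const reply = decodeReply(wire);
console.log(reply.error instanceof Error, reply.error.message, reply.error.code);
// prints: true bad input E_INPUT
```

A promise-returning work method could then resolve with `value` when `ok` is true and reject with `error` otherwise.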

tracker1 · Jan 22 '16 22:01

@hunterloftis I could make this into a PR if you'd be interested... at the end of the queue object constructor function, it could pass itself on to a module method after a test: if (global && global.Promise) return promiseExtensiond(this), or something like that.

tracker1 · Jan 22 '16 22:01

This seems interesting and seems in line with the philosophy of super simplicity.

Did you manage to create this as a PR or are you still interested in doing it?

matmar10 · Jun 22 '18 09:06