bull icon indicating copy to clipboard operation
bull copied to clipboard

Making it work in a worker

Open jes-carr opened this issue 10 years ago • 4 comments

I'm trying to create a queue in a worker process that gets executed once per day. When is executed, it would check the jobs overdue in the queue, process them, and then finish the execution. Jobs need to be added with several days of delay.

At the moment when I add the delayed job, the queue waits for the first job to be executed, which for this use case is not good.

Also not sure when to call the close() function so the process can end gracefully.

Is there any way to make bull work for this use case?

jes-carr avatar Apr 13 '15 17:04 jes-carr

I am not sure that I understand what you mean. If you add 2 delayed jobs one after each other they will be processed directly, put on the delay set and executed when the delay time has passed. Aren't you experiencing this behaviour?

manast avatar Apr 13 '15 19:04 manast

In other words, a delayed job does not delay the whole queue, it only delays that job in particular...

manast avatar Apr 13 '15 19:04 manast

Sorry I think I wasn't clear. I prepared an example with my test code so far so I can explain it more clearly.

This worker process called add-jobs.js gets called once, and then the process close. All good.

var Promise, Queue, job1, job2, job3, testQueue;

Queue = require('bull');

Promise = require('bluebird');

testQueue = Queue('test queue');

job1 = testQueue.add({
  num: 3
}, {
  delay: 1000 * 60
});

job2 = testQueue.add({
  num: 1
}, {
  delay: 1000 * 20
});

job3 = testQueue.add({
  num: 2
}, {
  delay: 1000 * 40
});

Promise.all([job1, job2, job3]).then(function() {
  return testQueue.close().then(function() {
    return console.log('closed');
  });
});

And this is a worker process called worker.js, supposed to be called at regular intervals, let's say every 5 seconds. If there are no jobs to process at that moment, the process should close.

var Promise, Queue, closing, countAndClose, testQueue;

Queue = require('bull');

Promise = require('bluebird');

testQueue = Queue('test queue');

testQueue.process(function(job, done) {
  console.log('processing job id', job.jobId, 'num', job.data.num);
  return done();
});

closing = false;

countAndClose = function() {
  return Promise.all([testQueue.getWaiting(), testQueue.getActive(), testQueue.getDelayed()]).spread(function(waiting, active, delayed) {
    console.log('active:', active.length);
    console.log('waiting:', waiting.length);
    console.log('delayed:', delayed.length);
    return testQueue.close().then(function() {
      return console.log('closed');
    });
  })["catch"](function(err) {
    return console.log(err);
  });
};

testQueue.on('ready', function() {
  console.log('ready');
  return countAndClose();
});

testQueue.on('completed', function(job) {
  console.log('job', job.jobId, 'completed');
  return countAndClose();
});

testQueue.on('failed', function(job, err) {
  console.log('job', job.jobId, 'failed', err);
  return countAndClose();
});

When I execute worker.js after executing add-jobs.js the jobs get added in the queue one as active, other as waiting, and other as delayed (a bit confusing, I would expect them to be delayed). Next time I execute it, the 3 jobs are moved to delayed, but the process waits for the next job's delay before exiting, even after the close() promise being resolved.

I guess this works just fine if you have the process running all the time, but is there a way to close the queue and process if there are no jobs to run right now and only scheduled jobs?

jes-carr avatar Apr 13 '15 21:04 jes-carr

testQueue = Queue('test queue');

Not sure if related, but I had trouble when the queue name had spaces...

julianjm avatar Oct 17 '18 18:10 julianjm