bull

Possible bug in lifo and priority when using delay

Open viebig opened this issue 7 years ago • 8 comments

Description

When a job is added with the delay option, the lifo and priority options are ignored.

Test code to reproduce

Code without delay

const Queue = require('bull');

const callQueue = new Queue('q21');
const concurrency = 10;

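// Start processing 5 seconds after launch, once the queue has been filled.
setTimeout(() => {
    console.log('starting');
    callQueue.process(concurrency, (job, done) => {
        // Simulate a call outcome: roughly half of the calls come back "Busy".
        const status = (Math.random() * 2 >= 1) ? "Answer" : "Busy";
        console.log(job.data, status);

        if (status === 'Busy') {
            // Re-queue the busy call with lifo so it is redialed before older waiting jobs.
            callQueue.add({ id: job.data.id, redial: true }, { lifo: true });
        }

        // Complete the job after 1 second.
        setTimeout(() => {
            done();
        }, 1000);
    });
}, 5000);

// Enqueue 1000 calls.
for (let x = 0; x < 1000; x++) {
    callQueue.add({ id: x, redial: false });
}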

Output without delay

starting
{ id: 0, redial: false } 'Busy'
{ id: 1, redial: false } 'Busy'
{ id: 2, redial: false } 'Answer'
{ id: 3, redial: false } 'Answer'
{ id: 4, redial: false } 'Answer'
{ id: 5, redial: false } 'Answer'
{ id: 6, redial: false } 'Busy'
{ id: 7, redial: false } 'Answer'
{ id: 8, redial: false } 'Answer'
{ id: 9, redial: false } 'Answer'
{ id: 6, redial: true } 'Busy'
{ id: 1, redial: true } 'Busy'
{ id: 0, redial: true } 'Answer'
{ id: 10, redial: false } 'Answer'
{ id: 11, redial: false } 'Busy'
{ id: 12, redial: false } 'Busy'
{ id: 13, redial: false } 'Busy'
{ id: 14, redial: false } 'Busy'
{ id: 15, redial: false } 'Answer'
{ id: 16, redial: false } 'Answer'
{ id: 14, redial: true } 'Busy'
{ id: 13, redial: true } 'Answer'
{ id: 12, redial: true } 'Busy'
{ id: 11, redial: true } 'Busy'
{ id: 1, redial: true } 'Answer'

Code with delay

const Queue = require('bull');

const callQueue = new Queue('q21');
const concurrency = 10;

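// Start processing 5 seconds after launch, once the queue has been filled.
setTimeout(() => {
    console.log('starting');
    callQueue.process(concurrency, (job, done) => {
        // Simulate a call outcome: roughly half of the calls come back "Busy".
        const status = (Math.random() * 2 >= 1) ? "Answer" : "Busy";
        console.log(job.data, status);

        if (status === 'Busy') {
            // Same as above, but the redial is also delayed by 1 second.
            callQueue.add({ id: job.data.id, redial: true }, { lifo: true, delay: 1000 });
        }

        // Complete the job after 1 second.
        setTimeout(() => {
            done();
        }, 1000);
    });
}, 5000);

// Enqueue 1000 calls.
for (let x = 0; x < 1000; x++) {
    callQueue.add({ id: x, redial: false });
}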

Output with delay

starting
{ id: 0, redial: false } 'Answer'
{ id: 1, redial: false } 'Answer'
{ id: 2, redial: false } 'Busy'
{ id: 3, redial: false } 'Busy'
{ id: 4, redial: false } 'Answer'
{ id: 5, redial: false } 'Answer'
{ id: 6, redial: false } 'Answer'
{ id: 7, redial: false } 'Answer'
{ id: 8, redial: false } 'Busy'
{ id: 9, redial: false } 'Busy'
{ id: 10, redial: false } 'Answer'
{ id: 11, redial: false } 'Busy'
{ id: 12, redial: false } 'Busy'
{ id: 13, redial: false } 'Busy'
{ id: 14, redial: false } 'Busy'
{ id: 15, redial: false } 'Answer'
{ id: 16, redial: false } 'Busy'
{ id: 17, redial: false } 'Busy'
{ id: 18, redial: false } 'Answer'
{ id: 19, redial: false } 'Busy'
{ id: 20, redial: false } 'Answer'
{ id: 21, redial: false } 'Answer'
{ id: 22, redial: false } 'Busy'
{ id: 23, redial: false } 'Busy'
{ id: 24, redial: false } 'Busy'
{ id: 25, redial: false } 'Answer'
{ id: 26, redial: false } 'Answer'
{ id: 27, redial: false } 'Answer'
{ id: 28, redial: false } 'Answer'
{ id: 29, redial: false } 'Busy'
{ id: 30, redial: false } 'Busy'
{ id: 31, redial: false } 'Busy'
{ id: 32, redial: false } 'Answer'
{ id: 33, redial: false } 'Busy'

Bull version

^3.3.10

Additional information

My workaround was to apply the delay with setTimeout and then add the job with lifo only:

  async scheduleInQueue(queueItem, campaign, delay) {
    setTimeout(() => {
      this.queues[queueItem.campaignId].add({ contact: queueItem, tree: campaign.tree }, {lifo: true });
    }, delay);
    return true;
  }
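
A variant of this workaround, as a minimal sketch: it resolves only once the job has actually been added, so the caller can await it. The names addLifoAfterDelay, sleep and stubQueue are hypothetical and not part of Bull; the stub stands in for a real queue so the sketch runs without Redis. It assumes only that the queue's add(data, opts) returns a promise, as Bull's Queue#add does. The delay lives in process memory, so a pending job is lost if the process exits before the timer fires.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: wait in-process, then add the job with lifo only,
// so the queue never sees the delay option.
async function addLifoAfterDelay(queue, data, delayMs) {
  await sleep(delayMs);
  return queue.add(data, { lifo: true });
}

// Stub standing in for a Bull queue (assumption: add() returns a promise).
const stubQueue = {
  jobs: [],
  add(data, opts) {
    this.jobs.push({ data, opts });
    return Promise.resolve({ data, opts });
  },
};

addLifoAfterDelay(stubQueue, { id: 1, redial: true }, 50).then((job) => {
  console.log('added', job.data, job.opts);
});
```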

viebig avatar May 16 '18 14:05 viebig

Priority support for delayed jobs has been added in 3.4.0: https://github.com/OptimalBits/bull/blob/master/CHANGELOG.md#v340

manast avatar May 19 '18 08:05 manast

Thanks! Does it include lifo support?

viebig avatar May 21 '18 14:05 viebig

no, not yet.

manast avatar May 21 '18 16:05 manast

+1

nitzanav avatar Aug 26 '18 08:08 nitzanav

@manast @alexkh13 I am looking for guidance on how to write a PR for delayed lifo jobs. I guess that I need to follow this commit that added prioritized delayed jobs, right? https://github.com/OptimalBits/bull/commit/e3c1775aa6b1541b623a58f411f62e6d6f2a71a2 But do I need to change anything here: https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/job.js#L242 or is that only for retrying jobs, and therefore unrelated?

nitzanav avatar Aug 26 '18 10:08 nitzanav

@nitzanav no, i don't see how this line is related.

i think you should focus on this block https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/commands/updateDelaySet-4.lua#L39

based on whether or not the job enables lifo, you should LINSERT it in the right place.

currently, it inserts it to the left of the "already inserted" priority jobs. for example, if you insert a priority=2 job into the wait queue, ZCOUNT will count all the 1s and 2s, which is 4. it will then take the length of the list and subtract 4 (6-4=2)

LEFT ||| 3 | 3 | **2** | 2 | 1 | 1 ||| RIGHT
               ^          

the ^ indicates where it will put it (to the left of index=2)

    • **x** marks the element at the index that will be selected by LINDEX and will be used by LINSERT

so what you basically want to do when lifo is enabled is to count only up to (priority-1) and insert it to the left of that. so in the above example it would count only the 1s (6-2=4)

LEFT ||| 3 | 3 | 2 | 2 | **1** | 1 ||| RIGHT
                       ^          

if it didn't find any (priority-1) the RPUSH is still ok.

please, double check what i just said.
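
The index arithmetic above can be checked with a small model. This is only a sketch of the description in this comment, not Bull's Lua script: the wait list is a plain array of priorities (left end first), and insertIndex and insertJob are hypothetical names. An index equal to the list length means no pivot was found, which corresponds to the RPUSH case.

```javascript
// Model of the wait list as an array of job priorities, left end first.
// Hypothetical helper: returns the index the new job is inserted to the left of.
function insertIndex(waitList, priority, lifo) {
  // Without lifo, count jobs with priority <= the new one;
  // with lifo, count only jobs with priority <= (priority - 1).
  const upTo = lifo ? priority - 1 : priority;
  const count = waitList.filter((p) => p <= upTo).length;
  return waitList.length - count;
}

// Hypothetical helper: returns a new list with the job inserted.
function insertJob(waitList, priority, lifo) {
  const index = insertIndex(waitList, priority, lifo);
  const result = waitList.slice();
  // index === result.length appends on the right (the RPUSH case).
  result.splice(index, 0, priority);
  return result;
}

const wait = [3, 3, 2, 2, 1, 1];
console.log(insertIndex(wait, 2, false)); // 2 (6 - 4)
console.log(insertIndex(wait, 2, true));  // 4 (6 - 2)
console.log(insertJob(wait, 2, true));    // [ 3, 3, 2, 2, 2, 1, 1 ]
```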

alexkh13 avatar Aug 26 '18 13:08 alexkh13

@alexkh13

Thanks a lot for the thorough input

I see now that there are two kinds of tasks:

  1. prioritized delayed that respects lifo param
  2. delayed lifo that is not related to priority

It seems that you describe task 1

I am interested in task 2.

Maybe it is best to create two separate issues and close this one; I am not sure which is better.

Task 2 (delayed lifo) just needs a condition that does an RPUSH when lifo is set, around this line: https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/commands/updateDelaySet-4.lua#L39

Then I need to follow your "prioritized delayed" commit to see how to propagate the parameter to the Lua script.

right?

BTW, if anyone has a remote developer who can make several PRs for me and do some Bull development, that would be nice. Please contact me [email protected]

nitzanav avatar Aug 26 '18 13:08 nitzanav

I'm not sure I understand the 2 kinds you described. But yes, you'll have to propagate the lifo param like the "prioritized delayed" commit did with the priority param.

alexkh13 avatar Aug 26 '18 15:08 alexkh13