bull icon indicating copy to clipboard operation
bull copied to clipboard

Jobs are not respecting priority if the jobs backoff is not specified or delay is 0

Open sharma-shweta opened this issue 5 years ago • 4 comments

Description

Bull jobs are not following the priorities when the options have multiple attempts but 0 delay or no backoff.

Minimal, Working Test code to reproduce the issue.

Jobs setup.

const queue = new Queue('test-jobs', {
    defaultJobOptions: {
        priority: 100,
        removeOnComplete: true
    },
    settings: {
        lockDuration: 5 * 60 * 1000, // 5mins
        stalledInterval: 5 * 60 * 1000, // 5mins
        maxStalledCount: 2
    },
    ...REDIS_CREDENTIALS
});

queue.process('fetch-test-data', 2, async job => {
    console.log(`[JOB: ${job.id}] fetching test data for: ${job.data.id}`);
    await Promise.delay(1000);
    return Promise.reject(new Error('ERROR'));
});

class TestJobsService {
    fetchTestData(id, priority, delay = 5000) {
        return queue.add('fetch-test-data', {
            id: id,
        }, {
            priority: priority,
            attempts: 2,
            backoff: {
                type: 'fixed',
                delay: delay
            }
        })
    }
}

testJobsService = new TestJobsService();
module.exports = testJobsService;

Run the jobs using ->

Promise.all([
    testJobsService.fetchTestData('a1', 1, 0),
    testJobsService.fetchTestData('b1', 1, 0),
    testJobsService.fetchTestData('a2', 2, 0),
    testJobsService.fetchTestData('b2', 2, 0),
    testJobsService.fetchTestData('a3', 3, 0),
    testJobsService.fetchTestData('b3', 3, 0),
    testJobsService.fetchTestData('a4', 4, 0),
    testJobsService.fetchTestData('b4', 4, 0),
    testJobsService.fetchTestData('a5', 5, 0),
    testJobsService.fetchTestData('b5', 5, 0),
    testJobsService.fetchTestData('a6', 6, 0),
    testJobsService.fetchTestData('b6', 6, 0),
]).then(() => testJobsService.listen())

This outputs ->

5:14:18 PM - info: [JOB: 37] fetching test data for: a1
5:14:19 PM - info: [JOB: 38] fetching test data for: b1
5:14:22 PM - info: [JOB: 39] fetching test data for: a2
5:14:23 PM - info: [JOB: 40] fetching test data for: b2
5:14:24 PM - info: [JOB: 41] fetching test data for: a3
5:14:26 PM - info: [JOB: 42] fetching test data for: b3
5:14:27 PM - info: [JOB: 43] fetching test data for: a4
5:14:28 PM - info: [JOB: 44] fetching test data for: b4
5:14:29 PM - info: [JOB: 45] fetching test data for: a5
5:14:30 PM - info: [JOB: 46] fetching test data for: b5
5:14:31 PM - info: [JOB: 47] fetching test data for: a6
5:14:32 PM - info: [JOB: 48] fetching test data for: b6
5:14:33 PM - info: [JOB: 37] fetching test data for: a1
5:14:34 PM - info: [JOB: 38] fetching test data for: b1
5:14:35 PM - info: [JOB: 39] fetching test data for: a2
5:14:36 PM - info: [JOB: 40] fetching test data for: b2
5:14:37 PM - info: [JOB: 41] fetching test data for: a3
5:14:38 PM - info: [JOB: 42] fetching test data for: b3
5:14:39 PM - info: [JOB: 43] fetching test data for: a4
5:14:40 PM - info: [JOB: 44] fetching test data for: b4
5:14:41 PM - info: [JOB: 45] fetching test data for: a5
5:14:42 PM - info: [JOB: 46] fetching test data for: b5
5:14:43 PM - info: [JOB: 47] fetching test data for: a6
5:14:44 PM - info: [JOB: 48] fetching test data for: b6

Bull version

3.11.0

Redis version

Redis server v=4.0.7

Additional information

Running the following code works correctly ->

Promise.all([
    testJobsService.fetchTestData('a1', 1, 1000),
    testJobsService.fetchTestData('b1', 1, 1000),
    testJobsService.fetchTestData('a2', 2, 1000),
    testJobsService.fetchTestData('b2', 2, 1000),
    testJobsService.fetchTestData('a3', 3, 1000),
    testJobsService.fetchTestData('b3', 3, 1000),
    testJobsService.fetchTestData('a4', 4, 1000),
    testJobsService.fetchTestData('b4', 4, 1000),
    testJobsService.fetchTestData('a5', 5, 1000),
    testJobsService.fetchTestData('b5', 5, 1000),
    testJobsService.fetchTestData('a6', 6, 1000),
    testJobsService.fetchTestData('b6', 6, 1000),
]).then(() => testJobsService.listen())

Expected Output ->

5:11:21 PM - info: [JOB: 25] fetching test data for: a1
5:11:22 PM - info: [JOB: 26] fetching test data for: b1
5:11:24 PM - info: [JOB: 25] fetching test data for: a1
5:11:26 PM - info: [JOB: 26] fetching test data for: b1
5:11:27 PM - info: [JOB: 27] fetching test data for: a2
5:11:28 PM - info: [JOB: 28] fetching test data for: b2
5:11:29 PM - info: [JOB: 27] fetching test data for: a2
5:11:30 PM - info: [JOB: 28] fetching test data for: b2
5:11:31 PM - info: [JOB: 29] fetching test data for: a3
5:11:32 PM - info: [JOB: 30] fetching test data for: b3
5:11:33 PM - info: [JOB: 29] fetching test data for: a3
5:11:34 PM - info: [JOB: 30] fetching test data for: b3
......

sharma-shweta avatar Oct 14 '19 09:10 sharma-shweta

Hi! Both your examples give the same result on my environment, jobs are not getting executed by priorities when they're repeated.

When job fails and positive delay is set, Bull puts jobs to delayed queue (Redis set), this mechanism works exactly as if you just add delayed job. Particular job is moved from delayed to waiting set when its delay is timed out. Job priority is preserved in this case.

If delay is set to 0, Bull uses optimization check to put jobs directly to the waiting set. And I can confirm in this case priorities do not respected.

But, anyway, regarding your case. Am I understand you correctly you want to pause entire Bull queue until there're jobs with higher priority exist in delayed set due to previous failures?

stansv avatar Oct 14 '19 10:10 stansv

Hi @stansv

The issue is that - we want the priorities to be respected even when the job goes from fail to waiting.

I would expect the same behaviour for any job added to waiting - whether it comes from delayed or failed.

sharma-shweta avatar Oct 15 '19 06:10 sharma-shweta

@sharma-shweta seems like a reasonable request. The way it works now is that as soon as a delayed job awakes it is pushed at the tip of the queue so that it gets executed as soon as possible. I will label this as an enhancement proposal.

manast avatar Oct 15 '19 11:10 manast

Follow up question to this @manast, lets say there are a 100 delayed jobs with different priorities that awaken at the same time. In what order will they be added to the queue? I.e. will the priorities be followed for these 100 jobs?

swayam18 avatar Oct 18 '19 10:10 swayam18