[Bug] (Pro): Group concurrency 1 doesn't work. Waiting group jobs not listed.

Open pjamrozowicz opened this issue 8 months ago • 6 comments

Version

v7.32.2

Platform

NodeJS

What happened?

I ran into two issues when using BullMQ Pro and its groups feature.

  1. Setting concurrency to 1 for a group does not work: up to 2 jobs run at a time instead of 1. I attached simplified code in the "How to reproduce" section.

  2. When there are jobs waiting in virtual group queues, they're not listed when using the queue.getJobs() method.

const jobs = await queue.getJobs();

console.log(jobs);

  /*
  returns:
  { id: '17', status: 'active' }
  { id: '16', status: 'active' }
  { id: '14', status: 'completed' }
  { id: '15', status: 'completed' }
   */

console.log(await queue.getJobs(['waiting']));
// prints []

console.log(await queue.getJobState('18'));
// prints 'waiting'

How to reproduce.

import { QueuePro, WorkerPro } from '@taskforcesh/bullmq-pro';

const queue = new QueuePro();
const jobName = 'some-job';

const worker = new WorkerPro(jobName, ..., {
  concurrency: 10,
  group: {
    concurrency: 1,
  },
});

const groupId = 'this-is-a-test';

await queue.setGroupConcurrency(groupId, 1);

await queue.add(jobName, {data: 1}, { attempts: 100, group: { id: groupId } });
await queue.add(jobName, {data: 2}, { attempts: 100, group: { id: groupId } });
await queue.add(jobName, {data: 3}, { attempts: 100, group: { id: groupId } });

console.log(await queue.getActiveCount());
// prints 2, expected 1

pjamrozowicz, Apr 10 '25 19:04

Thanks for the issue. Regarding the getters: for jobs that belong to groups you must use the group-specific getters (getGroups, getGroupJobs, etc.): https://api.bullmq.pro/classes/v7.QueuePro.html#getGroups
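As a rough illustration of the group getters mentioned above, the sketch below collects the jobs of every group into a map. It is written against a small structural interface rather than QueuePro itself so that it stays self-contained; the interface name `GroupGetters`, the helper `listJobsByGroup`, and the optional `start`/`end` range parameters are assumptions that should be checked against the linked API reference.

```typescript
// Minimal structural view of the group getters used below. The method names
// mirror the QueuePro getters named in the thread; the optional start/end
// range parameters are an assumption to verify against the API docs.
interface GroupGetters<J> {
  getGroups(start?: number, end?: number): Promise<{ id: string }[]>;
  getGroupJobs(groupId: string, start?: number, end?: number): Promise<J[]>;
}

// Hypothetical helper: fetch the jobs of every group, keyed by group id.
async function listJobsByGroup<J>(
  queue: GroupGetters<J>,
): Promise<Map<string, J[]>> {
  const jobsByGroup = new Map<string, J[]>();
  const groups = await queue.getGroups();
  for (const group of groups) {
    // One round trip per group; results are not a consistent snapshot
    // if workers are processing jobs in the meantime.
    jobsByGroup.set(group.id, await queue.getGroupJobs(group.id));
  }
  return jobsByGroup;
}
```

If the real queue object matches this shape, it could be passed in directly, e.g. `await listJobsByGroup(queue)`. Whether calling the getters without a range returns all entries or only a first page is also something to confirm in the docs.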

Regarding the issue with max concurrency, we need to check why the active count would show 2 in this particular case. We will update this issue when we know more.

manast, Apr 10 '25 20:04

I believe I've already tried the group-related methods but they were also not yielding any results. I'll test some more, maybe I missed something.

About concurrency: another way to reproduce it is to set the concurrency for a group to 1, have 1 job running and 1 job failed in that same group, and then retry the failed job.

The retried job starts running immediately, even though it should respect the group's concurrency limit.

pjamrozowicz, Apr 12 '25 23:04

I tried the following code and the concurrency was respected. I also tried different variants and they all worked. I will try another test case with manual retries.

it('should respect max concurrency in the active status', async () => {
    const jobName = 'some-job';
    const numJobs = 100;

    let resolver;
    const processing = new Promise<void>(_resolve => {
      resolver = _resolve;
    });

    const worker = new WorkerPro(
      queueName,
      async (_job: JobPro) => {
        await delay(5);
      },
      {
        connection,
        prefix,
        concurrency: 100,

        group: {
          concurrency: 1,
        },
      },
    );

    worker.on('completed', job => {
      if (job.data.i === 99) {
        resolver();
      }
    });

    const groupId = 'this-is-a-test';

    for (let i = 0; i < numJobs; i++) {
      await queue.add(
        jobName,
        { i },
        { attempts: 100, group: { id: groupId } },
      );
    }

    const activeCount = await queue.getActiveCount();
    expect(activeCount).to.be.equal(1);

    await processing;

    const activeCount2 = await queue.getActiveCount();
    expect(activeCount2).to.be.equal(0);
  });
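The test above samples the active count at two points in time. As a complementary check, one could record the peak number of simultaneously running processor calls per group, which would also catch a brief overlap between samples. The sketch below is a hypothetical helper, not part of BullMQ; the class name `GroupConcurrencyTracker` and its methods are made up for illustration.

```typescript
// Hypothetical helper (not part of BullMQ): counts how many tracked calls are
// in flight per group at the same time and remembers the highest value seen.
class GroupConcurrencyTracker {
  private inFlight = new Map<string, number>();
  private peak = new Map<string, number>();

  // Run `fn` while counting it as active for `groupId`.
  async track<T>(groupId: string, fn: () => Promise<T>): Promise<T> {
    const current = (this.inFlight.get(groupId) ?? 0) + 1;
    this.inFlight.set(groupId, current);
    this.peak.set(groupId, Math.max(this.peak.get(groupId) ?? 0, current));
    try {
      return await fn();
    } finally {
      // Decrement even if `fn` throws, so failed jobs do not stay "active".
      this.inFlight.set(groupId, (this.inFlight.get(groupId) ?? 1) - 1);
    }
  }

  // Highest number of simultaneously active calls observed for `groupId`.
  peakFor(groupId: string): number {
    return this.peak.get(groupId) ?? 0;
  }
}
```

In a test, the processor body could be wrapped in `tracker.track(groupId, ...)` and the test could assert that `tracker.peakFor(groupId)` never exceeds 1 once all jobs, including manually retried ones, have finished. How the group id is read from the job inside the processor is left open here, and the tracker only sees calls made within a single process.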

manast, Apr 14 '25 07:04

I created a more involved case with manual retries and it seems like I can reproduce it.

manast, Apr 14 '25 08:04

We have a fix that should be released today.

manast, Apr 15 '25 08:04

> We have a fix that should be released today.

That was quick!

I was worried it was somehow related to the fact that we had just switched from the standard version to the Pro version, but I'm glad it's not that.

I know this isn't the right thread, but about these groups: it would be amazing if I could get all the waiting jobs, including the ones in groups, in one go, without first having to get all groups and then iterate over them to get the jobs for each 😉 It's a lot of work, and adding pagination on top of that is a pain.
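To make the pagination problem concrete, here is a minimal sketch of one way to page across groups by walking them in order and using per-group counts to skip whole groups. It is written against a small structural interface, not QueuePro itself. The names `PagedGroupGetters` and `pageGroupJobs` are hypothetical, and the method signatures, including an inclusive `end` index, are assumptions to verify against the BullMQ Pro API reference.

```typescript
// Structural view of the getters this sketch relies on. Signatures are
// assumptions modelled on the group getters, with an inclusive `end` index.
interface PagedGroupGetters<J> {
  getGroups(start?: number, end?: number): Promise<{ id: string }[]>;
  getGroupJobsCount(groupId: string): Promise<number>;
  getGroupJobs(groupId: string, start?: number, end?: number): Promise<J[]>;
}

// Hypothetical helper: return up to `limit` jobs starting at global `offset`,
// treating all groups as one list concatenated in getGroups() order.
async function pageGroupJobs<J>(
  queue: PagedGroupGetters<J>,
  offset: number,
  limit: number,
): Promise<J[]> {
  const page: J[] = [];
  let skip = offset; // jobs still to skip before the page starts
  for (const group of await queue.getGroups()) {
    if (page.length >= limit) break;
    const count = await queue.getGroupJobsCount(group.id);
    if (skip >= count) {
      skip -= count; // the whole group lies before the requested page
      continue;
    }
    const start = skip;
    const end = Math.min(count, start + (limit - page.length)) - 1;
    page.push(...(await queue.getGroupJobs(group.id, start, end)));
    skip = 0; // later groups are read from their first job
  }
  // Guard against a getter returning more jobs than requested.
  return page.slice(0, limit);
}
```

Because counts and jobs are read in separate calls while workers keep running, consecutive pages are not a consistent snapshot and may skip or repeat jobs.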

pjamrozowicz, Apr 15 '25 10:04