bullmq icon indicating copy to clipboard operation
bullmq copied to clipboard

[Bug]: Manually processed jobs don't return to the waiting state

Open gayratv opened this issue 2 years ago • 2 comments

Version

v3.6.6

Platform

NodeJS

What happened?

I expected that if the worker doesn't finish processing a job, the job would be returned to the queue and become available on the next getNextJob call.

I waited 90 seconds, but the jobs did not return to the queue.

How to reproduce.

import { delay, Job, Queue, Worker } from 'bullmq';

// Minimal reproduction: two jobs are fetched manually and never completed or
// failed, then the script waits and checks whether they ever leave the active state.
const QUEUE_NAME = 'GayratQueManualJob';
// Specify a unique token
const token = 'my-token';

const myQueue = new Queue(QUEUE_NAME, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  defaultJobOptions: {
    keepLogs: 100,
    removeOnComplete: true,
    removeOnFail: {
      age: 10 * 60, // keep up to 10 minutes
    },
  },
});

// Start from an empty queue so leftovers from earlier runs cannot affect the counts below.
await myQueue.drain();
await myQueue.obliterate({ force: true });

// No processor function is passed (null): jobs are pulled by hand with getNextJob().
const worker = new Worker(QUEUE_NAME, null, {
  lockDuration: 1_000,
  stalledInterval: 1_000,
  maxStalledCount: 0,
  connection: {
    host: 'localhost',
    port: 6379,
  },
});

await myQueue.addBulk([
  { name: 'foo', data: 1 },
  { name: 'baz', data: 2 },
]);

// Fetch both jobs but deliberately never call moveToCompleted / moveToFailed on them.
let job: Job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);
// await delay(1_100);

job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);
// Wait 90 seconds, far longer than the lockDuration / stalledInterval configured above.
await delay(90_000);

// Job counts per state after the wait; the reported run printed 2, 0, 0, 0.
console.log(await myQueue.getActiveCount());
console.log(await myQueue.getCompletedCount());
console.log(await myQueue.getDelayedCount());
console.log(await myQueue.getFailedCount());

// https://docs.bullmq.io/guide/retrying-failing-jobs
// The job has become stalled and it has consumed the "max stalled count" setting.
// the default max stalled check duration is 30 seconds

// In the reported run this call resolved to undefined, so the log line below
// threw "Cannot read properties of undefined (reading 'name')".
job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);

Relevant log output

Job recieved  foo 1
Job recieved  baz 2
2
0
0
0
TypeError: Cannot read properties of undefined (reading 'name')

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

gayratv avatar Feb 17 '23 10:02 gayratv

OK, I see that runStalledJobsCheck() is not executed in the manual-processing case; we need to fix this.

manast avatar Feb 17 '23 13:02 manast

// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";

/**
 * Creates a queue and a processor-less worker for `queueName`, then starts a
 * background loop that processes the queue's jobs manually.
 *
 * Every loop iteration first waits `delayTime` milliseconds (3000 when falsy).
 * With no job in hand it asks the worker for one via `getNextJob(token)`;
 * otherwise it passes `job.data` to `successCallback` and moves the job to
 * completed (truthy result) or to failed (falsy result, or the callback threw).
 *
 * The loop runs for as long as the process lives; a failure while fetching or
 * settling a job is logged and the loop carries on with the next fetch.
 *
 * NOTE(review): this helper does not start any stalled-job checking itself.
 * Newer BullMQ releases document `worker.startStalledCheckTimer()` for manually
 * driven workers — confirm against the installed version before relying on
 * abandoned jobs being re-queued.
 *
 * @param queueName - Name shared by the created queue and worker.
 * @param options - `connection` (forwarded to both Queue and Worker),
 *   `concurrency` (5 when falsy), `delayTime` (loop delay in ms) and
 *   `successCallback` (async predicate that receives `job.data`).
 * @param token - Lock token handed to `getNextJob`, `moveToCompleted` and
 *   `moveToFailed`.
 * @returns The created `{ queue, worker }` pair.
 *
 * @example
 * // Invoke the manual queue processing helper
 * const token = nanoid();
 * const { queue: manuallyQueue, worker } = processManualQueue(
 *   "manuallyQueue",
 *   {
 *     connection: connection,
 *     concurrency: 5,
 *     delayTime: 3000,
 *     successCallback: async (data) => {
 *       if (data.video == 3) return false;
 *       return true;
 *     },
 *   },
 *   token
 * );
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });

  // Deliberately not awaited: the loop runs in the background. The catch is a
  // last resort so an unexpected failure never becomes an unhandled rejection.
  workerRun().catch((err) => {
    console.error(`[${queueName}] manual processing loop stopped`, err);
  });

  /** Polls for jobs forever, settling at most one job per iteration. */
  async function workerRun() {
    let job = null;

    while (true) {
      await delay(delayTime || 3000);

      try {
        job = job ? await settleJob(job) : await worker.getNextJob(token);
      } catch (err) {
        // Drop the job in hand (its lock may already be gone) and fetch afresh
        // on the next iteration rather than retrying the same failing call.
        console.error(`[${queueName}] manual processing step failed`, err);
        job = null;
      }
    }
  }

  /**
   * Runs `successCallback` for `current`, moves it to completed or failed, and
   * returns the follow-up job handed back by `moveToCompleted`, or null.
   */
  async function settleJob(current) {
    let succeeded = false;
    let failure = new Error("some error message");

    try {
      succeeded = await successCallback(current.data);
    } catch (err) {
      // A throwing callback counts as a failed job; keep its error as the reason.
      failure = err instanceof Error ? err : new Error(String(err));
    }

    if (!succeeded) {
      await current.moveToFailed(failure, token);
      return null;
    }

    // Guard against a missing result before destructuring it.
    const next = await current.moveToCompleted("some return value", token);
    const [jobData, jobId] = next ?? [];
    return jobData ? Job.fromJSON(worker, jobData, jobId) : null;
  }

  return { queue, worker };
}
// Invoke the manual queue processing helper
const token = nanoid();
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection,
    concurrency: 5,
    delayTime: 3000,
    // Reports failure only for the job whose `video` field loosely equals 3.
    successCallback: async (data) => data.video != 3,
  },
  token
);
// Manual jobs, retried on failure
  /**
   * Enqueues five "jobName" jobs carrying `{ video: 1 }` through `{ video: 5 }`,
   * each with 3 attempts and exponential backoff.
   *
   * The adds are dispatched together and awaited, so the returned promise
   * settles only after every job was enqueued and rejects if any add fails.
   */
  async manuallyQueue() {
    await Promise.all(
      Array.from({ length: 5 }, (_, index) =>
        manuallyQueue.add(
          "jobName",
          { video: index + 1 },
          {
            attempts: 3, // 3 attempts per job
            backoff: {
              type: "exponential",
              delay: 3000, // backoff delay of 3000 (the original note said 1 second)
            },
          }
        )
      )
    );
  },

LBC000 avatar Mar 02 '24 08:03 LBC000