bullmq
bullmq copied to clipboard
[Bug]: Manually processed jobs don't return to the waiting state
Version
v3.6.6
Platform
NodeJS
What happened?
Expected: if the worker does not finish processing a job, the job should be returned to the queue and become available to the next getNextJob call.
Actual: I waited 90 seconds, but the jobs were not returned to the queue.
How to reproduce.
// Reproduction script: two jobs are fetched manually and never completed or
// failed; the script then waits and checks whether they left the active state.
import { delay, Job, Queue, Worker } from 'bullmq';
const QUEUE_NAME = 'GayratQueManualJob';
// Specify a unique token
const token = 'my-token';
const myQueue = new Queue(QUEUE_NAME, {
connection: {
host: 'localhost',
port: 6379,
},
defaultJobOptions: {
keepLogs: 100,
removeOnComplete: true,
removeOnFail: {
age: 10 * 60, // keep up to 10 minutes
},
},
});
// Clear out the queue before adding the two test jobs below.
await myQueue.drain();
await myQueue.obliterate({ force: true });
// No processor function is passed (null): jobs are fetched by hand with
// getNextJob() further down instead of being processed automatically.
const worker = new Worker(QUEUE_NAME, null, {
// Short lock / stalled-check settings so that a stalled job should be noticed
// quickly (values presumably in milliseconds — TODO confirm).
lockDuration: 1_000,
stalledInterval: 1_000,
maxStalledCount: 0,
connection: {
host: 'localhost',
port: 6379,
},
});
await myQueue.addBulk([
{ name: 'foo', data: 1 },
{ name: 'baz', data: 2 },
]);
// Fetch both jobs; neither is ever passed to moveToCompleted/moveToFailed.
let job: Job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);
// await delay(1_100);
job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);
// Wait 90 seconds, then print how many jobs are in each state.
await delay(90_000);
console.log(await myQueue.getActiveCount());
console.log(await myQueue.getCompletedCount());
console.log(await myQueue.getDelayedCount());
console.log(await myQueue.getFailedCount());
// https://docs.bullmq.io/guide/retrying-failing-jobs
// The job has become stalled and it has consumed the "max stalled count" setting.
// the default max stalled check duration is 30 seconds
// The reporter expects a job to be available again here. In the log output
// quoted in this report, getNextJob() yields undefined instead, so reading
// job.name on the next line throws the TypeError.
job = await worker.getNextJob(token);
console.log('Job recieved ', job.name, job.data);
Relevant log output
Job recieved foo 1
Job recieved baz 2
2
0
0
0
TypeError: Cannot read properties of undefined (reading 'name')
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
OK, I see that runStalledJobsCheck()
is not executed when jobs are processed manually; we need to fix this.
// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";
/**
 * Creates a queue and a processor-less worker for `queueName`, then starts a
 * background loop that processes the queue's jobs manually, one at a time.
 *
 * Every loop iteration first waits `delayTime` milliseconds and then either
 * fetches the next job with `worker.getNextJob(token)` or, if a job is already
 * held, settles it:
 * - `successCallback(job.data)` resolves truthy: the job is moved to completed
 *   and the follow-up job returned by `moveToCompleted` (if any) becomes the
 *   held job;
 * - it resolves falsy: the job is moved to failed;
 * - it throws or rejects: the job is moved to failed with that error, so it
 *   is not left locked in the active state.
 *
 * Errors raised while fetching or settling a job are logged and the loop
 * carries on. The loop ends once the worker has been closed.
 *
 * @param queueName - Name shared by the queue and the worker.
 * @param options - `connection` (passed to both Queue and Worker),
 *   `concurrency` (defaults to 5), `delayTime` (ms between iterations,
 *   defaults to 3000) and `successCallback` (async predicate over the job data).
 * @param token - Lock token used for every job this loop fetches and settles.
 * @returns The created `{ queue, worker }` pair.
 *
 * @example
 * // Call the manual queue processing function
 * const token = nanoid();
 * const { queue: manuallyQueue, worker } = processManualQueue(
 *   "manuallyQueue",
 *   {
 *     connection: connection,
 *     concurrency: 5,
 *     delayTime: 3000,
 *     successCallback: async (data) => {
 *       if (data.video == 3) return false;
 *       return true;
 *     },
 *   },
 *   token
 * );
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });

  // The loop is intentionally not awaited; attach a handler so that a failure
  // outside the per-iteration try/catch is reported instead of surfacing as
  // an unhandled promise rejection.
  workerRun().catch((err) => {
    console.error(`[${queueName}] manual processing loop stopped`, err);
  });

  /** Polls for jobs and settles them until the worker is closed. */
  async function workerRun() {
    let job = null;
    // `worker.closing` is set once close() has been called on the worker.
    while (!worker.closing) {
      await delay(delayTime || 3000);
      try {
        job = job
          ? await settleJob(job)
          : (await worker.getNextJob(token)) ?? null;
      } catch (err) {
        // Typically a lost lock or a connection problem; drop the held job
        // and fetch a fresh one on the next iteration.
        console.error(`[${queueName}] manual job handling failed`, err);
        job = null;
      }
    }
  }

  /**
   * Completes or fails `current` and returns the next job handed back by
   * `moveToCompleted`, or null when there is none.
   */
  async function settleJob(current) {
    let success;
    try {
      success = await successCallback(current.data);
      // console.log("processing manual job", current.data, "status", success);
    } catch (err) {
      const reason = err instanceof Error ? err : new Error(String(err));
      await current.moveToFailed(reason, token);
      return null;
    }

    if (!success) {
      await current.moveToFailed(new Error("some error message"), token);
      return null;
    }

    // Fall back to an empty tuple so destructuring cannot throw when
    // moveToCompleted resolves without a follow-up job.
    const [jobData, jobId] =
      (await current.moveToCompleted("some return value", token)) ?? [];
    return jobData ? Job.fromJSON(worker, jobData, jobId) : null;
  }

  return { queue, worker };
}
// Call the manual queue processing function: a job counts as successful
// unless its payload's `video` field loosely equals 3.
const token = nanoid();
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection,
    concurrency: 5,
    delayTime: 3000,
    successCallback: async (data) => data.video != 3,
  },
  token
);
/**
 * Manual jobs with retry on failure.
 *
 * Enqueues five sample jobs (`{ video: 1 }` through `{ video: 5 }`) on the
 * `manuallyQueue` queue and resolves once every job has been added; rejects
 * if any of the `add` calls fails.
 */
async manuallyQueue() {
  // All add() calls are started right away, as before, but are now awaited so
  // that enqueue errors reach the caller instead of becoming unhandled
  // promise rejections.
  const pendingAdds = Array.from({ length: 5 }, (_, index) =>
    manuallyQueue.add(
      "jobName",
      { video: index + 1 },
      {
        attempts: 3, // at most 3 attempts in total
        backoff: {
          type: "exponential",
          delay: 3000, // base backoff delay of 3000 ms
        },
      }
    )
  );
  await Promise.all(pendingAdds);
},