Is it possible to implement an ack mechanism similar to RabbitMQ?
I need to implement a mechanism similar to RabbitMQ's ack:
- When the worker processes a job, it launches another 'process'. That process will emit an event on an event emitter.
- Once the worker's processor function has returned, the job should not yet be marked as completed or failed.
- The job is completed (the equivalent of acking it in RabbitMQ) only when the event is received.
- If something goes wrong in the middle (e.g. the program stops), the uncompleted jobs should be re-queued to be processed again.

Is there a way to do this with bullmq? I'm trying to solve it, but I haven't come up with any ideas.
I've tried this solution, but it's not working: sometimes I get the error `job stalled more than allowable limit`. The idea was to make the resolve method of a promise available outside of the worker and call it from another function:
```js
import { Worker } from 'bullmq'

const redisConfig = {
  host: 'localhost',
  port: 6379,
  username: 'default',
  password: 'whatever',
}

const QUEUE_NAME = 'exampleQueue'

function currentTime() {
  return new Date().toISOString()
}

// Jobs waiting for an external "ack" before their processor resolves.
const jobs = []

// A deferred: a promise whose resolve function is exposed so it can be
// called from outside the worker.
function createPromise() {
  let theResolve
  const promise = new Promise(resolve => { theResolve = resolve })
  promise.resolve = theResolve
  return promise
}

const _worker = new Worker(QUEUE_NAME, async job => {
  try {
    console.log(`${currentTime()} - Processing job ${job.id}`)
    const promise = createPromise()
    const pendingJob = { id: job.id, promise }
    console.log(`Adding job to pending list: ${job.id}`)
    jobs.push(pendingJob)
    // The processor does not return until something outside calls
    // promise.resolve(); meanwhile the job's lock can expire, so the
    // job ends up flagged as stalled.
    return await promise
  } catch (error) {
    console.error(error)
  }
}, {
  connection: redisConfig,
})

setInterval(() => {
  console.log('Checking for pending jobs')
  const pendingJob = jobs.pop()
  if (!pendingJob) return
  const { id, promise } = pendingJob
  console.log(`Completing the job ${id} outside of the worker`)
  promise.resolve(currentTime())
}, 5000)
```
This is the code that adds a job to the queue, in case you want to test it:
```js
import { Queue, QueueEvents } from 'bullmq'

const queue = new Queue(QUEUE_NAME, { connection: redisConfig })

let jobCounter = 1

async function addJob(queue) {
  const jobData = {
    counter: jobCounter++,
    attempts: 1,
  }
  const job = await queue.add('exampleJob', jobData)
  console.log(`Added job ${job.id}`)
}

const queueEvents = new QueueEvents(QUEUE_NAME, {
  connection: redisConfig,
})

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} has been completed with value ${returnvalue}`)
})

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log(`*** Job with id ${jobId} has failed after all retry attempts: ${failedReason} ***`)
})

// Single job
addJob(queue)
```
I've seen this way of manually processing the jobs; I think it could help. I'll try something tomorrow.
Not sure I completely understand the code above, but basically, if what you want is to send a result back to the service that added the job to the queue, the way to do it is simply to put the result of the job in a different "result" queue that the creator of the initial job listens to with its own worker.
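For example, something along these lines (just a sketch; `doTheWork` stands in for whatever your processor actually does):

```js
import { Queue, Worker } from 'bullmq'

const connection = { host: 'localhost', port: 6379 }

// Stand-in for the real processing logic.
async function doTheWork(data) {
  return { processed: data }
}

// The worker of the main queue puts its result into a second "results"
// queue instead of relying only on the job's return value.
const resultsQueue = new Queue('results', { connection })

const worker = new Worker('tasks', async job => {
  const result = await doTheWork(job.data)
  await resultsQueue.add('result', { originalJobId: job.id, result })
}, { connection })

// The service that created the original job consumes the results queue
// with its own worker.
const resultsWorker = new Worker('results', async job => {
  console.log(`Job ${job.data.originalJobId} finished with`, job.data.result)
}, { connection })
```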
Thank you for your response!
What I'm trying to implement is this:
- Get the job in one part of the code
- Launch a batch process for that job (a docker container, in fact)
In another part of the code:
- listen for an event notifying that the batch process (the container) has finished
- When that event has been received, finish the job
I was doing that with RabbitMQ's ack as a kind of lock: the first process obtained the message, but the ack was generated by the second process.
The code above is similar to the link I posted in my second message, but without the infinite loop. I've changed it to use the manual processing you have in the documentation, and I'm trying to implement the "ack" by increasing lockDuration and maxStalledCount; I'll probably also add another process to renew the lock for the jobs that take too long. I think it'll do, but it's too complex. Roughly what I have in mind is sketched below.
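A sketch, where `launchContainer` and the event emitter stand in for my real docker integration:

```js
import { EventEmitter } from 'node:events'
import { Worker } from 'bullmq'

// Stand-in for the real container events.
const containerEvents = new EventEmitter()

// Stand-in: starts the docker container for a job and emits 'finished'
// when the container ends.
function launchContainer(job) {
  setTimeout(() => containerEvents.emit('finished', job), 10000)
}

// With a null processor the worker only fetches jobs; completing or
// failing them is our responsibility.
const worker = new Worker(QUEUE_NAME, null, {
  connection: redisConfig,
  lockDuration: 60000, // give the container time before the job is considered stalled
  maxStalledCount: 10,
})

const token = 'my-worker-token'

async function pollJobs() {
  const job = await worker.getNextJob(token)
  if (job) launchContainer(job)
  setTimeout(pollJobs, 1000)
}

// The "ack": complete the job only when the container reports it finished.
containerEvents.on('finished', async job => {
  await job.moveToCompleted('done', token, false)
})

pollJobs()
```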
Extending the lock should not be very difficult; it will also help you organize the worker code so that it does not keep the event loop busy for too long, etc.
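For instance, something along these lines (an untested sketch; the durations are arbitrary):

```js
const LOCK_DURATION = 30000

// Renew the job's lock periodically while the external process runs,
// so the job is not considered stalled in the meantime.
function keepLockAlive(job, token) {
  const interval = setInterval(() => {
    job.extendLock(token, LOCK_DURATION).catch(() => {
      // If the lock cannot be extended, another worker may have taken
      // over the job; stop renewing.
      clearInterval(interval)
    })
  }, LOCK_DURATION / 2)

  // Call the returned function once the job is moved to completed/failed.
  return () => clearInterval(interval)
}
```

Start it right after `getNextJob` and stop it when you call `moveToCompleted` or `moveToFailed`.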