bull
Implement a processOnce method
I am wondering if there is a way to do the following in Bull. I have two queues, and I need information from both.
I want to pop a job from Queue A and then, somewhere during its processing, pop exactly one job from Queue B. A job should only be dequeued from Queue B inside the logic of Queue A's processing. In a perfect world it would look something like this:
qA.process(function(item) { qB.processOne(function(item) { }); });
If we had a process function that could only be used once, I guess your example would work, but it is a somewhat unorthodox use of the queue. Can you explain more about the context where you want to do this? Maybe there are other ways that fit Bull better.
Sure.
Basically, I have a primary queue that has jobs, and the qA.process() function works just fine for that. These jobs are added to the queue by another process, which also works fine.
During the processing function, I need an element from a second queue. I made this a queue because these second elements are time sensitive: I only want to grab them when I absolutely need them, and I use a Bull function to selectively remove all non-active keys that are older than a minute and a half. The element from the second queue is necessary for the function to proceed, and I have another process generating elements for this second queue. Unfortunately, I can't simply run both simultaneously and raise an event when both are present to proceed with the work, because it needs to be as fast as possible.
I can do this with a simple Redis blpop, but I wanted the extra safety of using Bull, since it has a lot of built-in acking, persistence, etc.
OK. I have never had the need, so I do not know whether it makes sense for Bull to include a processOnce method that processes just one job. But it is certainly possible to implement: basically, it is a matter of changing the run function so that instead of calling processJobs, it just calls getNextJob and then processJob. All these methods return promises, so your processOnce will return a promise that you can use in your main process handler to wait for this second queue to finish. To make it as stable as the current process, you will also need to take care of stalled jobs by calling moveUnlockedJobsToWait appropriately. I can look more into this, but I have a lot of high-priority issues that need to be implemented before 3.0.
That's fine, it's not an urgent request, but it would be great. In the meantime I can use redis's brpop. The second queue that requires the processOnce functionality can be unstable, as in, I don't require stalled jobs to be reaped and placed back in the queue since each item only has a valid lifetime of 2 minutes.
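The short-lifetime requirement described above can be sketched with manual fetching. This is only a minimal sketch, not part of Bull: takeFreshJob is a hypothetical helper name, and it assumes the queue exposes getNextJob() and that each job carries a numeric timestamp (milliseconds since epoch) and a moveToFailed(err, ignoreLock) method, as Bull jobs do. The returned job is left in the active state, so the caller is still responsible for completing or failing it.

```javascript
// Hypothetical helper (not a Bull API): fetch the next job that is still
// within its lifetime, failing any expired jobs found along the way.
// Assumes `queue.getNextJob()` resolves to a job or a falsy value, and that
// each job has `timestamp` (ms since epoch) and `moveToFailed(err, ignoreLock)`.
const MAX_AGE_MS = 2 * 60 * 1000; // the two-minute lifetime from the thread

async function takeFreshJob(queue, maxAgeMs = MAX_AGE_MS, now = Date.now) {
  for (;;) {
    const job = await queue.getNextJob();
    if (!job) return null; // nothing waiting

    // Still valid: hand it to the caller, who must complete or fail it.
    if (now() - job.timestamp <= maxAgeMs) return job;

    // Expired: mark it as failed (ignoring the lock) and try the next one.
    await job.moveToFailed({ message: 'expired' }, true);
  }
}
```

The `now` parameter is only there so the age check can be exercised with a fixed clock; in normal use the default Date.now applies.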
btw, what happens if your main process function fails after processOne has finished? Wouldn't you end up with a dangling result somewhere?
Yes, that's true, I've added a lot of monitoring/self-healing logic to make this as stable as possible. So far, it appears to work!
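One way to reduce the dangling-result risk raised above is to acknowledge the secondary job only after the work that depends on it has succeeded. The following is a rough sketch under assumptions, not something from Bull itself: withSecondaryJob is a hypothetical name, and it assumes the job object offers moveToCompleted(returnValue, ignoreLock, notFetch) and moveToFailed(err, ignoreLock) with those parameter meanings.

```javascript
// Hypothetical helper (not a Bull API): take one job from a secondary queue,
// run `work` with it, and only then mark the job completed. If `work` throws,
// the job is marked failed and the error is rethrown to the caller.
async function withSecondaryJob(queueB, work) {
  const jobB = await queueB.getNextJob();
  if (!jobB) return { job: null, result: undefined }; // nothing available

  let result;
  try {
    result = await work(jobB);
  } catch (err) {
    // Assumed signature: moveToFailed(err, ignoreLock)
    await jobB.moveToFailed({ message: err.message }, true);
    throw err;
  }

  // Assumed signature: moveToCompleted(returnValue, ignoreLock, notFetch).
  // notFetch = true so that completing this job does not pull the next one.
  await jobB.moveToCompleted(result, true, true);
  return { job: jobB, result };
}
```

Inside the primary handler this could be used as `qA.process((jobA) => withSecondaryJob(qB, (jobB) => doWork(jobA, jobB)))`, where doWork is a placeholder for the application logic. If the process crashes between fetching and acknowledging, the secondary job is still left active, so this narrows the window rather than closing it.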
Hi
@pacomf and I are trying to implement this processOnce() method. We have a preliminary proposal here, supported by some changes in Bull. We would like to discuss the implementation to find out whether we are missing any use case. We followed some of your recommendations from a previous comment, except for the use of moveUnlockedJobsToWait and similar methods.
For now, we are testing this new method, and it works: we can get the first job available from the queue and consume it, as usual.
If you want, we can open a new pull request to discuss all the relevant aspects.
Cheers
Haven't we already solved this issue by following this pattern? https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#manually-fetching-jobs
Yes, that is a good solution, but we don't want to consume the next job.
I think the code can be simplified to this (untested):
Queue.prototype.processOnce = function() {
  var _this = this;
  return this.getNextJob().then(function(job) {
    if (job) {
      // Return the chain so callers wait for processing to finish,
      // and pass the fetched job to processJob.
      return _this.processJob(job).then(function() {
        return job.moveToCompleted(null, true, true);
      }).then(function() {
        return job;
      });
    } else {
      return null;
    }
  });
};
btw, how do you prevent the queue from automatically processing jobs, since your solution requires you to define a process function?
In our case, we added the third param (notFetch) to job.moveToCompleted() to automatically retrieve the next job in the queue -> https://github.com/cbjuan/bull/commit/ea2aaf9c3f450a10d3d65aba6ec1d1e3adf02385
We have detected some performance issues when using our processOnce method. These issues are related to drained queues and the getNextJob() and brpoplpush() methods. To solve that, we have modified getNextJob() to use rpoplpush() if desired (via an optional param). The new implementation is in https://github.com/cbjuan/bull/commit/fbe735f2bc6f65b8f1579d3b5c51fd16bf805112
Also, we reviewed your suggestion of simplifying the processOnce. We are testing it with our changes, but for now, our simplified code is in https://github.com/cbjuan/bull/commit/e5475df73a03f0e338a7eae05e45498ee7f5da3a
All the last suggestions you proposed have been addressed. The current changes in this context are listed in https://github.com/OptimalBits/bull/compare/master...cbjuan:add-processOnce
@cbjuan Did this ever get committed?
@doprdele Our POC was submitted for review https://github.com/OptimalBits/bull/pull/1066. It was closed because the library now implements most of the code to use this kind of processing.
Hi @cbjuan. Thanks! I saw you noted that this behavior was now supported but I don’t see it anywhere in the master branch. Is it termed something else?
hey @doprdele I don't know every detail of how it can be implemented right now; when we closed the PR it could be done like this https://github.com/OptimalBits/bull/pull/1066/files#diff-b68fbb0f945cdf9cddec65ddf0fbe456R648-R673
So the code in the documentation has a lot of errors, but it was what I needed to process the job manually: https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#manually-fetching-jobs.
Errors in the code for manually processing the queue: moveToCompleted() does not return a tuple, and that code fails because JavaScript doesn't allow destructuring to be written that way. Also, moveToCompleted() returns either an array or undefined, so the code needed to be adjusted for that. In the nextJob() call, the variable nextJobdata is spelled incorrectly, so the code does not work. For moveToFailed(), the code incorrectly expects a tuple to be returned. I checked the source code and couldn't figure out what moveToFailed() returns; in most scenarios, it does not appear to return anything.
https://repl.it/@Moyarich/bull-manual-setup
const Job = require("bull").Job;

/**
 * Process one job from a queue.
 *
 * @param {Object} queue The Bull queue to pull a job from
 * @param {Function} callback Receives the job; return an Error to fail it
 * @return {Promise<Object|undefined>} The next job sitting in the queue, if any
 * @see https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#manually-fetching-jobs
 */
const processOne = async (queue, callback) => {
  await queue.isReady();

  // Pull a job from 'waiting' and move it to 'active'.
  const job = await queue.getNextJob();
  if (!job) {
    return undefined;
  }

  // Do something with the job. Awaiting allows the callback to be async.
  const error = await callback(job);

  // If the callback returns an Error, flag the job as failed.
  // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
  if (error instanceof Error) {
    // Move the job to 'failed' and wait for that to finish before rejecting.
    await job.moveToFailed({ message: error.message }, true);
    throw error;
  }

  // Move the job to 'completed'. moveToCompleted() resolves to an array
  // [nextJobData, nextJobId] or to undefined, hence the `|| []` fallback.
  const [nextJobData, nextJobId] =
    (await job.moveToCompleted("succeeded", true)) || [];

  return nextJobData ? Job.fromJSON(queue, nextJobData, nextJobId) : undefined;
};
You can call it like this:
// Producer
queue.add({ random_attr: "random_value" });

// Worker (run this inside an async function, since it uses await)
const nextJob = await processOne(queue, (job) => {
  // Do something with the job here
  console.log("this is the job", job);
  /*
    If something goes wrong, return an Error:
    return new Error("Whoops!, something went wrong")
  */
});