Is it possible to abort a specific running job?
Hi, first of all thanks for this awesome module!
I have some 'external' conditions which might require aborting a running job. How would you proceed to abort a running job without using Queue##process?
Ideally, I would need something like Queue##abortJob(jobId).
Thanks A LOT for your support.
This is an interesting use case, I will investigate if it can be fixed somehow.
Thanks Manast! In the meantime I'll try to get more familiar with GitHub's mechanisms so that one day I can actively help with the code ;)
I'm also interested in this feature. I think aborting a job should be handled by the job itself, e.g. by checking a job.state or job.aborted property and reacting to it accordingly, so you can shut down gracefully.
A problem I've found here is how to get access to a job started on another thread on the same queue. Think of a Node.js cluster on the same machine.
@rgrocha I guess one possible way would be to emit an abort event that the job itself can subscribe to. The event must be sent to all worker instances to guarantee that it has been delivered to the one processing the job.
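Something along these lines could be built today on top of a plain Redis pub/sub channel. This is just a sketch of the idea, not Bull API; queue, moreWorkRemains and doOneChunk are placeholders:

const Redis = require('ioredis');

const sub = new Redis();
const pub = new Redis();

// job ids this worker has been asked to abort
const aborted = new Set();

sub.subscribe('job:abort');
sub.on('message', (channel, jobId) => {
  // every worker records the id; only the worker actually
  // processing that job will ever act on it
  aborted.add(jobId);
});

queue.process(async (job) => {
  while (moreWorkRemains(job)) {          // placeholder
    if (aborted.has(String(job.id))) {
      throw new Error('aborted by user'); // job ends up failed with this reason
    }
    await doOneChunk(job);                // placeholder
  }
});

// from any process that wants to abort job 42:
pub.publish('job:abort', '42');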
@manast, It works like a charm! I was already using a "broadcast" messaging system for other things, and it was just a matter of adding a new message type and adding/removing the job listener to the process itself inside the jobQueue.process() function.
Thank you so much!
+1
@manast could your global events feature be used for this?
@TomKaltz I guess so. But we will also need some way to mark a job as aborted. Either put them in a special "aborted" queue, or failed with a special "reason". The former seems like a more robust approach.
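For the "failed with a special reason" variant, something like this hypothetical abortJob helper could already be assembled from Job methods that exist today (Job#discard and Job#moveToFailed):

// hypothetical helper, built from existing Job methods
async function abortJob(queue, jobId) {
  const job = await queue.getJob(jobId);
  if (job) {
    await job.discard(); // prevent further attempts
    await job.moveToFailed({ message: 'aborted' }, true); // true = ignore the processing lock
  }
}

The processor itself still has to notice the state change and stop doing work, of course.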
Can "Aborted" be just another status or should it be attributed with failure?
Related: #363, a method to permanently fail a job.
duplicates #363
Sorry I haven't looked at Bull in a while but I'm still looking for out-of-band cancel/abort control of a job. My use case is long-running jobs like video transcoding. I want to be able to cancel them in-flight remotely through my custom UI. Has anyone successfully set something like this up?
@manast @doublerebel @TomKaltz I don't think this is a duplicate of #363, this is about aborting a running processing job, #363 is about removing a job after a certain number of attempts.
I have two applications, one API and one processor. The API receives requests from a web interface to add new jobs, then the processor application picks those jobs up from Redis and starts the processing. As far as I can tell, there is no simple way for the API to tell the processor application to stop a long-running processing job? I've looked at hacking it using events, but can't figure it out.
@denizdogan I have given some thought to how to implement the abort/cancel feature for processing jobs. The most reasonable approach I have found is to use the same cancel mechanism as the one provided by bluebird: http://bluebirdjs.com/docs/api/cancellation.html.
That means that the processor needs to fulfill some requirements in order for this to work, and that it will only work with promises, not with processors that use the done callback.
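A minimal sketch of what such a Bluebird-based processor could look like (startHeavyWork is a placeholder, and cancellation must be enabled explicitly since it is off by default):

const Promise = require('bluebird');
Promise.config({ cancellation: true }); // cancellation is off by default

function process(job) {
  return new Promise((resolve, reject, onCancel) => {
    const work = startHeavyWork(job, resolve, reject); // placeholder for the real work
    onCancel(() => {
      work.stop(); // release resources when the promise is cancelled
    });
  });
}

// whoever holds a reference to the returned promise can cancel it:
const pending = process(someJob);
pending.cancel();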
@manast Bluebird's cancellable API may or may not work; I haven't managed to get it to work, and there's very little information around about how to implement that with Bull. Given this piece of code:
queue.process(os.cpus().length, path.join(__dirname, 'process.js'))
...it's not clear to me how I would access the job promise to even be able to call .cancel() on it. I use this version of Queue#process because I want to get around the problem with the event loop.
It's also not clear to me how the API application could even signal "cancel job" to the processing application. I could probably hack it by doing something like job.progress(-1) and catching that update in the processing application, but I'd have to keep my own mapping between job IDs and their cancelable promises. Basically, none of this seems straightforward at all.
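For reference, the bookkeeping I mean would look roughly like this (all of it my own convention, nothing Bull provides; processVideo is a placeholder):

const running = new Map(); // job id -> cancellable Bluebird promise

queue.process((job) => {
  const promise = processVideo(job); // placeholder; must be a cancellation-enabled Bluebird promise
  running.set(job.id, promise);
  return promise.finally(() => running.delete(job.id));
});

function cancelJob(jobId) {
  const promise = running.get(jobId);
  if (promise) {
    promise.cancel(); // caveat: a cancelled promise never settles, so Bull never sees the job finish
  }
}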
In my opinion this issue should be re-opened because I can't be alone in thinking this should be implemented or at least better documented.
👍 for this feature, exactly job.cancel()
@manast I think a feature that can emit a global custom event would be better. Like this:
queue.pubCustomMsg('kill ' + job.id)

// in the worker
queue.on('custommsg', (msg) => {
  // extract the job id from the message, check whether it matches the
  // current job id, and if so kill the process and mark the job failed or completed
})
If there is a long-running transcoding job, the user can kill that process in the message handler.
I committed a feature that lets you publish a message and then handle the abort yourself: tinybug/bull@4bff5e2
var queue1 = utils.buildQueue();
var queue2 = utils.buildQueue();
var killjob = 'kill:123';

queue1.on('custom:msg', function(msg) {
  // extract the message, then kill the job 123 process and mark it failed or completed here
});

setTimeout(() => {
  queue2.publish(killjob);
}, 100);
I need to be able to kill a sandboxed process through the API. Is this possible now? If so, how?
My understanding of the situation:
Long-running jobs cannot be (gracefully) interrupted and discarded purely from the outside. Jobs can be flagged, and this affects the handling of the job after it has done its thing. If you want to gracefully interrupt a job, the processing code has to check repeatedly whether it should continue to run.
Having worked out this precondition, here are my use case and my solution:
I have a queue of length 1 (i.e. I do not allow concurrency). The job holds a long-running transaction to a DB that locks out other write operations on the DB, so freeing scarce resources and making sure locks are only held for as long as needed is important. The long-running job is triggered by a web agent. The communication with the agent uses a websocket channel (reporting progress to the client). If the channel is interrupted, the process shall be discarded, as the result of the process requires interaction with the web agent.
When I receive an (unexpected) disconnect event, I cancel the job using Job#discard followed by Job#moveToFailed. These invocations flag the job in a way I can check within the processing via Job#isActive. If the job is no longer active, I throw an exception, and this interrupts the long-running process.
As I did not want to add lots and lots of Job#isActive calls within my processing code, I added the Job#isActive call to my progress callback (which I was already using).
pseudocode:
class JobManager {
  disconnectEventHandler(event) {
    job = findJobForConnection(event.connection)
    job.discard()
    job.moveToFailed()
  }
}

class Job {
  run(progressCb) {
    this.progressCb = progressCb;
    while (!this.isDoneCondition) {
      this.heavyLiftingStep();
      this.reportProgress();
    }
  }

  reportProgress() {
    isAlive = this.progressCb();
    if (!isAlive) {
      throw Error('job interrupted');
    }
  }
}
@jakobsa this approach totally works, and for me repeating explicit checks inside the job code is much better than sending SIGKILL to the child node process 😄
You can always gracefully release resources, for example, commit a transaction and disconnect from database before throwing an exception.
But things are not so simple with sandboxed processing, because inside a sandboxed job object you don't have a Redis connection by default. So you need to create one yourself, either directly with ioredis or with a Bull queue.
job.js
'use strict';

const Bull = require('bull');

module.exports = async function(job, done) {
  const queue = new Bull("sb-test1");
  try {
    await queue.isReady(); // important! connectedJob.isActive() won't do this check
    const connectedJob = await queue.getJob(job.id);
    while (true) {
      const active = await connectedJob.isActive();
      console.log("Is job " + job.id + " active? " + active);
      if (!active) {
        throw new Error("job cancelled");
      }
      await new Promise((resolve) => { setTimeout(resolve, 1000); });
    }
  } finally {
    await queue.close(); // release resources
  }
};
index.ts
import Bull from "bull";

const test1 = async () => {
  const queue = new Bull("sb-test1");
  queue.process(__dirname + '/job.js');

  const job = await queue.add({ foo: 'bar' }, { attempts: 10 });
  console.log("Queued job " + job.id);

  setTimeout(async () => {
    console.log("Cancelling job " + job.id);
    await job.discard(); // ensure no more attempts are made
    await job.moveToFailed({ message: "Job is cancelled by the user request" },
      true); // ignore lock; the current connection obviously does not own the job lock
  }, 5000);

  queue.on('completed', (job) => {
    console.log("Job completed! " + job.id);
  });
};

test1();
...output
Queued job 2
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Cancelling job 2
Is job 2 active? false
(node:14104) UnhandledPromiseRejectionWarning: Error: job cancelled
at module.exports (/Users/stansv/bulltest/src/job.js:16:23)
at processTicksAndRejections (internal/process/task_queues.js:85:5)
(node:14104) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:14104) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
@stansv You are right. I had not sandboxed the job (as I first thought I had). So thanks for your correction.
I am currently handling it like this in my code:
queue.on('active', async (job) => {
  if (condition) {
    await job.discard()
    await job.moveToFailed(new Error(message))
  }
})
This is not an amazing solution, and strangely enough it will also not trigger the failed event listener. However, it at least prevents jobs from entering the process stage.
The documentation about the active event is also inaccurate. It advises using jobPromise.cancel(). This is clearly wrong, as bull uses ES6 promises, and ES6 promises do not have cancellation (you would need to use bluebird or something similar to make that work).
I have the same requirement. I was wondering why this can't/shouldn't be handled via progress (as @SimoneTaliercio mentioned, he did not want to use it). Setting it to -1, then listening at the queue level for progress changes, and then cancelling if it is the same job id seems to work fine.
Am I overlooking something? Are there any problems to be expected?
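Roughly like this (the -1 value and the cancelled set are just our own convention, not anything Bull-specific; hasWorkLeft and doNextStep are placeholders):

// API process: flag the job by (ab)using progress
async function requestCancel(queue, jobId) {
  const job = await queue.getJob(jobId);
  if (job) {
    await job.progress(-1);
  }
}

// worker process: watch queue-level progress events
const cancelled = new Set();
queue.on('global:progress', (jobId, progress) => {
  if (progress === -1) {
    cancelled.add(String(jobId));
  }
});

queue.process(async (job) => {
  while (hasWorkLeft(job)) {              // placeholder
    if (cancelled.has(String(job.id))) {
      throw new Error('cancelled via progress(-1)');
    }
    await doNextStep(job);                // placeholder
  }
});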
@janober I tried the same thing, but it appears that if you update a job's progress in the queue, the job does not become aware of that change. Has anyone found a convenient way to even just signal the process that it should be terminated?
After https://docs.bullmq.io/guide/jobs/flows this is the next major feature I want to add to Bull/BullMQ
Interesting, @evelyn-f-king, it seems to be working fine for us. Here is the code we use: https://github.com/n8n-io/n8n/blob/HEAD/packages/cli/commands/worker.ts#L229-L241
This is quite an old Issue, so I'm trying to sort out how it relates to Bull in its current state.
How does this Issue relate to Job.remove()? Could this functionality be implemented with the aforementioned function?
@rinogo you cannot remove a job that is currently being processed by a worker.
I might be missing something, but can't the parent process just kill the child process, the same way it can spawn one?
Or can't we send a message from parent to child so the child can do something before getting killed, like in the following?
// sandboxed processor
module.exports = function(job) {
  job.on('cancel', () => {
    // Do cleanup or whatever is necessary
  });

  // Code for processing the job
}
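Plain Node.js already supports that kind of parent-to-child messaging; a generic sketch (this is ordinary child_process usage, not Bull's sandbox plumbing, and processor.js is a hypothetical file):

const { fork } = require('child_process');

// parent
const child = fork('./processor.js');
child.send({ cmd: 'cancel', jobId: 123 });

// processor.js (child)
process.on('message', (msg) => {
  if (msg.cmd === 'cancel') {
    // do cleanup here, then stop
    process.exit(0);
  }
});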
(As explained more or less also in https://github.com/OptimalBits/bull/issues/114#issuecomment-236454460 and https://github.com/OptimalBits/bull/issues/114#issuecomment-381819978)
If so, what's the actual hold-up? Is it an edge case?
Thanks.