
Is it possible to abort a specific running job?

Open SimoneTaliercio opened this issue 9 years ago • 30 comments

Hi, first of all thanks for this awesome module!

I have some 'external' conditions that might require aborting a running job. How would you go about aborting a running job without using Queue#process?

Ideally, I would need something like Queue#abortJob(jobId).

Thanks A LOT for your support.

SimoneTaliercio avatar Apr 10 '15 08:04 SimoneTaliercio

This is an interesting use case, I will investigate if it can be fixed somehow.

manast avatar Apr 11 '15 09:04 manast

Thanks Manast! In the meantime I'm trying to get familiar with GitHub's mechanisms so that one day I can actively help with the code ;)

SimoneTaliercio avatar Apr 11 '15 15:04 SimoneTaliercio

I'm also interested in this feature. I think aborting a job should be handled by the job itself, e.g. by checking a job.state or job.aborted property and reacting to it accordingly, so it can shut down gracefully.

A problem I've found here is how to get access to a job started on another thread on the same queue. Think of a Node.js cluster on the same machine.

rgrocha avatar Jul 27 '16 17:07 rgrocha

@rgrocha I guess one possible way would be to emit an abort event that the job itself can subscribe to. The event must be sent to all worker instances to guarantee that it has been delivered to the one processing the job.

manast avatar Jul 31 '16 20:07 manast

@manast, it works like a charm! I was already using a "broadcast" messaging system for other things, and it was just a matter of adding a new message type and adding/removing the job listener to the process itself inside the jobQueue.process() function.

Thank you so much!

rgrocha avatar Aug 01 '16 15:08 rgrocha

+1

TomKaltz avatar Sep 08 '16 05:09 TomKaltz

@manast could your global events feature be used for this?

TomKaltz avatar Sep 10 '16 04:09 TomKaltz

@TomKaltz I guess so. But we will also need some way to mark a job as aborted. Either put them in a special "aborted" queue, or failed with a special "reason". The former seems like a more robust approach.

manast avatar Sep 10 '16 08:09 manast

Can "Aborted" be just another status, or should it be attributed with failure?

TomKaltz avatar Sep 10 '16 23:09 TomKaltz

Related: #363, a method to permanently fail a job.

doublerebel avatar Nov 14 '16 18:11 doublerebel

duplicates #363

manast avatar Nov 25 '16 12:11 manast

Sorry, I haven't looked at Bull in a while, but I'm still looking for out-of-band cancel/abort control of a job. My use case is long-running jobs like video transcoding. I want to be able to cancel them in-flight remotely through my custom UI. Has anyone successfully set something like this up?

TomKaltz avatar Jan 08 '18 20:01 TomKaltz

@manast @doublerebel @TomKaltz I don't think this is a duplicate of #363: this is about aborting a job that is currently being processed, while #363 is about removing a job after a certain number of attempts.

I have two applications, one API and one processor. The API receives requests from a web interface to add new jobs, then the processor application picks those jobs up from Redis and starts the processing. As far as I can tell, there is no simple way for the API to tell the processor application to stop a long-running job. I've looked at hacking it together with events, but can't figure it out.

denizdogan avatar Mar 07 '18 09:03 denizdogan

@denizdogan I have given some thought to how to implement the abort/cancel feature for running jobs. The most reasonable approach I have found is to use the same cancellation mechanism as the one provided by Bluebird: http://bluebirdjs.com/docs/api/cancellation.html. That means the processor needs to fulfill some requirements for this to work, and that it will only work with promises, not with processors that use the done callback.

manast avatar Mar 07 '18 10:03 manast
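The cooperative contract described above (the processor must opt in to being cancelled) can be approximated without Bluebird using Node's built-in AbortController (Node 15+). The transcode function below is a made-up stand-in for a long-running processor, not Bull API:

```javascript
// The processor receives an AbortSignal and checks it between steps,
// which is the same cooperative contract Bluebird cancellation imposes:
// cancellation only takes effect at points the code chooses to observe it.
async function transcode(job, signal) {
  const frames = [];
  for (let i = 0; i < 5; i++) {
    if (signal.aborted) throw new Error('job ' + job.id + ' cancelled');
    frames.push(i); // stand-in for one unit of real work
    await new Promise((resolve) => setImmediate(resolve)); // yield to the event loop
  }
  return frames;
}
```

An external "cancel" request would then simply call controller.abort() on the AbortController whose signal was handed to the processor.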

@manast Bluebird's cancellable API may or may not work; I haven't managed to get it to work, though, and there's very little information around about how to implement that with Bull. Given this piece of code:

queue.process(os.cpus().length, path.join(__dirname, 'process.js'))

...it's not clear to me how I would access the job promise to even be able to call .cancel() on it. I use this variant of Queue#process because I want to avoid blocking the event loop.

It's also not clear to me how the API application could even signal "cancel job" to the processing application. I could probably hack it by doing something like job.progress(-1) and catching that update in the processing application, but I'd have to keep my own mapping between job IDs and their cancellable promises. Basically, none of this seems straightforward at all.

In my opinion this issue should be re-opened because I can't be alone in thinking this should be implemented or at least better documented.

denizdogan avatar Mar 08 '18 10:03 denizdogan

👍 for this feature, exactly job.cancel()

iamolegga avatar Mar 15 '18 13:03 iamolegga

@manast I think a feature that can emit a global custom event would be better. Like this:

queue.pubCustomMsg('kill ' + job.id)

// in the worker
queue.on('custommsg', (msg) => {
   // extract the job id from the message, compare it to the current job's id,
   // and if it matches, kill the process and mark the job failed or completed
})

If there is a long-running transcoding job, the user can kill that process in the message handler.

tinybug avatar Apr 17 '18 03:04 tinybug

I committed a feature that lets you publish a message and then handle the abort yourself: tinybug/bull@4bff5e2

var queue1 = utils.buildQueue();
var queue2 = utils.buildQueue();

var killjob = 'kill:123';

queue1.on('custom:msg', function(msg) {
   // extract the message, then kill the process for job id 123
   // and mark the job failed or completed here
});

setTimeout(() => {
   queue2.publish(killjob);
}, 100);

tinybug avatar Apr 17 '18 07:04 tinybug

I need to be able to kill a sandboxed process through the API. Is this possible now? If so, how?

sebasgarcep avatar Aug 28 '19 17:08 sebasgarcep

My understanding of the situation:

Long-running jobs cannot be (gracefully) interrupted and discarded purely from the outside. Jobs can be flagged, and this affects the handling of the job after it has done its thing. If you want to gracefully interrupt a job, the processing code has to check repeatedly whether it should continue to run.

Having worked out this precondition, here is my use case and my solution:

I have a queue of length 1 (i.e. I do not allow concurrency). The job holds a long-running transaction to a DB that locks out other write operations on the DB, so freeing scarce resources and making sure locks are held only as long as needed is important. The long-running job is triggered by a web agent. The communication with the agent uses a websocket channel (reporting progress to the client). If the channel is interrupted, the process shall be discarded, as the result of the process requires interaction with the web agent.

When I receive an (unexpected) disconnect event, I cancel the job using Job#discard followed by Job#moveToFailed. These invocations flag the job in a way I can check from within the processing code via Job#isActive. If the job is no longer active, I throw an exception, and this interrupts the long-running process.

As I did not want to add lots and lots of Job#isActive calls within my processing code, I added the Job#isActive call to the processing progress callback that I was already using.

pseudocode:

class JobManager {

  disconnectEventHandler(event) {
    job = findJobForConnection(event.connection)
    job.discard()
    job.moveToFailed()
  }

}

class Job {

  run(progressCb) {
    this.progressCb = progressCb;
    while (!this.isDoneCondition) {
      this.heavyLiftingStep();
      this.reportProgress();
    }
  }

  reportProgress() {
    isAlive = this.progressCb();
    if (!isAlive) {
      throw Error('job interrupted');
    }
  }
}

jakobsa avatar Sep 25 '19 16:09 jakobsa
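The pattern above can be condensed into a runnable single-file sketch. In real Bull code the isAlive callback would wrap something like a Job#isActive check; CancellableWork and all other names here are illustrative:

```javascript
// A job runs in steps; the progress callback doubles as a liveness
// check, so cancellation is observed at every step boundary.
class CancellableWork {
  constructor(steps, isAlive) {
    this.steps = steps;     // how many heavy-lifting steps to run
    this.isAlive = isAlive; // e.g. () => job.isActive() in real Bull code
    this.completed = 0;
  }

  run() {
    while (this.completed < this.steps) {
      this.completed++;     // stand-in for heavyLiftingStep()
      this.reportProgress();
    }
    return this.completed;
  }

  reportProgress() {
    if (!this.isAlive()) {
      throw new Error('job interrupted');
    }
  }
}
```

The cost of cancellation latency is bounded by the duration of a single step, which is why hanging the check off the existing progress callback is convenient.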

@jakobsa this approach totally works, and for me repeated explicit checks inside the job code are much better than sending SIGKILL to the child node process 😄

You can always gracefully release resources, for example, commit a transaction and disconnect from database before throwing an exception.

But things are not so simple with sandboxed processing, because inside a sandboxed job object you don't have a Redis connection by default. So you need to create one, either directly with ioredis or with a Bull queue.

job.js

'use strict';

const Bull = require('bull');

module.exports = async function(job, done) {
    const queue = new Bull("sb-test1");
    
    try {
        await queue.isReady(); // important! connectedJob.isActive() won't do this check
        const connectedJob = await queue.getJob(job.id);

        while(true) {
            const active = await connectedJob.isActive();
            console.log("Is job " + job.id + " active? " + active);
            if(! active) {
                throw new Error("job cancelled");
            }
            await new Promise((resolve) => { setTimeout(resolve, 1000); });
        }
    } finally {
        await queue.close(); // release resources
    }
};

index.ts

import Bull, {Job} from "bull";

const test1 = async () => {
  const queue = new Bull("sb-test1");

  queue.process(__dirname + '/job.js');

  const job = await queue.add({ foo: 'bar' }, {attempts: 10});
  console.log("Queued job " + job.id);

  setTimeout(async () => {
    console.log("Cancelling job " + job.id);
    job.discard(); // ensure no more attempts made
    job.moveToFailed({message: "Job is cancelled by the user request"},
      true); // ignore lock, obviously current connection does not own a job lock
  }, 5000);

queue.on('completed', (job) => {
    console.log("Job completed! " + job.id);
  });
};

test1();

...output

Queued job 2
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Is job 2 active? true
Cancelling job 2
Is job 2 active? false
(node:14104) UnhandledPromiseRejectionWarning: Error: job cancelled
    at module.exports (/Users/stansv/bulltest/src/job.js:16:23)
    at processTicksAndRejections (internal/process/task_queues.js:85:5)
(node:14104) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:14104) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

stansv avatar Sep 26 '19 09:09 stansv

@stansv You are right. I had not sandboxed the job (as I first thought I had). So thanks for your correction.

jakobsa avatar Sep 27 '19 10:09 jakobsa

I am currently handling it like this in my code:

queue.on('active', async (job) => {
  if (condition) {
    await job.discard()
    await job.moveToFailed(new Error(message))
  }
})

This is not an amazing solution, and strangely enough it will also not trigger the failed event listener. However, it at least prevents jobs from entering the processing stage.

The documentation about the active event is also inaccurate. It advises using jobPromise.cancel(). This is clearly wrong, as Bull uses ES6 promises, and ES6 promises do not support cancellation (you would need to use Bluebird or something similar to make that work).

Fohlen avatar Oct 03 '19 16:10 Fohlen

I have the same requirement. I was wondering why this can't/shouldn't be handled via progress (@SimoneTaliercio mentioned that he did not want to use it). Setting it to -1, then listening at the queue level for progress changes, and then cancelling if it is the same job id seems to work fine. Am I overlooking something? Are there any problems to be expected?

janober avatar Dec 30 '20 22:12 janober

@janober I tried the same thing, but it appears that if you update a job's progress in the queue, the job does not become aware of that change. Has anyone found a convenient way to even just signal the process that it should be terminated?

opalrose510 avatar Apr 23 '21 08:04 opalrose510

After https://docs.bullmq.io/guide/jobs/flows this is the next major feature I want to add to Bull/BullMQ

manast avatar Apr 23 '21 15:04 manast

Interesting, @evelyn-f-king. It seems to be working fine for us. Here is the code we use: https://github.com/n8n-io/n8n/blob/HEAD/packages/cli/commands/worker.ts#L229-L241

janober avatar Apr 23 '21 17:04 janober
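The progress-as-cancel-signal approach described above can be sketched as follows. In a real setup onProgress would be wired to a queue-level progress event on the worker side; CANCEL_SIGNAL, CancelRegistry, and processChunks are illustrative names, not Bull API:

```javascript
// A reserved progress value acts as the cancel signal.
const CANCEL_SIGNAL = -1;

// Worker-side registry of cancelled job ids.
class CancelRegistry {
  constructor() {
    this.cancelled = new Set();
  }

  // Wire this to the queue's progress event, e.g.:
  //   queue.on('global:progress', (jobId, p) => registry.onProgress(jobId, p))
  onProgress(jobId, progress) {
    if (progress === CANCEL_SIGNAL) this.cancelled.add(String(jobId));
  }

  isCancelled(jobId) {
    return this.cancelled.has(String(jobId));
  }
}

// A processor checks the registry between units of work.
async function processChunks(job, registry, chunks) {
  const done = [];
  for (const chunk of chunks) {
    if (registry.isCancelled(job.id)) throw new Error('job cancelled');
    done.push(chunk); // stand-in for processing one chunk
  }
  return done;
}
```

On the API side, cancelling then amounts to setting the job's progress to the sentinel value, which the worker observes through the progress event.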

This is quite an old issue, so I'm trying to sort out how it relates to Bull in its current state.

How does this issue relate to Job.remove()? Could this functionality be implemented with that function?

rinogo avatar Feb 22 '22 00:02 rinogo

@rinogo you cannot remove a job that is currently being processed by a worker.

manast avatar Feb 23 '22 08:02 manast

I might be missing something, but can't the parent process just kill the child process, the same way it can spawn one?

Or can't we send a message from parent to child so the child can do something before getting killed, like in the following?

// sandboxed processor
module.exports = function(job) {
  job.on('cancel', () => {
    // Do cleanup or whatever necessary
  });

  //  Code for processing the job
}

(As explained more or less also in https://github.com/OptimalBits/bull/issues/114#issuecomment-236454460 and https://github.com/OptimalBits/bull/issues/114#issuecomment-381819978)

If so, what's the actual holdup? Is it an edge case?

Thanks.

xxRockOnxx avatar Jun 02 '22 22:06 xxRockOnxx