
[Feature] Job dependencies

Open ccollie opened this issue 7 years ago • 11 comments

Let's say you have one job that depends on another, but the task definitions are fundamentally different. Consider the following example:

const debitAccount = await orderQueue.add('debitAccount', 
      {id : accountId, orderId: orderId, amount: costOfWidget});

const preparePackingSlip = await orderQueue.add('prepareShipment', 
      {id : accountId, orderId: orderId}, {depends : [debitAccount.id]});

// debitAccount.id not necessary here, but used to demonstrate that 
// we can have multiple dependencies
notificationQueue.add('notifyWidgetShipped', {orderId : orderId}, 
     {depends : [debitAccount.id, preparePackingSlip.id]});

Notifications are sent only after both the debit and packing slip jobs have completed.

We're considering using bull instead of our in-house solution, and this is one of the major missing features. I'd be willing to take the lead on this.

Implementation

Internally this would require a 'dependent' list in addition to 'delayed'. As jobs are completed, their dependent lists are scanned and the id of the completed job is removed from each dependent's remaining dependencies. Once the count of a child job's dependencies reaches 0, it is moved from the 'dependent' list to 'active'. If any of the parent jobs fail, the dependents are discarded recursively.
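
For illustration, a minimal in-memory sketch of that bookkeeping (the class and method names are hypothetical, not Bull internals; the real thing would keep these structures in Redis):

// Hypothetical sketch of the 'dependent' list bookkeeping described above.
// Bull would keep these structures in Redis, not in process memory.
class DependencyTracker {
  constructor() {
    this.remaining = new Map();  // childId -> count of unfinished parents
    this.dependents = new Map(); // parentId -> Set of child ids
  }

  // Returns true if the job has no pending parents and can go straight to 'active'.
  addJob(jobId, dependsOn = []) {
    this.remaining.set(jobId, dependsOn.length);
    for (const parentId of dependsOn) {
      if (!this.dependents.has(parentId)) this.dependents.set(parentId, new Set());
      this.dependents.get(parentId).add(jobId);
    }
    return dependsOn.length === 0;
  }

  // Called when a parent completes; returns the child ids that became runnable.
  onCompleted(parentId) {
    const runnable = [];
    for (const childId of this.dependents.get(parentId) || []) {
      if (!this.remaining.has(childId)) continue; // already discarded
      const left = this.remaining.get(childId) - 1;
      this.remaining.set(childId, left);
      if (left === 0) runnable.push(childId); // move 'dependent' -> 'active'
    }
    this.dependents.delete(parentId);
    return runnable;
  }

  // Called when a parent fails; discards its dependents recursively.
  onFailed(parentId) {
    for (const childId of this.dependents.get(parentId) || []) {
      this.remaining.delete(childId);
      this.onFailed(childId); // cascade to grandchildren
    }
    this.dependents.delete(parentId);
  }
}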

ccollie avatar May 28 '17 04:05 ccollie

This is a nice proposal; we have already started a discussion about very similar functionality here: https://github.com/OptimalBits/bull/issues/340

manast avatar May 28 '17 08:05 manast

I think it may be better to move the dependency declaration from the add method to the process method. What I mean is that instead of creating the dependency explicitly per job by setting a depends array, the dependency could be set implicitly when defining the processor, for example:

const debitAccount = await orderQueue.add('debitAccount', 
      {id : accountId, orderId: orderId, amount: costOfWidget});

orderQueue.process('prepareShipment', {depends: ['debitAccount']}, function(job){
  // job.data includes the result from the debitAccount processing
});

// the 'debitAccount' dependency is not strictly necessary here, but is used 
// to demonstrate that we can have multiple dependencies
notificationQueue.process('notifyWidgetShipped', {depends : ['debitAccount', 'prepareShipment']}, function(job){
  // job.data includes the processing results of 'debitAccount' and 'prepareShipment'
});

The main advantage is robustness, and probably a better separation of concerns. Basically, you kick off the whole process by adding just one job, which is an atomic operation; the dependent process functions are then called accordingly. There will also be no dangling dependent jobs that need to be deleted if some parent fails. What do you think?
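
Under that proposed API (not something Bull supports today), the whole flow would be started with a single add:

// One atomic add kicks off the whole flow; the dependent processors
// ('prepareShipment', then 'notifyWidgetShipped') run as their
// declared dependencies complete.
await orderQueue.add('debitAccount', 
      {id : accountId, orderId: orderId, amount: costOfWidget});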

manast avatar May 28 '17 11:05 manast

#340 closely matches what we do currently at work. With the new proposal, I like that it's a bit higher level (we can think in terms of a flow or process instead of individual jobs), and that it deals better with cleanup. However, it does not cover one scenario we currently have at work. Consider the following:

// in our case relatedItemIds.length can be over 100
let dependentJobIds = relatedItemIds.map(id => 'job' + id);

// the handler for 'processThatCanFail' places the results in a redis hash keyed on batch id
const batchId = uuid.v4();
const jobs = await Promise.map(relatedItemIds,   // Bluebird's Promise.map
  id => processQueue.add('processThatCanFail', 
                       {id: id, batchId: batchId}, {jobId: 'job' + id}));

reportQueue.add('aggregateAndEmailErrors', {batchId: batchId}, {depends: dependentJobIds});

ccollie avatar May 28 '17 15:05 ccollie

I see. I will try to think about a solution that covers that use case. The problem with the approach you took is that if not all of the calls to add complete, you will end up with dangling jobs. Somehow you need to express that dependency atomically.

manast avatar May 28 '17 21:05 manast

Just chiming in to say that job dependencies are also a need we're facing. We have jobs that depend on the output of previous jobs, so they can't be run in parallel. Right now we have a hacky solution where we try to keep track of these dependencies and only add jobs when it is safe, but this is brittle and there are race conditions when the work is spread across multiple containers. A solution baked into the library would be very helpful.

thatcort avatar Jan 09 '19 21:01 thatcort

I second this need; I currently need a separate system to keep track of a workflow.

aleccool213 avatar Jan 09 '19 22:01 aleccool213

One caveat is that we don't know in advance what the job dependencies are. Users can add jobs, and we can tell whether a job has dependencies based on its data (whether it affects the same entity as another job). So some mechanism for specifying how dependencies are detected or updated would be super helpful.

thatcort avatar Jan 09 '19 22:01 thatcort

How about this workflow:

  • Create a parent job in a paused state (or some other custom non-process status) and obtain the parent job id.
  • Create the child jobs, supplying the parent job's queue name and id. This would automatically add the dependent job to the parent.
  • On the final child job creation add a flag to activate the parent job or activate it directly.

This workflow would work recursively. No jobs can be processed until all parents are active. This could also be used to control the order of the jobs. Some stale parent or child detection logic would need to exist.
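
For illustration, a rough sketch of what that could look like from the caller's side (the paused option, the parent option, and activate() are all hypothetical; none of them exist in Bull today):

// Hypothetical API sketch; these options do not exist in Bull today.
const parent = await orderQueue.add('shipOrder', {orderId: orderId}, {paused: true});

// Children reference the parent by queue name and id, and are added to
// the parent's dependent list automatically.
await orderQueue.add('debitAccount', {orderId: orderId},
      {parent: {queue: 'orders', id: parent.id}});
await orderQueue.add('prepareShipment', {orderId: orderId},
      {parent: {queue: 'orders', id: parent.id}});

// The final child creation could carry a flag, or the parent can be
// activated explicitly once all children have been added:
await parent.activate();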

I don't know bull very well. Just trying to help.

grantcarthew avatar Jan 09 '19 23:01 grantcarthew

Our workflow is that users can initiate multiple transformations on a data set. Each transformation needs to operate on the results of the previous one, so they need to run serially. Right now we track the entity (the data set) being transformed in the job data and check whether a job is already running on the same entity; if so, we wait before adding the new one to the queue. I'm envisioning some sort of callback on jobs that tests whether a job is able to run (e.g. Job.canRun(queueState)), but I'm sure other approaches could work too.
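
For context, a sketch of the kind of brittle check we do today (the entityId field and the retry are our own convention; getActive and getWaiting are existing Bull queue methods):

// Brittle workaround: check whether any waiting/running job already touches
// the same entity before adding a new one. Between the check and the add,
// another container can slip a job in; this is where the races come from.
async function addWhenSafe(queue, name, data) {
  const busy = [...(await queue.getActive()), ...(await queue.getWaiting())];
  if (busy.some(job => job.data.entityId === data.entityId)) {
    setTimeout(() => addWhenSafe(queue, name, data), 1000); // retry later
    return;
  }
  await queue.add(name, data);
}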

@grantcarthew I don't think parent-child relationships would work well here, since we would need to check if a parent job exists for each entity, then create it if not, and then add child jobs. We would end up with a lot of dangling jobs.

thatcort avatar Jan 10 '19 15:01 thatcort

Hi, is there an ETA for this feature?

nemcikjan avatar Oct 24 '21 21:10 nemcikjan

It is implemented in BullMQ (https://docs.bullmq.io/guide/flows). Not sure if it will be backported to Bull though.
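
For anyone landing here, the BullMQ flow equivalent of the original example looks roughly like this (adapted from the flows guide; queue names are illustrative). Nesting children expresses the dependency chain, so 'debitAccount' runs first, then 'prepareShipment', then the notification:

const { FlowProducer } = require('bullmq');

const flowProducer = new FlowProducer();

// A parent job is processed only after all of its children complete; a
// processor can read its children's results via job.getChildrenValues().
const flow = await flowProducer.add({
  name: 'notifyWidgetShipped',
  queueName: 'notifications',
  data: {orderId: orderId},
  children: [{
    name: 'prepareShipment',
    queueName: 'orders',
    data: {id: accountId, orderId: orderId},
    children: [{
      name: 'debitAccount',
      queueName: 'orders',
      data: {id: accountId, orderId: orderId, amount: costOfWidget},
    }],
  }],
});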

manast avatar Oct 25 '21 00:10 manast