
[PROPOSAL] Chained jobs

Open manast opened this issue 8 years ago • 51 comments

I have been working on designing a new feature for Bull that I feel would be really useful for modelling more complex systems, where many specialized queues work together to generate some results.

I would like to present my design here so that you can help me refine it, and together we can make a solution that works really well without bloating the current APIs unnecessarily.

manast avatar Aug 29 '16 08:08 manast

The basic idea is that we introduce two new properties in the job data: children and parent. The former keeps track of all the jobs that need to be completed before the current job can be considered completed or failed. The parent property is used so that a job knows it needs to update the children property of its parent when it completes.

Here is some pseudocode:

queueA.process(function(job){
  return doSomeAsyncProcessing().then(function(){
    return job.addChildren([
      {
        job: data1,
        queue: queueB
      },
      {
        job: data2,
        queue: queueC
      },
    ]);
  });
});

We introduce a new method, addChildren, to which an array of sub-jobs can be passed. Note that the sub-jobs can run in other queues (that will be the most common case), and that the sub-jobs will be executed in parallel. If serial execution is required, addChildren can be called from within the sub-jobs.
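For example, a series of steps could be built by having each sub-job add the next step as its own child (a sketch against the proposed API; doStepOne/doStepTwo and the step data are placeholders):

queueA.process(function(job){
  return doStepOne(job.data).then(function(){
    // The parent completes only after this child completes.
    return job.addChildren([
      {
        job: stepTwoData,
        queue: queueB
      }
    ]);
  });
});

queueB.process(function(job){
  return doStepTwo(job.data).then(function(){
    // Chaining again from the child yields series execution:
    // the step-three job runs only after step two has finished.
    return job.addChildren([
      {
        job: stepThreeData,
        queue: queueC
      }
    ]);
  });
});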

//
// New properties in the data structure:
//
{
  parent: jobId // Parent jobId for this job.
}

{
  children: {
    ids: [jobId1, jobId2, ...],
    completed: []
  }
}

Here comes some pseudo-code for the new functionality. Note that these methods need to be integrated into the current scripts that move a job to completed/failed so that we can keep atomicity.

// A job that is completed or failed:

if (job.hasChildren()) {
  waitUntilAllChildrenCompleted();
} else if (job.hasParent()) {
  notifyParentForCompletion();
} else {
  moveToCompleted/Failed();
}

function waitUntilAllChildrenCompleted(){
  // This is in fact not a real function; the wait happens implicitly.
  // See notifyParentForCompletion.
}

function notifyParentForCompletion(job){
  job.addJobIdToParent();

  if(checkIfAllChildrenCompleted()){
    job.parent.moveToCompleted/Failed();
  }
}

manast avatar Aug 29 '16 08:08 manast

Nice proposal! It would also be useful to register child jobs depending on the state of the parent job: for example, if the job succeeds, I want jobB and jobC to run, and if it fails, I want jobD to run. It would also be nice to add an option to the interface for whether the parent should wait for the children to finish or not; maybe some of the jobs are more like "fire-and-forget".
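For instance, something along these lines (purely hypothetical option names, just to illustrate the suggestion):

// Hypothetical: 'on' selects the parent state that triggers the child,
// and 'wait: false' marks a child as fire-and-forget.
job.addChildren([
  { job: dataB, queue: queueB, on: 'completed' },
  { job: dataC, queue: queueC, on: 'completed' },
  { job: dataD, queue: queueD, on: 'failed' },
  { job: dataE, queue: queueE, wait: false }
]);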

xdc0 avatar Sep 01 '16 22:09 xdc0

I can add the failure suggestion; it fits nicely into the design. Regarding the "fire-and-forget" functionality, I think you can already get that today by just adding jobs in your process function.
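A minimal sketch of that with today's API (queueB and the data shapes are placeholders):

queueA.process(function(job){
  return doSomeAsyncProcessing(job.data).then(function(result){
    // Fire-and-forget: we only wait for the job to be *created*,
    // not for it to complete, so this job finishes independently.
    return queueB.add({ input: result });
  });
});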

manast avatar Sep 02 '16 11:09 manast

Note to self: although internally this will be implemented as a parent-child relationship, I think the API should expose this functionality as a process that is divided into steps.

manast avatar Mar 22 '17 08:03 manast

Hi @manast.

I've been lurking on your project for a while now since I built rethinkdb-job-queue. I'm also watching Kue and bee-queue. I've picked up some ideas from you and the others.

I have had a couple of people ask about complex job processing and have documented my suggestions. I didn't think about adding a parent/child type of job. It's an interesting idea, although it adds more complexity to the project.

Have a read here for my suggestions: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Complex-Job

I'm just joining in with this discussion because it is a common issue and I'm interested in how you are approaching it.

grantcarthew avatar Mar 23 '17 00:03 grantcarthew

This would be really useful for my use case!

Say I have a list of tasks that must be processed sequentially (not concurrently).

They therefore must define their dependencies upon job creation atomically, before they even get a chance to run:

const dependency = queue.add('processMessage', { data });
const dependant = queue.add('processMessage', { data }, { dependencies: [dependency] });

Internally, this could then add a dependant field on all the tasks in dependencies with the new task ID, and then, once each dependency finishes, the system can be notified to start running the dependant.
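The internal bookkeeping could look roughly like this (a hypothetical data shape, echoing the parent/children properties from the first post):

// Hypothetical state after the two add() calls above:
// the dependency records who to notify when it finishes...
{ id: 'job1', dependants: ['job2'] }
// ...and the dependant records what it is still waiting for.
{ id: 'job2', dependencies: ['job1'], pendingCount: 1 }
// When job1 completes, job2's pendingCount is decremented;
// at zero, job2 is released from the waiting state and can run.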

What do you think? :)

bensalilijames avatar Sep 05 '17 16:09 bensalilijames

I'm anxious for this as well, but more from a parent/child relationship perspective. I have a job that creates child jobs, and I'd like the parent to complete when the children have completed. The big difference for me is that I'm not determining the dependents until the parent job is already running. However, if needed I could change the design.

ritter avatar Sep 05 '17 17:09 ritter

I think the two ideas should be able to co-exist in the same solution: upon all dependencies/child tasks finishing, the dependant/parent is marked as complete if it is already running, and otherwise it is started. Best of both worlds?

bensalilijames avatar Sep 05 '17 17:09 bensalilijames

There are several ways to consider this functionality. For example, the dependencies or parent/child relationships could be defined not per job but per job type, i.e., you define the process functions for the child and for the parent, and the dependencies are handled implicitly; you do not need to create them when you add the jobs. Not sure I am being clear here :)

manast avatar Sep 05 '17 18:09 manast

Sounds good to have an implicit system - out of interest, how would it marry up IDs of related jobs? For example:

queue.process('typeA', ...);
queue.process('typeB', { depends: ['typeA'] }, ...);

const job1 = queue.add('typeA', { data });
const job2 = queue.add('typeB', { data });

In this case, I'm struggling to understand how job2 could wait for job1 - it doesn't know they should be related!

(btw, I only point this out because it's important in my particular use case that jobs are created all at once and not spawned from a parent job) :)

bensalilijames avatar Sep 05 '17 19:09 bensalilijames

well I guess that in the process callback function we could also have an extra argument, dependencies:

queue.process('typeB', { depends: ['typeA'] }, function(job, dependencies){
...
});

wouldn't that solve your case?

manast avatar Sep 05 '17 19:09 manast

Of course, there is the issue of multiple dependencies. If they are unrelated to each other, it's one thing, but if they are related to each other and need to be grouped together, then we need to be able to express that somehow as well.

manast avatar Sep 05 '17 19:09 manast

I can't see how the dependencies argument would solve this case, even in the case of single dependencies, but I might be going crazy! 😄 Could you elaborate a little?

bensalilijames avatar Sep 05 '17 19:09 bensalilijames

I am just brainstorming; I have not placed all the pieces of the puzzle yet. But dependencies should be an array with all the jobs that the process function depends on. In the example above, dependencies will have only one job, of type A.
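So, roughly (a hypothetical sketch; returnvalue mirrors the property Bull already sets on completed jobs):

queue.process('typeB', { depends: ['typeA'] }, function(job, dependencies){
  // Hypothetical: dependencies[0] would be the completed typeA job,
  // making its result available to the typeB processor.
  var resultOfA = dependencies[0].returnvalue;
  return doSomethingWith(resultOfA);
});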

manast avatar Sep 05 '17 19:09 manast

Got it. So what about if I do this:

queue.process('typeA', ...);
queue.process('typeB', { depends: ['typeA'] }, ...);

const job1 = queue.add('typeA', { data });
const job2 = queue.add('typeA', { data });
const job3 = queue.add('typeB', { data });
const job4 = queue.add('typeB', { data });

I need job3 to depend on job1, and job4 to depend on job2. But dependencies would include all the typeA jobs?

bensalilijames avatar Sep 05 '17 19:09 bensalilijames

Yes, that was what I meant before when I said "related to each other". The problem with the approach where a given job depends on another given job is that it does not scale so well in a distributed system. It also poses challenges for an atomic implementation: for instance, if you add a job of typeA but the process dies before you are able to create the job of typeB that depends on it, you end up with a dangling job.

manast avatar Sep 05 '17 20:09 manast

In the design I previously mentioned, I don't think it matters if the process dies before the job of typeB is created, since the dependency would only be declared on the typeB job. Until the typeB job is added (atomically?), there's no dependency, so there are no dangling jobs?

I understand the scaling issue - I imagine we will have very short chains with really only a single dependency in each job so I'm hoping it doesn't get too bad.

I suppose more than anything I'm just sounding out what I think a good way of resolving these dependencies would be, with a view to getting a PR going!

bensalilijames avatar Sep 05 '17 20:09 bensalilijames

By dangling job I mean the jobA that was completed as part of a job composed of several jobs. So to speak, only one sub-job has been completed, leaving the whole job never able to complete. Would you mind writing an example where the dependency between typeA and typeB must be created explicitly (job3 depends on job1 in your example above)?

manast avatar Sep 05 '17 21:09 manast

Sure thing:

const job1 = queue.add('processMessage', { data });
const job2 = queue.add('processMessage', { data });
const job3 = queue.add('sendMessage', { data }, { dependencies: [job1] });
const job4 = queue.add('sendMessage', { data }, { dependencies: [job2] });
// could extend... :)

bensalilijames avatar Sep 05 '17 21:09 bensalilijames

@benhjames I was mostly interested in whether you could show, in the data part for example, why a given job needs to be dependent on another job. Btw, what do you think about the proposed API in the first post? I think it does what you want and can also be implemented atomically:

queueA.process(function(job){
  return doSomeAsyncProcessing().then(function(){
    return job.addChildren([
      {
        job: data1,
        queue: queueB
      },
      {
        job: data2,
        queue: queueC
      },
    ]);
  });
});

manast avatar Sep 06 '17 08:09 manast

Oops, sorry, it's because job1 might modify some external state which job3 relies upon, e.g. writing something to a database or sending something over a websocket.

That API could work, though you'd need to create only a single job up front, which has as its data the data for all the remaining jobs, e.g.:

const megaJobData = {
  jobs: [job1Data, job2Data, job3Data, ...]
}
queue.process('megaJob', async (job) => {
  // extract first job in list and run it
  await doSomeWork(job.data.jobs[0]);
  // remove first job from list (shift, not unshift, drops the head)
  job.data.jobs.shift();
  // recursively adds jobs one after another so they maintain dependencies
  await job.addChildren([
    {
      data: job.data.jobs,
      queue: 'megaJob',
    }
  ]);
});

I think this is more complicated than it needs to be! 🙂

bensalilijames avatar Sep 06 '17 10:09 bensalilijames

Hey, chiming in. I am considering Bull for my company's needs, where a parent/child relationship would be extremely beneficial, especially if the parent can receive events ('completed', 'failed' [...]) from a child, along with an event when all children are finished ('all:ended'). That is because some child jobs' failures might not be critical enough to fail the parent job. We can actually imagine a job whose task is to retry those failed, non-critical jobs over days or weeks, maybe because those jobs tried to reach business resources that were not yet available.

I would suggest having dependencies per job (instead of, or alongside, per job type), with a job.children object that would mimic the Queue API, and children sending back events. You could either create children directly from a parent job, or declare a jobId as a parent. I offer an API example below.

My use case consists of various jobs pipelined together. Imagine a "Provider" job fetching files and then spawning several "Process" jobs (each managing a "type" of file). Each of these Process jobs would then spawn "File" jobs, and finally, each of these File jobs would spawn "Unit" jobs. Now, also imagine that a Process job can depend on another Process job, and a File job can depend on another File job (also called ETL hell). Below is a rough draft and a suggested API, based on this thread, on events, and on jobId.

/* Ignoring possible jobId conflicts in the future */
const process1 = { queue: 'processQueue', name: 'Process1', data: {}, opts: {jobId: 'Process1-jobId'} };
const process2 = { queue: 'processQueue', name: 'Process2', data: {}, opts: {jobId: 'Process2-jobId', depends: [process1.opts.jobId]} };
providerQueue.add('Provider1', { processes: [process2, process1] }, { jobId: 'Provider1-jobId', repeat: repeatOpts }, async job => {
    await doStuff() // Would actually fill processes data

    /* Here, job.children mirror parts of the queue API --- children.add however receive an additional 'queue' option */
    job.data.processes.forEach(childJobOpts => job.children.add(childJobOpts));
    /* We can also track children's events : either individually, targeted, or all */ 
    job.children.on('completed', (childJob, result) => log('a child has completed'));
    job.children.getJob(`${process2.name}-${timestamp}`).on('failed', (childJob, err) => log('children process 2 failed'));
    /* childrenJobs === [{ job: childJob, result: childPromise }, ...] where childPromise is either resolved or rejected depending on completed or failed */
    job.children.on('all:ended', childrenJobs => log('all children have ended, that is either completed or failed.'));
});

The above example displays a couple of noteworthy cases. The first is receiving an event from a job; it's out of the scope of this issue, but it would make a nice addition. The second is that process2 is passed to job.children.add before process1. That is, when a job A (= process2) is declared and depends on a jobId not yet created, job A will wait until the mentioned jobId is created. If the job with that jobId is never created, then job A will never leave the wait state (or there could be a waitStateTTL option somewhere in the job creation). Finally, when jobId is created, job A can begin:

processQueue.process('Process2', job => {
    /* job.parents contains Provider1JobObject and Process1JobObject, thanks to the opts.depends and the children.add */
    /* You can here again aim for individually, targeted or all parents */
    job.parents.getJob('Process1-jobId').on('completed', async (parentJob, result) => {
        await doStuff(result) // Wait until Process1 resolution to start doing stuff
        log('I have succeeded !');
    });
    job.parents.getJob('Process1-jobId').on('failed', (parentJob, err) => log('The only parent I care about has failed, aborting'));
    /* For other usecases, we can also imagine */
    job.parents.on('all:completed', parentsJobs => log('All of my parents succeeded, I can begin'));
    job.parents.on('failed', (parentJob, err) => log('One of my parent has failed, aborting'));
});

As seen, the children and parents know quite a bit about each other, and a huge part of the responsibility is on the jobs themselves, through events. For ease of use, I would also suggest adding an extra argument to events, jobType.

(queue/children/parents).on(jobType, event, result => { });
// queue.on('Process1', 'completed', (jobs, rslt) => { });
// parents.on('Process2', 'all:ended', jobsRslt => { });

As a note, you could switch the focus of the API from events to promises and encourage jobs to use Promise.all(), but we would sadly lose flexibility.

I am aware there are alternative possibilities for my use case (for example, not breaking things down into so many tasks), and I plan on doing so for the first PoC, but that definitely lessens the quality of the solution. I would definitely like to see this subject get some attention, and I might be able to put in some time for a PR (not immediately, however) depending on the discussion.

EnigmaDoor avatar Nov 30 '17 17:11 EnigmaDoor

@EnigmaDoor thanks for your input. I have read your post carefully and I need to think more about it. However, one thing worth noting is that a solution based on standard events will never be completely fail-proof; events may or may not arrive due to network issues, etc. So the solution needs to be based either on message queues and/or some atomic work done at the Lua level in Redis.
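As an illustration, the child-completion bookkeeping could live in a single Lua script, so that decrementing the pending-children count and promoting the parent happen atomically (a sketch using ioredis with made-up key names, not Bull's actual scripts):

const Redis = require('ioredis');
const redis = new Redis();

// Atomically mark one child as done and, if it was the last one,
// move the parent from a waiting-for-children set to the wait list.
redis.defineCommand('childCompleted', {
  numberOfKeys: 3,
  lua: `
    local pending = redis.call('HINCRBY', KEYS[1], 'pendingChildren', -1)
    if pending <= 0 then
      -- last child: promote the parent so a worker can pick it up
      redis.call('ZREM', KEYS[2], ARGV[1])
      redis.call('LPUSH', KEYS[3], ARGV[1])
    end
    return pending
  `
});

// Called from the script that moves a child to completed:
// redis.childCompleted(parentHashKey, waitingChildrenKey, waitListKey, parentJobId)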

manast avatar Nov 30 '17 20:11 manast

@manast what's the progress here? Very eager to see the solution that comes from this great idea.

TomKaltz avatar Jan 15 '18 05:01 TomKaltz

Any updates on this?

mxs42 avatar Jul 20 '18 10:07 mxs42

some update for this?

fernandops26 avatar Sep 30 '18 03:09 fernandops26

This feature is going to be awesome but I need some more time before it is ready...

manast avatar Sep 30 '18 18:09 manast

@manast Would you accept a PR or have you made enough progress yourself to where that would be pointless? Several of us would love to help get it implemented 👨‍💻 👩‍💻

gcox avatar Oct 16 '18 11:10 gcox

@gcox this feature requires quite a lot of work on the inner workings of Bull, so I think it is difficult to develop without an understanding of how everything is designed... I am always open to PRs, but realistically I think I will be the one developing it :).

manast avatar Oct 17 '18 20:10 manast

You should check out Celery: https://docs.celeryproject.org/en/latest/userguide/canvas.html

It enables many of these workflows

One workflow that I had on Celery was like this:

  • a parent job start N children jobs to process PDF in parallel
  • when all children finishes a job join all the generated PDF pages and create a final PDF

Will this be possible once this issue is finished?

sibelius avatar Apr 22 '19 02:04 sibelius