Manage job dependencies when 1 job is already running
Hello,
At Orderlion we have the following situation:
n REST API requests create n import jobs in our BullMQ queue. For example, we receive an API call with 10 items that need to be imported; this might be the first request.
After that first request, we might receive a couple more requests that contain maybe 10, maybe 20, maybe 100 items to be imported, and those 20 or 100 items might include the same 10 items as the 1st request.
The issue is now this: we have several "import workers", so the 1st request with 10 items and the 2nd with 20 might be processed in parallel, leading to duplicate data in the database, because both workers will insert the same 10 items into the DB!
What I need / feature request:
Before adding the 2nd import job to the queue, I would check for "similar import jobs" already in the queue. This is already possible and should return the job of the 1st request, which might already be running.
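Roughly something like this (a sketch; `import-queue`, the payload shape, and `isSimilarImport` are placeholders for our own logic):

```ts
import { Queue } from 'bullmq';

const myQueue = new Queue('import-queue', { connection: { host: 'localhost', port: 6379 } });

// Hypothetical similarity check over job payloads (placeholder logic).
function isSimilarImport(jobData: any, items: unknown[]): boolean {
  return Array.isArray(jobData?.items) && jobData.items.some((i: unknown) => items.includes(i));
}

// Look for a "similar" import job that is already waiting or running.
async function findSimilarImportJob(items: unknown[]) {
  const jobs = await myQueue.getJobs(['waiting', 'active']);
  return jobs.find((job) => isSimilarImport(job.data, items));
}
```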
I now need a way to tell the 2nd queue job to ONLY start AFTER the 1st job has finished!
I know that there is the "Flows" setup, but as far as I can tell, there is no way to solve my problem with it?! Or is there another way to do something like this?
Thank you all, best, Patrick
We have this concept of "deduplication" that seems like it almost does what you want to do: https://docs.bullmq.io/guide/jobs/deduplication The part that I am not sure how to do is:
> I now need a way to tell the 2nd queue job to ONLY start AFTER the 1st job has finished!
As I understand it, what you mean is that you want to be able to add jobs that are duplicates, but with the guarantee that they are not processed until the previous job has been processed. In this case maybe flows, as you suggest, would be a solution: dynamically adding the job that has already been posted as a child of the new one.
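For reference, a flow where both jobs are created together would look roughly like this (queue name, job names, and payloads are placeholders); the open question for this thread is attaching an already-enqueued job, which a flow like this does not cover:

```ts
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection: { host: 'localhost', port: 6379 } });

// Placeholder payloads: the 2nd batch repeats an item from the 1st one.
const firstBatch = [{ sku: 'A' }, { sku: 'B' }];
const secondBatch = [{ sku: 'A' }, { sku: 'C' }];

// The parent job is only processed after all of its children have completed,
// so 'import-request-2' runs strictly after 'import-request-1'.
await flowProducer.add({
  name: 'import-request-2',
  queueName: 'import-queue',
  data: { items: secondBatch },
  children: [
    {
      name: 'import-request-1',
      queueName: 'import-queue',
      data: { items: firstBatch },
    },
  ],
});
```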
Thanks a lot for this quick reply, much appreciated!! ✌
I do think that deduplication could work for us. I see no issue in giving all of these "REST API queue jobs" the same deduplication id, which should in turn make sure that even if e.g. 10 REST API requests come in within a few ms, the 10 resulting queue jobs are only processed in series instead of in parallel (since I have multiple workers) - correct?
Thanks again!
Hello again, So I started working on a proper implementation for my use case.
What is a bit weird is that, in my opinion, the duplicated event does not work at all.
I would expect this event to fire for any "duplicated job" that is ignored because there is another (active) job with the same deduplication id - but I NEVER see the event fired!
In turn, the deduplicated event does work, but the issue is that it fires with the "original job", so I have no information about which job was actually ignored (basically the 2nd job and every other job after that with the same deduplication id).
What am I doing wrong here?
In general, I now also tried to console.log the job and could not find anything in the ignored "2nd job" that would indicate that it actually was ignored. It seems to me there is a bug somewhere. YES, the 2nd job is ignored, but I basically have no way to find out WHICH job was ignored. I can only find out which job was the original one that "blocked the 2nd job" in the first place (via the deduplicated event) - but the duplicated event does not work/fire for me!
The best thing would be if you post some code that reproduces what you mention; then we can help you out. The feature is well tested, so I would be surprised if there were trivial issues in it.
It is a bit of a tough one, but I will try in pseudo code.
I push a job into the queue that also carries a deduplication id (the job name here is a placeholder):

```ts
// Note: Queue#add takes a job name as its first argument.
const job = await myQueue.add('import', params, { deduplication: { id: 'foobar-123' } });
```
When this job is being processed, the processor contains a very simple wait (`await new Promise((resolve) => setTimeout(resolve, 5000));`), so I have enough time to send the same API request (triggering the above queue add) a 2nd time.
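The worker side of the repro looks roughly like this (local Redis assumed; queue name and return value are placeholders):

```ts
import { Worker } from 'bullmq';

const worker = new Worker(
  'import-queue',
  async (job) => {
    // Keep the job "active" for 5 seconds so a 2nd, identical
    // API request arrives while this one is still running.
    await new Promise((resolve) => setTimeout(resolve, 5000));
    return { imported: job.data };
  },
  { connection: { host: 'localhost', port: 6379 } }
);
```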
So I call my API endpoint twice in like 1 or 2 seconds.
I then have the 2 events:
```ts
// WORKS!
myQueueEvents.on('deduplicated', async ({ deduplicationId, jobId: deduplicatedJobId }) => {
  // CAUTION: deduplicatedJobId is the jobId of the "original job" that is
  // currently blocking this new job from being created!
  console.log(`[QUEUE] import-queue - deduplication Event:`, { deduplicationId, deduplicatedJobId });
});

// NEVER fires!
// Somehow this does not work at all, but I don't get why! :( see: https://github.com/taskforcesh/bullmq/issues/3168
myQueueEvents.on('duplicated', async ({ jobId }) => {
  console.log(`[QUEUE] import-queue - duplicated Event:`, { jobId });
});
```
I can tell you that the setup in general works; the 2nd identical job is correctly ignored!
BUT I also expect the duplicated event to fire for the 2nd job, and I never see it. I only see the deduplicated event being fired, BUT the jobId I get there is NOT the jobId of the job that is being ignored; it is the jobId of the "1st job", which is actually running as it should.
The "duplicated" event is only fired if you add a job to the queue with the same jobId as one existing job. You should not care about this event if you are using the deduplicationId. More info here: https://docs.bullmq.io/guide/jobs/job-ids
Okay, thank you! But my issue somehow remains... The deduplicated event ONLY gives me the jobId of the "original job" that is actually running, and NOT any info about the job that was ignored. But I need this information somehow; otherwise I have no way of knowing that this job was ignored and that I need to retry it after e.g. 10 seconds. How would I do that? I could not find any info anywhere that would indicate that job 2 was actually ignored (as it should be).
I would need something like this:
```ts
myQueueEvents.on('deduplicated', async ({ deduplicationId, jobId: deduplicatedJobId, ignoredJobId }) => {
  // deduplicatedJobId = job 1, the job that is actually running (already)
  // ignoredJobId = job 2, the job that was ignored
});
```
We now have a PR to address this: https://github.com/taskforcesh/bullmq/pull/3177
Thanks so much, much appreciated! ✌
One more thing: is it possible to also "keep" the ignored job in a separate state, e.g. "ignored", so I can easily retry the same job later? Or would you have me implement special logic on my end, where I get the event that job 2 was ignored, save the job.data somewhere myself, and retry the job e.g. in a setTimeout() after x seconds? Or can you think of a more elegant solution?
Hi @Twisterking, I don't think we will add a separate state for ignored jobs in this case, but you can rely on events to save that information.
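A rough sketch of that idea (the 10-second delay, helper names, and cleanup logic are assumptions, not a prescribed API; since the deduplication entry is removed once the original job completes or fails, a retried add eventually goes through):

```ts
import { Queue, QueueEvents } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };
const myQueue = new Queue('import-queue', { connection });
const myQueueEvents = new QueueEvents('import-queue', { connection });

// Producer-side bookkeeping: remember the payload behind each deduplication id.
const pendingPayloads = new Map<string, unknown>();

async function addImportJob(dedupId: string, data: unknown) {
  pendingPayloads.set(dedupId, data);
  await myQueue.add('import', data, { deduplication: { id: dedupId } });
}

// If an add was ignored as a duplicate, retry it after 10 seconds. While the
// original job is still active, the retry is deduplicated again (and this
// handler fires again); once the original finishes, the add succeeds.
myQueueEvents.on('deduplicated', ({ deduplicationId }) => {
  const data = pendingPayloads.get(deduplicationId);
  if (data !== undefined) {
    setTimeout(() => {
      addImportJob(deduplicationId, data).catch(console.error);
    }, 10_000);
  }
});

// Prune the bookkeeping once a job for this deduplication id has actually run.
myQueueEvents.on('completed', async ({ jobId }) => {
  const job = await myQueue.getJob(jobId);
  const dedupId = job?.opts?.deduplication?.id;
  if (dedupId) pendingPayloads.delete(dedupId);
});
```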