                        Debounced Jobs
Description
Let's say I want to queue a job every time one of my users updates their profile. But if a user makes many rapid updates I don't want to queue a bunch of jobs that repeat the same work. I'd rather wait until the user is done with the flurry of updates.
This was previously discussed here but it was unrelated to the main thread and there was no resolution.
A simple solution could be to use a custom jobId with a delay. When another job needs to be queued, delete the old job and re-queue it with a delay again (roughly as sketched below). The caveat (touched on in the thread above) is when the job is active and being processed while another job needs to be queued. @manast Any thoughts on how to handle this edge case? cc @lxcid
Separately, it would be convenient to be able to increase the delay of existing jobs (instead of delete + re-enqueue).
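A minimal sketch of that delete-and-requeue idea, assuming Bull v3's getJob/remove/add API; the function name and debounceMs parameter are made up for illustration:
  import Queue from 'bull';

  const queue = new Queue('profile-updates');

  // Re-queue work under a fixed jobId, pushing the delay forward on every call.
  // If a job with this id is still waiting or delayed, drop it and add a fresh
  // delayed one; this is best-effort rather than atomic.
  async function queueDebounced(jobId: string, data: object, debounceMs: number) {
    const existing = await queue.getJob(jobId);
    if (existing) {
      const state = await existing.getState();
      if (state === 'waiting' || state === 'delayed') {
        await existing.remove();
      }
      // If the job is already active (or kept around as completed/failed),
      // the add() below is silently ignored for the same id -- exactly the
      // edge case discussed in this issue.
    }
    return queue.add(data, { jobId, delay: debounceMs });
  }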
Well I guess that should be like any other debouncing, i.e. if the job already started it will continue working, and the job added after that should be either ignored or queued.
Any chance to look at this again?
This would solve a very common use case I have. A possible solution I've been looking at is deduplication on jobId + state: if there is already a waiting job, no new waiting job is created for the same jobId, but if a task is already active for that jobId we can still create one that will be executed after it.
This requires concurrency to be 1, however, and doesn't consider failure/completion states. It would also require some additional checks (see the sketch below).
I've also looked at a double-queue setup, one for debouncing and a second for the actual processing, but that still sounds very hacky.
So a native solution for this would be perfect.
Thoughts? Any direction in which we could help?
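A rough sketch of the jobId-plus-state deduplication described above, assuming Bull v3, concurrency 1, and illustrative names:
  import Bull from 'bull';

  // If a job with this id is still pending, do nothing; if it is already
  // active, add a follow-up under a different id so it runs afterwards.
  // Completed/failed jobs left in Redis under the same id would still block
  // re-adding, which is the caveat mentioned above.
  async function addDeduplicated(queue: Bull.Queue, jobId: string, data: object) {
    const existing = await queue.getJob(jobId);
    const state = existing ? await existing.getState() : null;

    if (state === 'waiting' || state === 'delayed' || state === 'paused') {
      return existing; // a pending job already covers this work
    }
    if (state === 'active') {
      return queue.add(data, { jobId: `${jobId}:followup` });
    }
    return queue.add(data, { jobId });
  }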
Did you guys find any good solution to this problem? I am facing the same situation where the same job can be queued multiple times and I only need to process one. Currently, I am storing all the jobs in Redis myself, batching them up, and sending one job to the queue after 10 seconds (roughly as sketched below), but a native solution would be better.
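A sketch of that manual Redis batching, assuming ioredis next to Bull; the key layout, job names, and the 10-second window are illustrative:
  import Bull from 'bull';
  import Redis from 'ioredis';

  const redis = new Redis();
  const queue = new Bull('batched-work');

  // Collect every request in a Redis list; only the first request of an
  // empty window schedules the single delayed "flush" job.
  async function enqueueBatched(batchKey: string, payload: object) {
    const length = await redis.rpush(`batch:${batchKey}`, JSON.stringify(payload));
    if (length === 1) {
      await queue.add('flush-batch', { batchKey }, { delay: 10_000 });
    }
  }

  // The flush job drains the list and handles everything collected so far.
  queue.process('flush-batch', async job => {
    const key = `batch:${job.data.batchKey}`;
    const items = (await redis.lrange(key, 0, -1)).map(raw => JSON.parse(raw));
    await redis.del(key);
    // ...do the actual work once for the whole batch...
    return items.length;
  });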
I have the same challenge, in my case with chat message notifications (but when we have a working system it could be applied to many things). The same applies: allow user to trigger a bunch of jobs, only process the last one.
So I think it comes down to the fact that there is a unique context (i.e. one chat, or one user in @tophep's case) and you want jobs to behave differently based on other queued jobs in that same context.
The opaqueness of Redis values + those two dimensions (jobs + contexts) kinda forces you to work with multiple queues or "collections" here, I think. Either that or do some wackiness with key prefixes/suffixes?
My current idea involved saving the jobId to debounce:[chatId] and creating a new delayed job (that will check that value to assert whether it should actually do anything). That still feels like it's rife with potential issues though.
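A sketch of that marker-key approach, assuming ioredis alongside Bull: the debounce:[chatId] key stores the id of the latest job, and older jobs bail out. The 'notify' job name and the queue name are made up:
  import Bull from 'bull';
  import Redis from 'ioredis';

  const redis = new Redis();
  const queue = new Bull('notifications');

  // Producer: add a delayed job as usual, then record it as the latest one
  // for this chat.
  async function addDebouncedNotification(chatId: string, data: object, delayMs: number) {
    const job = await queue.add('notify', { chatId, ...data }, { delay: delayMs });
    await redis.set(`debounce:${chatId}`, String(job.id));
    return job;
  }

  // Worker: only the job whose id still matches the recorded "latest" id does
  // any work; jobs superseded by a later update simply return.
  queue.process('notify', async job => {
    const latestId = await redis.get(`debounce:${job.data.chatId}`);
    if (latestId !== String(job.id)) {
      return; // a newer update replaced this one
    }
    // ...send the notification...
  });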
https://issuehunt.io/r/OptimalBits/bull/issues/1034
@mauricedoepke has funded $250.00 to this issue.
@manast Would be great if we could get that feature.
@mauricedoepke I am going to look into it; it was a long time ago, so I barely remember it anymore.
@mauricedoepke just to make clear the requirements for this issue:
- A job added with a debounce parameter will wait X milliseconds before starting to process.
- If a new job is added before X milliseconds have passed since the previous job, the old job is replaced by the new one.
- If the old job has already started and a new job is added, the new job will not replace the old one; it will just be added to the queue.
Are these the correct expectations? In other words, quite standard debounce functionality.
@manast Yes, that's correct. We mean exactly the same thing.
I have a question about the requirements. By "new job", do you mean a job with the same name and data, or can the new job have different data? If the data is different from the old job's, should we replace the old one with the new one?
@majidsajadi
I think we should define "new job" by name only. It should be able to have different data and still replace a previous job with the same name.
In my case it will be for debouncing user notifications. This way I can have a queue named "notifications" with a job named by the userId, and put all the necessary data for the notification in the job data.
This way I can insert the newest data, but it will still debounce by the userId (job name).
We will need to use the jobId option for this, since that is the proven mechanism we have today to avoid duplicated jobs. In your case it should work since you can use userId as jobId.
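For the notification use case, a minimal example of using userId as the jobId with a delay, assuming Bull v3. Note that as long as a job with that id exists, further add() calls are silently ignored, so on its own this deduplicates rather than debounces:
  import Bull from 'bull';

  const notifications = new Bull('notifications');

  async function notifyLater(userId: string, payload: object) {
    // One pending job per user: the id collision makes repeat adds no-ops
    // until the existing job is processed or removed.
    return notifications.add(
      'notify-user',
      { userId, ...payload },
      { jobId: userId, delay: 5000 },
    );
  }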
In this case we can delay a job with the delay option for a certain amount of time and set userId as the jobId (overwriting the default id), and when creating a new job just check whether a job with the provided id already exists. We could add an 'overwrite' option for this.
Or we could add a 'debounce' option that works like 'delay' but overwrites any existing job with the same jobId.
If I'm correct, I would like to tackle this issue.
Adding a job with an existing ID does nothing (by default); it would be great to have an option to "overwrite" the job instead. Waiting for this feature. Also, note that overwrites should be atomic, as there might be many publishers.
@veedeo this is feasible, but only if the overwrite happens before the job has actually started to be processed. It should be as easy as an extra "overwrite" option: when it is set, instead of ignoring the duplicate, the job is just added normally (as long as the existing job is in the wait or delayed sets).
I do intend to work on this. So, let me get this straight:
- Atomic overwriting / upserting of jobs (that have not yet started to process) [would also align with taskforces/bullmq#240]
- Integrate delay functionality for debouncing, where all jobs of the same ID queued within a certain delay will overwrite the prior one before allowing it to process
- If the job has already started processing or is finished, just add another one to the queue as normal, with the same process as prior
Also, the delay specified by queue.add(name, data, {delay: x}) would override the existing delay.
Ah, so, @jamesholcomb, the plan is to make overwriting jobs extend the delay as well, correct? That makes sense now that I think about it.
That would satisfy my use case. For instance, a job is scheduled 3 months out. A user makes a change to some job data that requires it to be moved up to tomorrow.
Well, it only makes sense for atomic overwriting to overwrite all properties, so that works fine. Obviously this is my first contribution to Bull, and so I am unsure of the design philosophy, but it sounds like the logical solution to me.
I'd like to work on this. @manast do you promise to review and pay within a week?
Because my experience with bounties is far from ideal, and I don't have finances to manage.
@tophep @sebasmagri @shivgarg5676 @GriffinSauce
Is any of you maybe interested in throwing some dollars into the IssueHunt as well, to make it more appealing for someone to add this feature?
@mauricedoepke has cancelled funding for this issue. (Cancelled amount: $250.00) See it on IssueHunt
Any updates on this super handy feature? :) Thanks!
Hey everyone, what is the current status of this feature? @mauricedoepke, could we restore the funding? I am interested in donating.
Here is our workaround ("crutch") for debounced jobs with the current Bull. Maybe it will help somebody.
  // Assumed context (not shown in the original snippet): `Bull`, `JobOptions`
  // (i.e. Bull.JobOptions) and `uuid` are imported, `queuesByName` maps each
  // QueueName enum value to a Bull.Queue instance, and `JobName`, `QueueName`
  // and `requestId` come from the surrounding codebase.
  async addDebounced<TData extends Record<string, unknown>>({
    data,
    debounceKey,
    debounceTimeout,
    jobName,
    options,
    queueName,
  }: {
    data?: TData;
    debounceKey: string;
    debounceTimeout: number;
    jobName: JobName;
    // delay and jobId are managed by this helper, so callers may not set them
    options?: Omit<JobOptions, 'delay' | 'jobId'>;
    queueName?: QueueName;
  }): Promise<Bull.Job<TData>> {
    const queue = queuesByName[queueName || QueueName.AsyncQueue];
    // Every job for the same (jobName, debounceKey) pair shares this prefix...
    const jobIdPrefix = `Job:${jobName}:${debounceKey}`;
    // ...but gets a unique suffix so a fresh job can always be added.
    const jobId = `${jobIdPrefix}:${uuid()}`;
    // Drop previously added jobs for this key so only the newest one remains.
    await queue.removeJobs(`${jobIdPrefix}:*`);
    return queue.add(
      jobName,
      {
        ...data,
        _context: { initialEnvironment: process.env.ENVIRONMENT, requestId },
      },
      {
        ...options,
        // the debounce window: the job only runs if it survives this delay
        delay: debounceTimeout,
        jobId,
      },
    );
  }
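A hypothetical call site for the helper above; JobName.SyncProfile and the 10-second window are made up, and jobService stands for whatever object the method lives on:
  // inside an async handler:
  // Re-queued on every profile save; only the last call within the 10s window
  // survives long enough to be processed.
  await jobService.addDebounced({
    jobName: JobName.SyncProfile,
    debounceKey: userId,
    debounceTimeout: 10_000,
    data: { userId },
  });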
@AntonPuko that won't affect active jobs right? Just remove ones waiting?