
Debounced Jobs

Open tophep opened this issue 7 years ago • 29 comments


Description

Let's say I want to queue a job every time one of my users updates their profile. But if a user makes many rapid updates I don't want to queue a bunch of jobs that repeat the same work. I'd rather wait until the user is done with the flurry of updates.

This was previously discussed here but it was unrelated to the main thread and there was no resolution.

A simple solution could be to use a custom Id with a delay. When another job needs to be queued, delete the old job and re-queue with delay again. The caveat (touched on in thread above) is when the job is active and being processed and another job needs to be queued. @manast Any thoughts on how to handle this edge case? cc @lxcid

Separately, it would be convenient to be able to increase the delay of existing jobs (instead of delete + enqueue).

tophep avatar Aug 27 '18 17:08 tophep

Well I guess that should be like any other debouncing, i.e. if the job already started it will continue working, and the job added after that should be either ignored or queued.

manast avatar Aug 27 '18 19:08 manast

Any chance to look at this again?

This would solve a very common use case I have. A possible solution I've been looking at is to have deduplication set up on jobId + state, so that if there's a job already waiting, there will not be new waiting jobs for the same jobId, but if there's a task already active for the same jobId we can still create a task that will be executed after that.

This requires concurrency to be 1, however, and doesn't consider failure/completion states. It would also require some additional checks.
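
The jobId + state rule can be sketched roughly as below. This is a minimal in-memory model, not Bull's API: `JobState` and `canEnqueue` are hypothetical names, and it assumes concurrency 1 and treats finished jobs as irrelevant, which is exactly the part the caveats above leave open.

```typescript
// Hypothetical model of "deduplicate on jobId + state"; not part of Bull.
type JobState = "waiting" | "active" | "completed" | "failed";

// `existing` holds the states of the jobs that already exist for one jobId.
function canEnqueue(existing: JobState[]): boolean {
  for (let i = 0; i < existing.length; i++) {
    // A job is already waiting for this id, so a new one would repeat its work.
    if (existing[i] === "waiting") return false;
  }
  // Nothing is waiting (no job at all, one active, or only finished ones): accept.
  return true;
}
```

With this rule, `canEnqueue(["active"])` accepts one follow-up job, while `canEnqueue(["active", "waiting"])` rejects further ones until the waiting job starts.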

Have also looked at a double queue setup, one for debouncing and the second for the actual processing, but still sounds very hacky.

So a native solution for this would be perfect.

Thoughts? Any direction in which we could help?

sebasmagri avatar Oct 23 '19 12:10 sebasmagri

Did you guys find any good solution to this problem? I am facing the same situation, where the same job can be queued multiple times and I only need to process one. Currently, I am storing all the jobs in Redis myself, batching them up, and sending one job to the queue after 10 seconds. But a native solution would be better.

shivgarg5676 avatar Dec 06 '19 05:12 shivgarg5676

I have the same challenge, in my case with chat message notifications (but once we have a working system it could be applied to many things). The same applies: allow a user to trigger a bunch of jobs, only process the last one.

So I think it comes down to the fact that there is a unique context (i.e. one chat, or one user in @tophep's case) and you want jobs to behave differently based on other queued jobs in that same context.

The opaqueness of Redis values + those two dimensions (jobs + contexts) kinda forces you to work with multiple queues or "collections" here, I think. Either that or do some wackiness with key prefixes / suffixes?

My current idea involved saving the jobId to debounce:[chatId] and creating a new delayed job (that will check that value to assert whether it should actually do anything). That still feels like it's rife with potential issues though.
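
A rough sketch of that idea, with a plain object standing in for Redis. The names `latestJobByChat`, `recordLatest` and `shouldProcess` are hypothetical and only illustrate the check; a real version would read and write the `debounce:[chatId]` key in Redis.

```typescript
// Hypothetical in-memory stand-in for the Redis key debounce:[chatId].
const latestJobByChat: { [key: string]: string } = {};

// Called when a delayed job is created: remember it as the newest one for the chat.
function recordLatest(chatId: string, jobId: string): void {
  latestJobByChat["debounce:" + chatId] = jobId;
}

// Called by the delayed job when it fires: only the newest job does any work.
function shouldProcess(chatId: string, jobId: string): boolean {
  return latestJobByChat["debounce:" + chatId] === jobId;
}
```

Older jobs still fire after their delay but return early. The write and the later check are separate steps, which is one place the potential issues could come from.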

GriffinSauce avatar Dec 23 '19 14:12 GriffinSauce

https://issuehunt.io/r/OptimalBits/bull/issues/1034

manast avatar Dec 25 '19 10:12 manast

@mauricedoepke has funded $250.00 to this issue.


issuehunt-oss[bot] avatar Nov 25 '20 09:11 issuehunt-oss[bot]

@manast Would be great if we could get that feature.

mauricedoepke avatar Nov 25 '20 10:11 mauricedoepke

@mauricedoepke I am going to look into it. It was a long time ago, so I barely remember it anymore.

manast avatar Nov 25 '20 11:11 manast

@mauricedoepke just to make clear the requirements for this issue:

  1. A job added with a debounce parameter will wait X milliseconds before starting to process.
  2. If a new job is added before X milliseconds have passed from previous job then the old job is replaced by the new one.
  3. If the old job has already started and a new job is added, the new job will not replace the old one, it will just be added to the queue.

Are these the correct expectations? In other words, quite standard debounce functionality.
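
To make the three rules concrete, here is a small in-memory model. It is only a sketch under assumptions: `DebounceModel`, `ModelJob` and the explicit `now` arguments are made up for illustration, and nothing here is Bull's implementation or touches Redis.

```typescript
// Hypothetical in-memory model of the three rules; not Bull's implementation.
interface ModelJob {
  id: string;
  key: string;
  data: string;
  runAt: number;
  state: "delayed" | "active";
}

class DebounceModel {
  jobs: ModelJob[] = [];
  private nextId = 1;

  // Rule 1: the new job becomes runnable `debounceMs` after `now`.
  add(key: string, data: string, debounceMs: number, now: number): ModelJob {
    const kept: ModelJob[] = [];
    for (let i = 0; i < this.jobs.length; i++) {
      const job = this.jobs[i];
      // Rule 2: a job with the same key that has not started yet is replaced.
      // Rule 3: an active job is left alone; the new job is simply added.
      if (job.key === key && job.state === "delayed") continue;
      kept.push(job);
    }
    const added: ModelJob = {
      id: "job-" + this.nextId++,
      key: key,
      data: data,
      runAt: now + debounceMs,
      state: "delayed",
    };
    kept.push(added);
    this.jobs = kept;
    return added;
  }

  // Marks every delayed job whose wait has elapsed as active and returns them.
  start(now: number): ModelJob[] {
    const started: ModelJob[] = [];
    for (let i = 0; i < this.jobs.length; i++) {
      const job = this.jobs[i];
      if (job.state === "delayed" && job.runAt <= now) {
        job.state = "active";
        started.push(job);
      }
    }
    return started;
  }
}
```

For example, adding `"v1"` at time 0 and `"v2"` at time 500 with a 1000 ms debounce leaves only `"v2"`, runnable at 1500. A job added while `"v2"` is active is queued alongside it rather than replacing it.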

manast avatar Nov 26 '20 09:11 manast

@manast Yes, that's correct. We mean exactly the same thing.

mauricedoepke avatar Nov 27 '20 14:11 mauricedoepke

I have a question about the requirements. By "new job", do you mean a job with the same name and data, or can the new job have different data? If the data is different from the old job's, should we still replace the old job with the new one?

majidsajadi avatar Dec 04 '20 18:12 majidsajadi

@majidsajadi

I think we should define "new job" by name only. It should be able to have different data and still replace a previous job with the same name.

In my case it will be for debouncing user notifications. This way I can have a queue named "notifications" with a job named by the userId, and put all the necessary data for the notifications in the data.

This way I can insert the newest data, but it will still debounce by the userId (the job name).

mauricedoepke avatar Dec 07 '20 20:12 mauricedoepke

We will need to use the jobId option for this, since that is the proven mechanism we have today to avoid duplicated jobs. In your case it should work since you can use userId as jobId.

manast avatar Dec 08 '20 07:12 manast

In this case we can delay a job by a certain amount of time with the delay option and set userId as the jobId (overriding the default ID). When creating a new job, we just check whether a job with the provided ID exists. We can add an 'overwrite' option for this.

Or we can add a 'debounce' option that works like 'delay' but overwrites any existing job with the same jobId.

If I'm correct, I would like to tackle this issue.

majidsajadi avatar Dec 10 '20 17:12 majidsajadi

Adding a job with an existing ID will do nothing (by default). It would be great to have an option to "overwrite" the job instead. Waiting for this feature. Also, note that overwrites should be atomic, as there might be many publishers.

veedeo avatar Dec 31 '20 13:12 veedeo

@veedeo this is feasible, but only if the overwrite is done before the job has actually started to be processed. It should be as easy as having an extra "overwrite" option: when it is active, instead of ignoring the duplicate, it just adds the job normally (as long as the existing job is in the wait or delayed sets).
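
As a rough illustration of such an option, here is an in-memory sketch. The names `StoredJob`, `store` and `addJob` are hypothetical, and returning `false` for an active job is an assumption made for the sketch, since what should happen in that case is not settled here.

```typescript
// Hypothetical model of an "overwrite" option; not an existing Bull API.
interface StoredJob {
  data: string;
  state: "wait" | "delayed" | "active";
}

const store: { [jobId: string]: StoredJob } = {};

// Returns true when the job was written, false when the add was ignored.
function addJob(jobId: string, data: string, overwrite: boolean): boolean {
  const existing = store[jobId];
  if (existing === undefined) {
    store[jobId] = { data: data, state: "wait" };
    return true;
  }
  // Default behaviour: adding a job with an existing id does nothing.
  if (!overwrite) return false;
  // Assumption: overwriting is refused once processing has started.
  if (existing.state === "active") return false;
  // The existing job is still in wait or delayed, so replace its payload.
  store[jobId] = { data: data, state: existing.state };
  return true;
}
```

This model is single-threaded, so the check and the write cannot interleave. With many publishers talking to Redis, the same check-and-write would have to run atomically, for example inside a single script.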

manast avatar Jan 02 '21 11:01 manast

I do intend to work on this. So, let me get this straight:

  1. Atomic overwriting / upserting of jobs (that have not yet started to process) [would also align with taskforces/bullmq#240]
  2. Integrate delay functionality for debouncing, where all jobs of same ID queued within a certain delay will overwrite the prior before allowing it to process
  3. If the job has already started processing or is finished, just add another one to the queue as normal, with the same process as prior

Nytelife26 avatar Jan 15 '21 21:01 Nytelife26

Also, the delay specified by queue.add(name, data, {delay: x}) would override the existing delay.

jamesholcomb avatar Jan 15 '21 21:01 jamesholcomb

Ah, so, @jamesholcomb, the plan is to make overwriting jobs extend the delay as well, correct? That makes sense now I think about it.

Nytelife26 avatar Jan 15 '21 21:01 Nytelife26

That would satisfy my use case...For instance, a job is scheduled 3 months out. A user makes a change to some job data that requires it to be moved up to tomorrow.

jamesholcomb avatar Jan 15 '21 22:01 jamesholcomb

Well, it only makes sense for atomic overwriting to overwrite all properties, so that works fine. Obviously this is my first contribution to Bull, and so I am unsure of the design philosophy, but it sounds like the logical solution to me.

Nytelife26 avatar Jan 15 '21 22:01 Nytelife26

I'd like to work on this. @manast, do you promise to review and pay within a week?

janat08 avatar Jan 27 '21 06:01 janat08

Because my experience with bounties is far from ideal, and I don't have finances to manage.

janat08 avatar Jan 30 '21 16:01 janat08

@tophep @sebasmagri @shivgarg5676 @GriffinSauce

Are any of you interested in throwing some dollars into the IssueHunt as well, to make it more appealing for someone to add this feature?

mauricedoepke avatar Feb 25 '21 21:02 mauricedoepke

@mauricedoepke has cancelled funding for this issue. (Cancelled amount: $250.00) See it on IssueHunt

issuehunt-oss[bot] avatar Mar 01 '21 21:03 issuehunt-oss[bot]

Any updates on this super handy feature? :) Thanks!

sevetseh28 avatar May 10 '21 19:05 sevetseh28

Hey everyone, what is the current status of this feature? @mauricedoepke could we restore the funding? I am interested in donating.

KirillSuhodolov avatar May 05 '22 14:05 KirillSuhodolov

Here is our workaround for debounced jobs with the current Bull. Maybe it will help somebody.

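  // Debounce by removing earlier jobs for the same key, then adding a fresh
  // delayed job. queuesByName, QueueName, JobName, uuid and requestId come
  // from the surrounding module and are not shown here.
  async addDebounced<TData extends Record<string, unknown>>({
    data,
    debounceKey,
    debounceTimeout,
    jobName,
    options,
    queueName,
  }: {
    data?: TData;
    debounceKey: string;
    debounceTimeout: number;
    jobName: JobName;
    options?: Omit<JobOptions, 'delay' | 'jobId'>;
    queueName?: QueueName;
  }): Promise<Bull.Job<TData>> {
    const queue = queuesByName[queueName || QueueName.AsyncQueue];
    // All jobs for one debounce key share this prefix; the uuid suffix keeps
    // each job ID unique.
    const jobIdPrefix = `Job:${jobName}:${debounceKey}`;
    const jobId = `${jobIdPrefix}:${uuid()}`;
    // Remove earlier jobs whose ID matches the prefix pattern.
    await queue.removeJobs(`${jobIdPrefix}:*`);

    // Re-add the job with the debounce timeout as its delay.
    return queue.add(
      jobName,
      {
        ...data,
        _context: { initialEnvironment: process.env.ENVIRONMENT, requestId },
      },
      {
        ...options,
        delay: debounceTimeout,
        jobId,
      },
    );
  }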

AntonPuko avatar May 05 '22 21:05 AntonPuko

@AntonPuko that won't affect active jobs, right? It just removes the ones that are waiting?

jmbeach avatar Jul 01 '22 18:07 jmbeach