
Feature Request: Unique scheduled jobs

Open amclaughlin-valetude opened this issue 7 years ago • 9 comments

The feature I'm implementing is causing duplicate jobs in Sidekiq::ScheduledSet, so I would really love to define a set of arguments for my jobs that should be unique across this set, rather than in a queue.

amclaughlin-valetude avatar Oct 13 '17 18:10 amclaughlin-valetude

I'd be happy to talk about this at some point. Not sure exactly what you are after, could you elaborate?

mhenrixon avatar Jun 26 '18 20:06 mhenrixon

I just found this issue today after searching for a solution: I have jobs per tenant. I tell sidekiq-unique-jobs to make jobs unique by the first argument, which is always the account id. Today we had a situation where one tenant needed to be temporarily disabled while they worked through some issues. So essentially, I want to ignore that tenant's jobs until the tenant is re-enabled. I don't want to drop them, and I don't want them throwing errors, so in my code I'm saying something like retry_job(wait: 10.minutes) if tenant_paused?.

As a result, my Sidekiq::ScheduledSet is now full of duplicate jobs whereas if they were added to the regular queue, this gem would make them unique already.

glennfu avatar Jun 29 '18 23:06 glennfu

Thanks for reminding me about the scheduled set @glennfu :) I almost forgot about that for v6!

mhenrixon avatar Jun 30 '18 05:06 mhenrixon

@glennfu @amclaughlin-valetude I had a look into this. The problem only occurs when there is no longer a lock for the specific item arguments. This happens, for instance, when the lock is created with an expiration: any key that is set to expire will do so at the given time regardless of the job's state. Unfortunately, older versions of the gem used a default expiration of 30 minutes, which could be why you are seeing this behaviour.

The thing is that when you call MyCoolWorker.perform_in(1.second, arguments), the client middleware runs and locks the job; then, if item['at'] is present (meaning the job is scheduled), it is allowed to be pushed to the scheduled set. Even though the job sits in another queue, it remains locked, so other jobs from the same worker class with the same arguments are not allowed through either perform_async or perform_in.
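The locking flow described above can be sketched in plain Ruby. This is not the gem's real implementation, just a minimal simulation of the idea: the lock key is a digest of the worker class and its arguments, so it deduplicates regardless of whether the job lands in a regular queue or the scheduled set.

```ruby
require "digest"
require "json"

# Minimal simulation of a client-side unique lock. The class and method
# names here are illustrative, not part of sidekiq-unique-jobs.
class FakeUniqueClient
  def initialize
    @locks = {}     # digest => job; stands in for the Redis lock keys
    @queue = []     # regular queue
    @scheduled = [] # scheduled set
  end

  def digest(worker, args)
    Digest::SHA1.hexdigest(JSON.generate([worker, args]))
  end

  # Mirrors perform_async: acquire the lock first, push only on success.
  def perform_async(worker, *args)
    push(worker, args, at: nil)
  end

  # Mirrors perform_in: same lock, but the job goes to the scheduled set.
  def perform_in(interval, worker, *args)
    push(worker, args, at: Time.now + interval)
  end

  private

  def push(worker, args, at:)
    key = digest(worker, args)
    return nil if @locks.key?(key) # duplicate: rejected before it is queued

    job = { "class" => worker, "args" => args, "at" => at }
    @locks[key] = job
    (at ? @scheduled : @queue) << job
    job
  end
end

client    = FakeUniqueClient.new
first_job = client.perform_in(10, "MyCoolWorker", 42) # scheduled and locked
duplicate = client.perform_async("MyCoolWorker", 42)  # nil: same digest is locked
```

Because the lock lives outside any particular queue, the scheduled job blocks later perform_async and perform_in calls alike, which is the behaviour being described for the real middleware.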

What you are experiencing is most likely a problem with configuration, a buggy old version or ActiveJob (which will be unsupported in the next major release).

mhenrixon avatar Jun 30 '18 09:06 mhenrixon

Is that not implemented yet, or do we have a faulty configuration? Here's a screenshot of the scheduled jobs: (screenshot)

Jobs in regular queues are locked properly.

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  ...
end

It seems a bit random when it rejects the job...

(screenshot)

This is likely related to the fact that I added a rescheduling mechanism within the worker itself. Basically, the condition is: if the job was performed too recently (within the last 15 seconds), then it's rescheduled:

ReportRecalculationWorker.perform_in(10.seconds)

sharq1 avatar Sep 25 '21 09:09 sharq1

@sharq1 in your situation you already have a unique job in the one that is trying to delay itself.

You would have to use some Sidekiq magic or the replace conflict strategy.
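For reference, switching the conflict strategy is a one-line change to the worker's options. A hedged sketch, assuming a gem version where :replace is an available on_conflict strategy (check the strategies documented for your installed version before relying on this):

```ruby
# Sketch: let a self-rescheduling job replace its existing locked entry
# instead of being rejected outright.
class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports",
                  unique_across_queues: true,
                  lock: :until_executing,
                  on_conflict: :replace

  def perform
    # ...
  end
end
```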

mhenrixon avatar Sep 25 '21 15:09 mhenrixon

@mhenrixon thank you; however, I think I wasn't clear enough. I don't want to reschedule or replace the jobs in the reports queue / ScheduledSet.

If ReportRecalculationWorker is already enqueued / scheduled, I would expect the perform_async and perform_in to reject / return nil. But instead it's sometimes allowing me to schedule the worker, as you can see on the screenshots above.

sharq1 avatar Sep 27 '21 06:09 sharq1

@sharq1 then what about the following:

This is likely related to the fact that I added a rescheduling mechanism within the worker itself. Basically, the condition is: if the job was performed too recently (within the last 15 seconds), then it's rescheduled.

Can you show the code for how you do this? I am fairly certain this will cause problems for you.

mhenrixon avatar Sep 27 '21 09:09 mhenrixon

Sure:

# frozen_string_literal: true

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  def perform
    if too_many_jobs || last_execution_too_recently
      return ReportRecalculationWorker.perform_in(10.seconds)
    end

    save_execution_time
    # ... recalculate ...
  end

  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    if t.present? && Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
      return true
    end

    false
  end

  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end
end

Actually it's a bit more complex, as I also added a custom runtime lock (not trusting the until_and_while_executing lock 😞), so the full implementation is below.

See full implementation
# frozen_string_literal: true

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  def perform
    return unless acquire_lock

    if too_many_jobs || last_execution_too_recently
      return ReportRecalculationWorker.perform_in(10.seconds)
    end

    save_execution_time
    # ... recalculate ...
  rescue StandardError => e
    # ... log error ...
  ensure
    release_lock
  end

  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    if t.present? && Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
      return true
    end

    false
  end

  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end

  def acquire_lock
    # ex: expire in 2h in case lock was not unset because of error
    # nx: only set the key if it does not already exist - otherwise return `false`
    Redis.current.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
  end

  def release_lock
    Redis.current.del("ReportRecalculationWorker_works")
  end
end
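The custom runtime lock above leans on Redis SET with ex: (a TTL in seconds) and nx: (only set if the key is absent, returning false otherwise). Those semantics can be sketched with a plain-Ruby stand-in for Redis; the FakeRedis name and internals below are illustrative only, not any library's API:

```ruby
# Simulates the subset of Redis used by the lock: SET with nx:/ex:, GET, DEL.
class FakeRedis
  def initialize
    @store = {} # key => [value, expires_at or nil]
  end

  # Like redis-rb's set(..., nx: true): returns true if the key was set,
  # false if nx: blocked it because the key already exists and is not expired.
  def set(key, value, ex: nil, nx: false)
    expire_stale(key)
    return false if nx && @store.key?(key)

    @store[key] = [value, ex && Time.now + ex]
    true
  end

  def get(key)
    expire_stale(key)
    @store[key]&.first
  end

  def del(key)
    @store.delete(key) ? 1 : 0
  end

  private

  # Lazily drops a key whose TTL has passed, mimicking Redis expiry.
  def expire_stale(key)
    _, expires_at = @store[key]
    @store.delete(key) if expires_at && Time.now >= expires_at
  end
end

redis       = FakeRedis.new
got_lock    = redis.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
second_try  = redis.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
redis.del("ReportRecalculationWorker_works")
retry_after = redis.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
```

The boolean return is what makes `return unless acquire_lock` work in the worker: a second process hits the nx: guard and bails out, and the ex: TTL guarantees the lock eventually clears even if release_lock is never reached.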

sharq1 avatar Sep 27 '21 09:09 sharq1