sidekiq-unique-jobs
Feature Request: Unique scheduled jobs
The feature I'm implementing is causing duplicate jobs in Sidekiq::ScheduledSet, so I would really love to define a set of arguments for my jobs that should be unique across this set, rather than in a queue.
I'd be happy to talk about this at some point. Not sure exactly what you are after, could you elaborate?
I just found this issue today after searching for a solution: I have jobs per tenant. I tell sidekiq-unique-jobs to make jobs unique by the first argument, which is always the account id. Today we had a situation where one tenant needed to be temporarily disabled while they worked through some issues. So essentially, I want to ignore the jobs until the tenant is re-enabled. I don't want to drop them, and I don't want them throwing errors, so in my code I'm saying something like retry_job(wait: 10.minutes) if tenant_paused?.
As a result, my Sidekiq::ScheduledSet is now full of duplicate jobs, whereas if they were added to the regular queue, this gem would make them unique already.
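For context, a minimal sketch of the rescheduling pattern described above, assuming an ActiveJob-based job (retry_job is ActiveJob API, not Sidekiq::Worker) and a hypothetical tenant_paused? helper:

class TenantSyncJob < ApplicationJob
  queue_as :default

  def perform(account_id)
    # tenant_paused? is an assumed application-level predicate, not part of any gem
    return retry_job(wait: 10.minutes) if tenant_paused?(account_id)
    # ... actual per-tenant work ...
  end
end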
Thanks for reminding me about the scheduled set @glennfu :) I almost forgot about that for v6!
@glennfu @amclaughlin-valetude I had a look into this. The problem only occurs when there isn't a lock for the specific item arguments anymore. This would, for instance, occur when the lock is created with an expiration. Keep in mind that any keys that are set to expire will do so at the given expiration regardless of whether the job has run. Unfortunately, older versions of the gem used a default expiration of 30 minutes, which could be why you are seeing this error.
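To rule that out, a minimal sketch of configuring the lock without an expiration, assuming the v6 option name lock_expiration (v7 renamed it to lock_ttl); MyCoolWorker is just a placeholder:

class MyCoolWorker
  include Sidekiq::Worker
  # nil expiration: the lock only disappears when it is explicitly unlocked,
  # so a job waiting in the scheduled set stays locked for as long as it waits
  sidekiq_options lock: :until_executing, lock_expiration: nil
end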
The thing is that when you call MyCoolWorker.perform_in(1.second, arguments), the client middleware runs and locks the job; then, if there is an item['at'] (meaning the job is scheduled), it is allowed to be pushed to the scheduled set. Now even though the job is in another queue it should be locked (meaning other jobs from the same worker class with the same arguments won't be allowed to either perform_async or perform_in).
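In other words, something like this is the expected behaviour (a hypothetical session with the placeholder MyCoolWorker from above):

MyCoolWorker.perform_in(1.second, 42)  # acquires the lock; job lands in the scheduled set
MyCoolWorker.perform_async(42)         # lock still held => rejected, returns nil
MyCoolWorker.perform_in(5.seconds, 42) # likewise rejected while the lock is held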
What you are experiencing is most likely a problem with configuration, a buggy old version or ActiveJob (which will be unsupported in the next major release).
Is that not implemented yet, or do we have faulty configuration?
Here's a screenshot of scheduled jobs:
Jobs in regular queues are locked properly.
class ReportRecalculationWorker
include Sidekiq::Worker
sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject
...
end
It seems a bit random when it rejects the job...
This is likely related to the fact that within the worker itself I added a rescheduling mechanism... Basically the condition is: if the job was performed too recently (within the last 15 seconds), then it's rescheduled:
ReportRecalculationWorker.perform_in(10.seconds)
@sharq1 in your situation you already have a unique job in the one that is trying to delay itself.
You would have to use some Sidekiq magic or the replace conflict strategy.
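For reference, a sketch of the latter, assuming the gem's :replace conflict strategy (it removes the previously locked job and locks the incoming one instead):

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true,
                  lock: :until_executing, on_conflict: :replace
end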
@mhenrixon thank you, however I think I wasn't clear enough. I don't want to reschedule / replace the jobs in the reports queue / ScheduledSet. If ReportRecalculationWorker is already enqueued / scheduled, I would expect perform_async and perform_in to reject / return nil. But instead it sometimes allows me to schedule the worker, as you can see in the screenshots above.
@sharq1 then what about the following:
This is likely related to the fact that within the worker itself I added a rescheduling mechanism... Basically the condition is: if the job was performed too recently (within the last 15 seconds), then it's rescheduled.
Can you show the code for how you do this? I am fairly certain this will cause problems for you.
Sure:
# frozen_string_literal: true
class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  def perform
    if too_many_jobs || last_execution_too_recently
      return ReportRecalculationWorker.perform_in(10.seconds)
    end

    save_execution_time
    recalculate...
  end

  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    if t.present? && Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
      return true
    end
    false
  end

  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end
end
Actually, it's a bit more complex, as I also added a custom runtime lock (not trusting the until_and_while_executing lock 😞), so the full implementation is below.
See full implementation
# frozen_string_literal: true
class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  def perform
    return unless acquire_lock

    if too_many_jobs || last_execution_too_recently
      return ReportRecalculationWorker.perform_in(10.seconds)
    end

    save_execution_time
    ...recalculate...
  rescue StandardError => e
    ...log error...
  ensure
    release_lock
  end

  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    if t.present? && Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
      return true
    end
    false
  end

  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end

  def acquire_lock
    # ex: expire in 2h in case lock was not unset because of error
    # nx: only set the key if it does not already exist - otherwise return `false`
    Redis.current.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
  end

  def release_lock
    Redis.current.del("ReportRecalculationWorker_works")
  end
end
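For anyone debugging the same symptom, a quick way to look for duplicates in the scheduled set from a console, using Sidekiq's public API:

require "sidekiq/api"

# count scheduled ReportRecalculationWorker jobs; more than 1 means duplicates here
Sidekiq::ScheduledSet.new.count { |job| job.klass == "ReportRecalculationWorker" }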