good_job
Feature: dynamic options for good_job_control_concurrency_with
Currently only `key` can be set dynamically for `good_job_control_concurrency_with`; `perform_limit` and `enqueue_limit` cannot be. I have a generic job which is being queued by dozens of cron jobs, each of which has different concurrency requirements. So I wonder if it is, or could be, possible to do something like:
```ruby
good_job_control_concurrency_with(
  key: -> { "CrawlPostJob_#{arguments.last[:spider_class]}" }, # this currently works
  perform_limit: -> { arguments.last[:concurrency] }, # this doesn't work at the moment
)
```
Sorry, I haven't read the contributing section in your README yet, but I dug into this a bit. I manually tested with the dashboard and by enqueuing jobs from the console, and it seems to allow the dynamic settings I asked about in my initial post.
```ruby
# lib/good_job/active_job_extensions/concurrency.rb
# frozen_string_literal: true

module GoodJob
  module ActiveJobExtensions
    module Concurrency
      extend ActiveSupport::Concern

      class ConcurrencyExceededError < StandardError
        def backtrace
          [] # suppress backtrace
        end
      end

      module Prepends
        def deserialize(job_data)
          super
          self.good_job_concurrency_key = job_data['good_job_concurrency_key']
          self.good_job_concurrency_total_limit = job_data['good_job_concurrency_total_limit']
          self.good_job_concurrency_enqueue_limit = job_data['good_job_concurrency_enqueue_limit']
          self.good_job_concurrency_perform_limit = job_data['good_job_concurrency_perform_limit']
        end
      end

      included do
        prepend Prepends

        class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}
        attr_writer :good_job_concurrency_key, :good_job_concurrency_total_limit,
                    :good_job_concurrency_enqueue_limit, :good_job_concurrency_perform_limit

        around_enqueue do |job, block|
          # Don't attempt to enforce concurrency limits with other queue adapters.
          next(block.call) unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

          # Always allow jobs to be retried because the current job's execution will complete momentarily
          next(block.call) if CurrentThread.active_job_id == job.job_id

          enqueue_limit = job.good_job_concurrency_enqueue_limit
          total_limit = job.good_job_concurrency_total_limit
          has_limit = (enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)) ||
                      (total_limit.present? && (0...Float::INFINITY).cover?(total_limit))
          next(block.call) unless has_limit

          # Only generate the concurrency key on the initial enqueue in case it is dynamic
          job.good_job_concurrency_key ||= job._good_job_concurrency_key
          key = job.good_job_concurrency_key
          next(block.call) if key.blank?

          GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
            enqueue_concurrency = if enqueue_limit
                                    GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
                                  else
                                    GoodJob::Execution.where(concurrency_key: key).unfinished.count
                                  end

            # The job has not yet been enqueued, so check if adding it will go over the limit
            block.call unless enqueue_concurrency + 1 > (enqueue_limit || total_limit)
          end
        end

        retry_on(
          GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
          attempts: Float::INFINITY,
          wait: :exponentially_longer
        )

        before_perform do |job|
          # Don't attempt to enforce concurrency limits with other queue adapters.
          next unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

          perform_limit = job.good_job_concurrency_perform_limit ||
                          job.good_job_concurrency_total_limit
          has_limit = perform_limit.present? && (0...Float::INFINITY).cover?(perform_limit)
          next unless has_limit

          key = job.good_job_concurrency_key
          next if key.blank?

          if CurrentThread.execution.blank?
            logger.debug("Ignoring concurrency limits because the job is executed with `perform_now`.")
            next
          end

          GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
            allowed_active_job_ids = GoodJob::Execution.unfinished.where(concurrency_key: key).advisory_locked.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")).limit(perform_limit).pluck(:active_job_id)
            # The current job has already been locked and will appear in the previous query
            raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include? job.job_id
          end
        end
      end

      class_methods do
        def good_job_control_concurrency_with(config)
          self.good_job_concurrency_config = config
        end
      end

      # Existing or dynamically generated concurrency key
      # @return [Object] concurrency key
      def good_job_concurrency_key
        @good_job_concurrency_key || _good_job_concurrency_key
      end

      # Existing or dynamically generated concurrency total limit
      # @return [Object] concurrency total limit
      def good_job_concurrency_total_limit
        @good_job_concurrency_total_limit || _good_job_concurrency_total_limit
      end

      # Existing or dynamically generated concurrency enqueue limit
      # @return [Object] concurrency enqueue limit
      def good_job_concurrency_enqueue_limit
        @good_job_concurrency_enqueue_limit || _good_job_concurrency_enqueue_limit
      end

      # Existing or dynamically generated concurrency perform limit
      # @return [Object] concurrency perform limit
      def good_job_concurrency_perform_limit
        @good_job_concurrency_perform_limit || _good_job_concurrency_perform_limit
      end

      # Generates the concurrency key from the configuration
      # @return [Object] concurrency key
      def _good_job_concurrency_key
        key = self.class.good_job_concurrency_config[:key]
        return if key.blank?

        if key.respond_to? :call
          instance_exec(&key)
        else
          key
        end
      end

      # Generates the concurrency total limit from the configuration
      # @return [Object] concurrency total limit
      def _good_job_concurrency_total_limit
        total_limit = self.class.good_job_concurrency_config[:total_limit]
        return if total_limit.blank?

        if total_limit.respond_to? :call
          instance_exec(&total_limit)
        else
          total_limit
        end
      end

      # Generates the concurrency enqueue limit from the configuration
      # @return [Object] concurrency enqueue limit
      def _good_job_concurrency_enqueue_limit
        enqueue_limit = self.class.good_job_concurrency_config[:enqueue_limit]
        return if enqueue_limit.blank?

        if enqueue_limit.respond_to? :call
          instance_exec(&enqueue_limit)
        else
          enqueue_limit
        end
      end

      # Generates the concurrency perform limit from the configuration
      # @return [Object] concurrency perform limit
      def _good_job_concurrency_perform_limit
        perform_limit = self.class.good_job_concurrency_config[:perform_limit]
        return if perform_limit.blank?

        if perform_limit.respond_to? :call
          instance_exec(&perform_limit)
        else
          perform_limit
        end
      end
    end
  end
end
```
@baka-san interesting. Feel free to open a PR because that would make it easier to talk about the code changes.
Could you explain why you would have jobs with the same concurrency key but different concurrency values? How would you expect 2 different jobs to resolve different values of concurrency? The larger concurrency value would always win out.
My thinking is that the key is a property of the job, and thus can be set dynamically. But the concurrency value is a property of the system, and thus it's static.
I could imagine that there would be value in setting concurrency dynamically for the system (e.g. we could only send 2 emails at a time, now we can send 5). In that case, the concurrency value shouldn't be stored/serialized into the job, but be calculated on demand. What do you think of that? (btw, that was how concurrency key worked before #657)
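The "calculated on demand" idea can be sketched in plain Ruby. This is a toy illustration, not GoodJob's actual internals; `FakeJob` and `concurrency_config` are made-up names standing in for the real machinery. The point is that the limit lives on the class (the "system") as a proc or plain value and is re-evaluated each time it is checked, so a configuration change takes effect even for jobs that already exist:

```ruby
# Illustrative sketch (not GoodJob internals): the limit is configured on the
# class and resolved on demand, never serialized into the job record.
class FakeJob
  class << self
    attr_accessor :concurrency_config
  end
  self.concurrency_config = { perform_limit: -> { 2 } }

  # Evaluated at check time, in the job instance's context.
  def perform_limit
    limit = self.class.concurrency_config[:perform_limit]
    limit.respond_to?(:call) ? instance_exec(&limit) : limit
  end
end

job = FakeJob.new
job.perform_limit # => 2

# A system-level change applies immediately, even to this existing instance:
FakeJob.concurrency_config = { perform_limit: -> { 5 } }
job.perform_limit # => 5
```

Because nothing is stored in the job, "only 2 emails at a time, now 5" is a one-line config change rather than a migration of serialized job data.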
@bensheldon
> Could you explain why you would have jobs with the same concurrency key but different concurrency values? How would you expect 2 different jobs to resolve different values of concurrency? The larger concurrency value would always win out.
That's a good point. It would be contradictory to ask for different concurrency values for the same key, so that is definitely something that would need to be considered. I tend to throw errors/warnings if something is coded in a contradictory way.
> My thinking is that the key is a property of the job, and thus can be set dynamically. But the concurrency value is a property of the system, and thus it's static.
A key isn't unique though, right? As in, multiple jobs can share the same key. I suppose that what I'm proposing is that `perform_limit`, `enqueue_limit`, and `total_limit` should be dynamic properties of the subset of jobs that shares a given key. So it could be hard-coded into the job as something like:
```ruby
# Any job with the key "SomeKey1" will use the limits defined here
define_concurrency(
  key: "SomeKey1",
  perform_limit: 3,
  enqueue_limit: 10
)

# Any job with the key "SomeKey2" will use the limits defined here
define_concurrency(
  key: "SomeKey2",
  perform_limit: 1,
  enqueue_limit: 10
)

# This continues to work how it does now...only the key is dynamic. Any key
# that doesn't match the keys above will use what is defined here.
good_job_control_concurrency_with(
  perform_limit: 5,
  enqueue_limit: 10,
  key: -> { "dynamically_set_key" }
)
```
My use case is that I have dozens of websites I am crawling for posts. Since each website has different requirements/limits, for some websites I may only want a max of 1 bot crawling at a time, whereas for other websites I may want a max of 10 bots crawling. So I want to be able to set a `perform_limit` based on the key, which is dependent on the website ID, e.g. `key: -> { "key_#{arguments[:website]}" }`. I originally proposed setting the key dynamically via an argument, but I see your point about different jobs using the same key with mismatching concurrency values. So maybe the solution above makes more sense.
If you think this is too unique of a requirement that might not be useful to other people, then I can handle this in other ways, for example storing the number of running jobs of a given type in the DB and checking that before I enqueue a job. I just thought it would make more sense to control it in one place.
After describing this to you, I wonder if just namespacing and creating individual jobs might be the way to go. It will be annoying since I have dozens of websites, but it might be the simplest solution for per-site concurrency limits.
```ruby
# Shared logic for all websites
class PostJob < ApplicationJob
  # Default setting
  good_job_control_concurrency_with(
    perform_limit: 5,
    enqueue_limit: 10,
    key: -> { "Key#{arguments[:website]}" }
  )

  def perform
    ...
  end
end

# Site-specific concurrency can be set
module Post
  class Website1Job < PostJob
    # Site specific
    good_job_control_concurrency_with(
      perform_limit: 1,
      enqueue_limit: 1,
      key: "KeyWebsite1"
    )
  end
end
```
@baka-san you've convinced me to simply make those values accept a proc. Then you could write a case statement to match the dynamic key for different values. Sharp knives are good if they're clearly sharp 😀
Would you be able to make a PR to allow those *_limit values to accept a proc?
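The case-statement idea could look something like the following. This is a hedged sketch in plain Ruby: the keys and limit numbers are made up from the crawling example above, and the proc is shown standalone so the resolution logic is easy to see:

```ruby
# Hypothetical per-key limits resolved through one proc, matching the
# dynamic key with a case statement. Keys and numbers are illustrative.
perform_limit_for = ->(key) do
  case key
  when "KeyWebsite1" then 1   # fragile site: one crawler at a time
  when "KeyWebsite2" then 10  # robust site: up to ten crawlers
  else 5                      # default for everything else
  end
end

perform_limit_for.call("KeyWebsite1") # => 1
perform_limit_for.call("KeyOther")    # => 5
```

Inside a job class, the same case statement would go directly in the `perform_limit: -> { ... }` proc, switching on `arguments[:website]` just as the dynamic `key:` proc already does.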
@bensheldon sure thing, I can do that. Would you like me to just include the changes as above or do you want some other implementation?
@baka-san I think those values should be dynamically evaluated, rather than stored within the job... to imply that they're a function of the system, rather than the job e.g. keys are immutable, limits are changeable.
I'm imagining that would look like changing this line:
https://github.com/bensheldon/good_job/blob/4551b97ad3e26270131e2f570dfb3357bab1b790/lib/good_job/active_job_extensions/concurrency.rb#L33-L34
To look something like:

```ruby
enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
# ...
```
@bensheldon I'll submit a PR when I get some time. Thanks for the discussion.
@bensheldon should this be closed? As far as I can see, proc support was added; is there anything else left to work on here?