
Feature: dynamic options for good_job_control_concurrency_with

Open baka-san opened this issue 2 years ago • 8 comments

Currently only the key can be set dynamically for good_job_control_concurrency_with; perform_limit and enqueue_limit cannot be. I have a generic job that is enqueued by dozens of cron jobs, each of which has different concurrency requirements.

So I wonder if it is/would be possible to do something like:

  good_job_control_concurrency_with(
    key: -> { "CrawlPostJob_#{arguments.last[:spider_class]}" }, # this currently works
    perform_limit: -> { arguments.last[:concurrency] }, # this doesn't work at the moment
  )

baka-san avatar Jul 29 '22 06:07 baka-san

Sorry, I haven't read the contributing section in your README yet, but I dug into this a bit. I manually tested with the dashboard and by enqueuing jobs from the console, and it seems to allow the dynamic settings I asked about in my initial post.

# lib/good_job/active_job_extensions/concurrency.rb

# frozen_string_literal: true
module GoodJob
  module ActiveJobExtensions
    module Concurrency
      extend ActiveSupport::Concern

      class ConcurrencyExceededError < StandardError
        def backtrace
          [] # suppress backtrace
        end
      end

      module Prepends
        def deserialize(job_data)
          super
          self.good_job_concurrency_key = job_data['good_job_concurrency_key']
          self.good_job_concurrency_total_limit = job_data['good_job_concurrency_total_limit']
          self.good_job_concurrency_enqueue_limit = job_data['good_job_concurrency_enqueue_limit']
          self.good_job_concurrency_perform_limit = job_data['good_job_concurrency_perform_limit']
        end
      end

      included do
        prepend Prepends

        class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}
        attr_writer :good_job_concurrency_key, :good_job_concurrency_total_limit,
          :good_job_concurrency_enqueue_limit, :good_job_concurrency_perform_limit

        around_enqueue do |job, block|
          # Don't attempt to enforce concurrency limits with other queue adapters.
          next(block.call) unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

          # Always allow jobs to be retried because the current job's execution will complete momentarily
          next(block.call) if CurrentThread.active_job_id == job.job_id

          enqueue_limit = job.good_job_concurrency_enqueue_limit
          total_limit = job.good_job_concurrency_total_limit

          has_limit = (enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)) ||
                      (total_limit.present? && (0...Float::INFINITY).cover?(total_limit))
          next(block.call) unless has_limit

          # Only generate the concurrency key on the initial enqueue in case it is dynamic
          job.good_job_concurrency_key ||= job._good_job_concurrency_key
          key = job.good_job_concurrency_key
          next(block.call) if key.blank?

          GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
            enqueue_concurrency = if enqueue_limit
                                    GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
                                  else
                                    GoodJob::Execution.where(concurrency_key: key).unfinished.count
                                  end

            # The job has not yet been enqueued, so check if adding it will go over the limit
            block.call unless enqueue_concurrency + 1 > (enqueue_limit || total_limit)
          end
        end

        retry_on(
          GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
          attempts: Float::INFINITY,
          wait: :exponentially_longer
        )

        before_perform do |job|
          # Don't attempt to enforce concurrency limits with other queue adapters.
          next unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

          perform_limit = job.good_job_concurrency_perform_limit ||
                          job.good_job_concurrency_total_limit

          has_limit = perform_limit.present? && (0...Float::INFINITY).cover?(perform_limit)
          next unless has_limit

          key = job.good_job_concurrency_key
          next if key.blank?

          if CurrentThread.execution.blank?
            logger.debug("Ignoring concurrency limits because the job is executed with `perform_now`.")
            next
          end

          GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
            allowed_active_job_ids = GoodJob::Execution.unfinished.where(concurrency_key: key).advisory_locked.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")).limit(perform_limit).pluck(:active_job_id)
            # The current job has already been locked and will appear in the previous query
            raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include? job.job_id
          end
        end
      end

      class_methods do
        def good_job_control_concurrency_with(config)
          self.good_job_concurrency_config = config
        end
      end

      # Existing or dynamically generated concurrency key
      # @return [Object] concurrency key
      def good_job_concurrency_key
        @good_job_concurrency_key || _good_job_concurrency_key
      end

      # Existing or dynamically generated concurrency total_limit
      # @return [Object] concurrency total limit
      def good_job_concurrency_total_limit
        @good_job_concurrency_total_limit || _good_job_concurrency_total_limit
      end

      # Existing or dynamically generated concurrency enqueue limit
      # @return [Object] concurrency enqueue limit
      def good_job_concurrency_enqueue_limit
        @good_job_concurrency_enqueue_limit || _good_job_concurrency_enqueue_limit
      end

      # Existing or dynamically generated concurrency perform limit
      # @return [Object] concurrency perform limit
      def good_job_concurrency_perform_limit
        @good_job_concurrency_perform_limit || _good_job_concurrency_perform_limit
      end

      # Generates the concurrency key from the configuration
      # @return [Object] concurrency key
      def _good_job_concurrency_key
        key = self.class.good_job_concurrency_config[:key]
        return if key.blank?

        if key.respond_to? :call
          instance_exec(&key)
        else
          key
        end
      end

      # Generates the concurrency total limit from the configuration
      # @return [Object] concurrency total limit
      def _good_job_concurrency_total_limit
        total_limit = self.class.good_job_concurrency_config[:total_limit]
        return if total_limit.blank?

        if total_limit.respond_to? :call
          instance_exec(&total_limit)
        else
          total_limit
        end
      end

      # Generates the concurrency enqueue limit from the configuration
      # @return [Object] concurrency enqueue limit
      def _good_job_concurrency_enqueue_limit
        enqueue_limit = self.class.good_job_concurrency_config[:enqueue_limit]
        return if enqueue_limit.blank?

        if enqueue_limit.respond_to? :call
          instance_exec(&enqueue_limit)
        else
          enqueue_limit
        end
      end

      # Generates the concurrency perform limit from the configuration
      # @return [Object] concurrency perform limit
      def _good_job_concurrency_perform_limit
        perform_limit = self.class.good_job_concurrency_config[:perform_limit]
        return if perform_limit.blank?

        if perform_limit.respond_to? :call
          instance_exec(&perform_limit)
        else
          perform_limit
        end
      end
    end
  end
end
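
The proc-or-static resolution used by the `_good_job_concurrency_*` helpers above can be illustrated outside Rails. Here is a minimal, framework-free sketch of that pattern; the `FakeJob` class and `resolve_config` method are invented for illustration and are not GoodJob API:

```ruby
# Sketch of the proc-or-static resolution pattern from the patch above.
# Procs are evaluated in the job instance's context via instance_exec,
# so they can read `arguments`; static values pass through unchanged.
class FakeJob
  CONFIG = {
    key: -> { "CrawlPostJob_#{arguments.last[:spider_class]}" },
    perform_limit: -> { arguments.last[:concurrency] },
    enqueue_limit: 10, # a static value is returned as-is
  }.freeze

  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  # Mirrors _good_job_concurrency_key / _good_job_concurrency_*_limit
  def resolve_config(name)
    value = CONFIG[name]
    value.respond_to?(:call) ? instance_exec(&value) : value
  end
end

job = FakeJob.new(spider_class: "AcmeSpider", concurrency: 3)
job.resolve_config(:key)           # => "CrawlPostJob_AcmeSpider"
job.resolve_config(:perform_limit) # => 3
job.resolve_config(:enqueue_limit) # => 10
```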

baka-san avatar Jul 29 '22 07:07 baka-san

@baka-san interesting. Feel free to open a PR because that would make it easier to talk about the code changes.

Could you explain why you would have a job with the same concurrency key but a different concurrency value? How would you expect 2 different jobs to resolve different values of concurrency, given that the larger concurrency value would always win out?

My thinking is that the key is a property of the job, and thus can be set dynamically. But the concurrency value is a property of the system, and thus it's static.

I could imagine that there would be value in setting concurrency dynamically for the system (e.g. we could only send 2 emails at a time, now we can send 5). In that case, the concurrency value shouldn't be stored/serialized into the job, but be calculated on demand. What do you think of that? (btw, that was how concurrency key worked before #657)

bensheldon avatar Jul 29 '22 15:07 bensheldon

@bensheldon

Could you explain why you would have a job with the same concurrency key but a different concurrency value? How would you expect 2 different jobs to resolve different values of concurrency, given that the larger concurrency value would always win out?

That's a good point. It would be contradictory to ask for different concurrency values for the same key, so that is definitely something that would need to be considered. I tend to throw errors/warnings if something is coded in a contradictory way.

My thinking is that the key is a property of the job, and thus can be set dynamically. But the concurrency value is a property of the system, and thus it's static.

A key isn't unique though, right? As in, multiple jobs can share the same key. I suppose that what I'm proposing is that perform_limit, enqueue_limit, and total_limit should be dynamic properties of that subset of jobs which shares a given key. So it could be hard coded into the job as something like:

# Any job with the key "SomeKey1" will use the limits defined here
define_concurrency(
  key: "SomeKey1",
  perform_limit: 3,
  enqueue_limit: 10
)

# Any job with the key "SomeKey2" will use the limits defined here
define_concurrency(
  key: "SomeKey2",
  perform_limit: 1,
  enqueue_limit: 10
)

# This continues to work how it does now: only the key is dynamic.
# Any key that doesn't match the keys above will use what is defined here.
good_job_control_concurrency_with(
  perform_limit: 5,
  enqueue_limit: 10,
  key: -> { "dynamically_set_key" }
)

My use case is that I have dozens of websites I am crawling for posts. Since each website has different requirements/limits, for some websites I may want a max of 1 bot crawling at a time, whereas for others I may want a max of 10. So I want to be able to set a perform_limit based on the key, which depends on the website ID, e.g. key: -> { "key_#{arguments[:website]}" }. I originally proposed setting the key dynamically via an argument, but I see your point about different jobs using the same key with mismatched concurrency values. So maybe the solution above makes more sense.

If you think this is too unique of a requirement that might not be useful to other people, then I can handle this in other ways, for example storing the number of running jobs of a given type in the DB and checking that before I enqueue a job. I just thought it would make more sense to control it in one place.

baka-san avatar Aug 01 '22 01:08 baka-san

After describing this to you, I wonder if namespacing and creating individual jobs might actually be the best solution. It will be annoying since I have dozens of websites, but it might be the simplest way to get per-site concurrency limits.

# Shared logic for all websites
class PostJob < ApplicationJob
  # Default setting
  good_job_control_concurrency_with(
    perform_limit: 5,
    enqueue_limit: 10,
    key: -> { "Key#{arguments[:website]}" }
  )

  def perform
    # shared crawling logic for all websites
  end
end

# Site specific concurrency can be set
module Post
  class Website1Job < PostJob
    # Site specific
    good_job_control_concurrency_with(
      perform_limit: 1,
      enqueue_limit: 1,
      key: "KeyWebsite1"
    )
  end
end

baka-san avatar Aug 01 '22 02:08 baka-san

@baka-san you've convinced me to simply make that value accept a proc. Then you could simply make a case statement to match the dynamic key for different values. Sharp knives are good if they're clearly sharp 😀

Would you be able to make a PR to allow those *_limit values to accept a proc?
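
The suggested case statement could resolve a per-site limit from the same argument that builds the key. A standalone sketch of that resolution logic; the spider class names and the limit_for lambda are made up for illustration, not GoodJob API:

```ruby
# Per-key limit resolution via a case statement. With proc support,
# a lambda like this could be passed as the perform_limit value.
limit_for = lambda do |spider_class|
  case spider_class
  when "SlowSiteSpider" then 1   # site tolerates only one crawler at a time
  when "FastSiteSpider" then 10  # site is fine with many parallel crawlers
  else 5                         # sensible default for everything else
  end
end

limit_for.call("SlowSiteSpider") # => 1
limit_for.call("UnknownSpider")  # => 5
```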

bensheldon avatar Aug 01 '22 02:08 bensheldon

@bensheldon sure thing, I can do that. Would you like me to just include the changes as above or do you want some other implementation?

baka-san avatar Aug 01 '22 03:08 baka-san

@baka-san I think those values should be dynamically evaluated, rather than stored within the job... to imply that they're a function of the system, rather than the job e.g. keys are immutable, limits are changeable.

I'm imagining that would look like changing this line:

https://github.com/bensheldon/good_job/blob/4551b97ad3e26270131e2f570dfb3357bab1b790/lib/good_job/active_job_extensions/concurrency.rb#L33-L34

To look something like:

enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = job.instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
# ...
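
Evaluated on demand this way, the limit never touches the serialized job data; only the class-level config is consulted when the callback runs, so a config change affects jobs that are already enqueued. A framework-free sketch of that pattern (DemoJob and its methods are hypothetical):

```ruby
# "Evaluate on demand": the limit lives only in the class-level config
# and is resolved at call time, never stored on the job instance.
class DemoJob
  def self.good_job_concurrency_config
    @config ||= { enqueue_limit: -> { 2 + 3 } }
  end

  def enqueue_limit
    limit = self.class.good_job_concurrency_config[:enqueue_limit]
    limit = instance_exec(&limit) if limit.respond_to?(:call)
    limit
  end
end

DemoJob.new.enqueue_limit # => 5
```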

bensheldon avatar Aug 01 '22 03:08 bensheldon

@bensheldon I'll submit a PR when I get some time. Thanks for the discussion.

baka-san avatar Aug 02 '22 08:08 baka-san

@bensheldon should this be closed? As far as I can see, proc support was added; is there anything else left to work on here?

dixpac avatar Aug 13 '23 10:08 dixpac