
Add an ActiveJob extension for throttling

Open bensheldon opened this issue 2 years ago • 11 comments

Suggested on Reddit.

I think this could be done as a before_perform hook in which GoodJob queries the previous time-period for a count, and if the value is met/exceeded, then retries the job with an incremental backoff, similar to how the Concurrency extension does it:

https://github.com/bensheldon/good_job/blob/d365e3ef04f8580da40e65dc7c7e47feeb65e196/lib/good_job/active_job_extensions/concurrency.rb#L44-L46
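
For illustration, such a hook might look roughly like this (a sketch only; the limit/period constants, the use of the performed_at column, and the backoff interval are illustrative, not GoodJob's actual API):

class ThrottledJob < ApplicationJob
  # Hypothetical knobs; a real extension would read these from configuration.
  THROTTLE_LIMIT = 100
  THROTTLE_PERIOD = 1.hour

  before_perform do |job|
    # Count executions performed inside the trailing window.
    recent = GoodJob::Execution.where("performed_at > ?", THROTTLE_PERIOD.ago).count

    if recent >= THROTTLE_LIMIT
      # Re-schedule with an incremental backoff instead of performing now.
      job.retry_job(wait: (job.executions + 1) * 30.seconds)
      throw :abort
    end
  end

  def perform
    # do work
  end
end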

I think this should maybe be part of the Concurrency extension so that both features can re-use the same key. So maybe it would look like:

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    # New stuff....
    enqueue_throttle: { limit: 1, period: 1.min },
    perform_throttle: { limit: 100, period: 1.hour },

    # Existing stuff...
    # Maximum number of jobs with the concurrency key to be concurrently enqueued
    enqueue_limit: 2,

    # Maximum number of jobs with the concurrency key to be concurrently performed
    perform_limit: 1,

    # A unique key to be globally locked against.
    # Can be String or Lambda/Proc that is invoked in the context of the job.
    # Note: Arguments passed to #perform_later must be accessed through `arguments` method.
    key: -> { "Unique-#{arguments.first}" } #  MyJob.perform_later("Alice") => "Unique-Alice"
  )

  def perform(first_name)
    # do work
  end
end

bensheldon avatar Jul 29 '21 20:07 bensheldon

Why not use activejob-traffic_control?

https://github.com/nickelser/activejob-traffic_control

I guess it adds a Redis/Memcached dependency.

34code avatar Sep 20 '21 03:09 34code

@34code thanks for suggesting activejob-traffic_control! I wasn't aware of that gem, and it's good to know there is an existing option until the feature exists in GoodJob.

I'd like to build it into GoodJob because throttling seems like a useful function of concurrency control, and it could be powered by Postgres.

bensheldon avatar Oct 01 '21 18:10 bensheldon

Maybe not directly related, but in case you want to look into your own throttling algorithm, schneems wrote a nice article about it, although it isn't about job scheduling.

Overview: https://schneems.com/2020/06/25/rate-limiting-rate-throttling-and-how-they-work-together/

Experiments and measurements: https://www.schneems.com/2020/07/08/a-fast-car-needs-good-brakes-how-we-added-client-rate-throttling-to-the-platform-api-gem/

aried3r avatar Oct 04 '21 08:10 aried3r

@aried3r thank you for sharing that! It's helpful to have the strategies enumerated.

The strategy I'm thinking of is a moving window, because I think that would be easiest to implement in Postgres.

  • period would be the size of the window
  • limit would be the number of allowed requests in the window

And then the query would be like:

if GoodJob::Execution.where("created_at > ?", period.ago).count < limit
  enqueue_job # or perform it
end
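
Scoped to the shared concurrency key and run inside a before_perform hook, that check might look something like this (still a sketch; reusing the concurrency_key column for throttling and the backoff interval are assumptions):

# Sketch: gate a perform on a moving-window count, scoped by the concurrency key.
window_count = GoodJob::Execution
  .where(concurrency_key: key)
  .where("created_at > ?", period.ago)
  .count

if window_count >= limit
  retry_job(wait: period) # push the job back by roughly one window
  throw :abort
end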

I'm still stuck on how to do a better job of managing perform-concurrency. I'm not happy with the current strategy of raising an error and catching it with a rescue_from, because that enqueues another Execution record. I'm thinking I need to separate the concept of the time an ActiveJob job is enqueued from the time that GoodJob will try to execute/re-execute the job, and then allow GoodJob to do the incremental backoff specifically for concurrency control (rather than pushing it down entirely into ActiveJob).

bensheldon avatar Oct 04 '21 14:10 bensheldon

We already have a leaky-bucket rate limiter which is Postgres-based, and it works quite well. If we were to open-source it, could good_job take it on as a dep?

julik avatar Nov 14 '22 11:11 julik

@julik I have a high bar for pulling in additional dependencies, but I'm curious about your queries/algorithm.

bensheldon avatar Nov 14 '22 18:11 bensheldon

It is described here http://live.julik.nl/2022/08/the-unreasonable-effectiveness-of-leaky-buckets and roughly does this:

def fillup(n_tokens)
  conn = ActiveRecord::Base.connection

  # Take double the time it takes the bucket to empty under normal circumstances
  # until the bucket may be deleted.
  may_be_deleted_after_seconds = (@capacity.to_f / @leak_rate.to_f) * 2.0

  # Create the leaky bucket if it does not exist, and update
  # to the new level, taking the leak rate into account - if the bucket exists.
  query_params = {
    name: @key,
    capa: @capacity.to_f,
    delete_after_s: may_be_deleted_after_seconds,
    leak_rate: @leak_rate.to_f,
    fillup: n_tokens.to_f
  }
  sql = ActiveRecord::Base.sanitize_sql_array([<<~SQL, query_params])
    INSERT INTO leaky_buckets AS t
      (name, atime, expires_at, level)
    VALUES
      (
        :name,
        clock_timestamp(),
        clock_timestamp() + ':delete_after_s second'::interval,
        LEAST(:capa, :fillup)
      )
    ON CONFLICT (name) DO UPDATE SET
      atime = EXCLUDED.atime,
      expires_at = EXCLUDED.expires_at,
      level = GREATEST(
          0.0, LEAST(
            :capa,
            t.level + :fillup - (EXTRACT(EPOCH FROM (EXCLUDED.atime - t.atime)) * :leak_rate)
          )
        )
    RETURNING level
  SQL

  # Note the use of .uncached here. The AR query cache will actually see our
  # query as a repeat (since we use "select_value" for the RETURNING bit) and will not call into Postgres
  # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
  # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
  level_after_fillup = conn.uncached { conn.select_value(sql) }

  State.new(level_after_fillup, (@capacity - level_after_fillup).abs < 0.01).tap do
    # Prune buckets which are no longer used. No "uncached" needed here since we are using "execute"
    conn.execute("DELETE FROM leaky_buckets WHERE expires_at < clock_timestamp()")
  end
end

We are not using it in JOINs, but I believe it could be done with CTEs if needed.
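
As a usage illustration only (the LeakyBucket class name, its constructor arguments, the full? accessor, and the retry interval are assumptions, not part of the snippet above), the bucket could gate a job roughly like this:

class RateLimitedJob < ApplicationJob
  before_perform do |job|
    # Allow roughly 10 performs per second: leak_rate of 10 tokens/sec, capacity of 10.
    state = LeakyBucket.new(key: job.class.name, leak_rate: 10.0, capacity: 10.0).fillup(1.0)

    if state.full?
      # Bucket is full: come back later instead of performing now.
      job.retry_job(wait: 5.seconds)
      throw :abort
    end
  end

  def perform(record_id)
    # call the rate-limited API
  end
end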

julik avatar Nov 15 '22 12:11 julik

Just wanted to add a tiny bit of input regarding the throttling:

I often find myself in a situation where there might be, say, 10 different APIs with 10 different limits for historical data, and I often need to call all of them to download the data. I tend to do this via a "meta job" that loops through everything needed and schedules a "sub job" that actually does the data download. This sub job therefore calls the different APIs (with their different rate limits).

In Sidekiq I ended up using multiple queues, but even that did not really fly, and I had to use one Sidekiq process per queue to fully isolate everything. Only then did it really honor the limits imposed.

I could use different jobs for the different limits, but this somehow feels wrong and is a slap in the face of the DRY principle.

So it would really be cool if the throttling could be per queue instead of per job, I think(?)
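
With the configuration proposed at the top of this issue, per-queue throttling might be expressible by keying on the queue name, e.g. (a sketch of the proposed, not-yet-implemented options):

class ApiDownloadJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  queue_as :slow_api

  good_job_control_concurrency_with(
    # perform_throttle is the proposed option from above, not a released feature.
    perform_throttle: { limit: 10, period: 1.minute },
    # One shared limit per queue rather than per job class.
    key: -> { "throttle-#{queue_name}" }
  )

  def perform(record_id)
    # download data from the rate-limited API
  end
end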

Thank you for considering this.

andreas-it-dev avatar Feb 03 '23 04:02 andreas-it-dev

> I'm not happy with the current strategy of raising an error, and catching it with a rescue_from because that enqueues another Execution record

AFAIK this happens with https://github.com/nickelser/activejob-traffic_control, too. I hope another solution can be found, because otherwise the db could get really polluted, for example in such a scenario:

  • I want to throttle execution to 10 jobs per second (might be an external mail server rate limit)
  • I enqueue 10,000 jobs at once (I want to send out a newsletter to 10,000 recipients)

This would create a massive number of Executions if every try creates a new record.

Maybe you have some ideas on how to overcome this (probably first for the concurrency feature)?

Maybe the schedule_at could simply be updated for the existing Execution in such a case without creating a new record?
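
A rough sketch of that idea (model and column names assumed; GoodJob's internals may differ):

# Instead of enqueueing a fresh Execution when throttled,
# push the existing record's scheduled_at into the future.
execution = GoodJob::Execution.find_by(active_job_id: job_id)
execution&.update(scheduled_at: 10.seconds.from_now)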

doits avatar Oct 24 '23 13:10 doits

To cross-link efforts: there is some good code cooking over in https://github.com/bensheldon/good_job/issues/1198

bensheldon avatar Feb 29 '24 15:02 bensheldon

For what it's worth, maybe this concern can be external to ActiveJob or to GoodJob. We have released our rate limiter as https://github.com/cheddar-me/pecorino and it is working very well. You could easily do something like this with Pecorino:

class AmazingJob < ApplicationJob
  around_perform do |job, block|
    t = Pecorino::Throttle.new(key: "#{job.class}_job_throttle", over_time: 1.minute, capacity: 1)
    t.request!
    block.call
  rescue Pecorino::Throttled => e
    job.set(wait: e.retry_after).perform_later
  end

  # ...
end

/cc @Marcovecchio

julik avatar Feb 29 '24 16:02 julik