sidekiq-throttled
back-pressure / back-off / circuit breaker
I am relatively new to Sidekiq and this extension, so please excuse me if there is already a known pattern for this, but I was unable to find one elsewhere. This library has a lot of similar and useful functionality, and you seem open to new feature ideas. Feel free to point me somewhere else or debate the merits of this idea.
One of the ways we use Sidekiq is to deliver webhooks, but this idea can apply to other tasks as well. In addition to using the throttle configuration parameters, such as concurrency and threshold, we would also find it useful to be able to 'trip' the throttle/breaker explicitly from inside the perform(...) method, for example after receiving back-pressure from the web service (e.g. getting a 429 status code). For the sake of this description, I'm suggesting you would be able to call a method like back_off(...) inside perform(...) (I am not married to that name).
When you receive a 429, you often get a hint about when the web service will start accepting your requests again (perhaps via a Retry-After header). So I imagine the back_off(...) method would allow you to optionally pass in a timestamp indicating when to begin processing again; otherwise the default would be some sort of exponential back-off or (bonus) a configured Proc, akin to how Sidekiq allows users to override the default back-off schedule for job retries.
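To illustrate the Proc idea, I'm picturing something analogous to sidekiq_retry_in but scoped to the throttle; the method name and signature below are entirely made up:

class WebhookWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # Hypothetical DSL, analogous to sidekiq_retry_in: seconds to wait on the
  # Nth consecutive back_off call for a given key when no resume_at hint is available.
  sidekiq_throttled_back_off_in do |count|
    (count ** 2) * 30 # 30s, 2m, 4.5m, ...
  end
end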
Just like normal usage of these throttles, I imagine this would remain key-aware, so backing off would throttle all the jobs with the same key, allowing back-off for specific job parameters rather than all jobs of that worker type. Once the back-pressure elapses, jobs would process as normal given the configuration provided.
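To make the key-awareness concrete, I'm picturing the throttle being keyed per endpoint with the existing key_suffix option, so a back_off for one endpoint would leave deliveries to the others untouched (the limits below are made up, and back_off itself is still hypothetical):

class WebhookWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # Throttle per webhook URL; a back-off would then only affect jobs
  # sharing that key (i.e. the same URL).
  sidekiq_throttle(
    concurrency: { limit: 10,  key_suffix: -> (url, _payload) { url } },
    threshold:   { limit: 100, period: 3600, key_suffix: -> (url, _payload) { url } }
  )
end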
Given that resuming a high-concurrency worker type all at once after a back-off period might not be the most graceful thing to do, some circuit-breaker implementations provide flexibility/configuration around resuming. For example, they might ramp up the concurrency of the resumed worker type (key, in this case), starting with just one job to 'test' the breaker. I am not convinced you would need this right away (or at all), and I am worried the added complexity might not have a good cost/benefit ratio; perhaps it's a nice-to-have feature. In the case where you're getting good hints about when to resume, you probably wouldn't really need it. However, in the case where you're getting back-pressure because the resource is overloaded, unhealthy, or otherwise unavailable (e.g. you are getting timeouts or 5xx responses back), a more gradual resume would be more appropriate.
Anyway, I appreciate the consideration. I'm happy to help implement this if you think it's a good idea; your advice on the approach would be appreciated before I submit a PR.
Here is a simple example:
require 'rest-client'

class WebhookWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  def perform(url, payload)
    begin
      resp = RestClient::Request.execute(
        method:  :post,
        url:     url,
        payload: payload.to_json,
        headers: { content_type: :json, accept: :json }
      )
    rescue RestClient::TooManyRequests => err # HTTP 429 response
      # rest-client exposes response headers as underscored symbols
      if err.response && err.response.headers[:x_rate_limit_reset]
        # https://developer.twitter.com/en/docs/basics/rate-limiting.html
        # x-rate-limit-reset: the remaining window before the rate limit resets, in UTC epoch seconds
        resume_at = Time.at(err.response.headers[:x_rate_limit_reset].to_i)
        back_off(resume_at: resume_at)
      else
        back_off
      end
    end
  end
end
I really love the idea. I don't have the time capacity to work on it ATM though, so, yes, any help will be highly appreciated. I'll be happy to assist if you need help, but I'm not sure where to start from ATM, so I'm not sure what I can advise upfront :D
I guess if you can sketch something out (even if it doesn't work), that will give us solid ground to start shaping the final result. :D
After some consideration I think I would like to see the same for queues as well:
class MyHeaveyLiftingWorker
  # ...

  def perform(...)
    # ...
    if ...
      pause_worker!(Time.now + 4200)
    else
      pause_queue!(Time.now + 3600)
    end
  end
end
I'm not married to the method names, though, nor their signatures; I'm totally fine if they accept keyword arguments instead of positional ones. As we're talking about time-based pausing, I think it should be done with Redis sorted sets (I would have proposed expirable keys, but as we filter queues prior to polling them, it's better to use sorted sets). To cut down on bandwidth, it's better to use Lua:
-- script: pause.lua
-- KEYS[1] -- sorted set name
-- ARGV[1] -- NOW timestamp
-- ARGV[2] -- RESUME_AT timestamp
-- ARGV[3] -- element
redis.call("ZADD", KEYS[1], ARGV[2], ARGV[3])
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
The usage will look like this:
def pause_something!(resume_at:)
  Sidekiq.redis do |redis|
    # use the Redis server clock rather than the local one
    now = Time.at(*redis.time).to_i
    resume_at = now + (resume_at - Time.now).to_i
    # SCRIPT is pause.lua above; we use the Redis::Prescription helper to load and eval scripts by SHA:
    SCRIPT.eval(redis, :keys => ["paused_something"], :argv => [now, resume_at, queue_name])
  end
end
Now there are two distinct ways of using those sorted sets (a rough Ruby-side sketch of both follows the two snippets below):
- grab the list of non-expired elements (for queue filtering):
-- KEYS[1] -- sorted set name
-- ARGV[1] -- NOW timestamp
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
return redis.call("ZRANGE", KEYS[1], 0, -1)
- just check whether an element is supposed to be paused:
-- KEYS[1] -- sorted set name
-- ARGV[1] -- NOW timestamp
-- ARGV[2] -- element
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
if redis.call("ZSCORE", KEYS[1], ARGV[2]) then
  return 1
else
  return 0
end
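On the Ruby side, both could be wrapped roughly like this; I'm using plain redis-rb EVAL for brevity (LIST_SCRIPT and CHECK_SCRIPT stand for the two Lua snippets above loaded as strings; in the real implementation they would go through Redis::Prescription just like pause.lua):

# list the elements that are still paused (for filtering queues before polling)
def paused_queues
  Sidekiq.redis do |redis|
    now = Time.at(*redis.time).to_i
    redis.eval(LIST_SCRIPT, :keys => ["paused_queues"], :argv => [now])
  end
end

# check whether a single element (e.g. a queue name) is currently paused
def paused?(queue_name)
  Sidekiq.redis do |redis|
    now = Time.at(*redis.time).to_i
    redis.eval(CHECK_SCRIPT, :keys => ["paused_queues"], :argv => [now, queue_name]) == 1
  end
end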
We use the GitHub API a lot in our application and have to deal with their rate limiting errors. We wrote a small wrapper around the octokit.rb library that wraps every HTTP exception with an additional retryable_in method that can be used by Sidekiq to retry the job at a specific time. retryable_in is set based on the response headers (X-RateLimit-Reset and Retry-After). In our jobs we can then use the sidekiq_retry_in helper:
sidekiq_retry_in do |_count, exception|
  case exception
  when GithubClient::ResponseError
    exception.retryable_in
  end
end
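For reference, here is a simplified sketch of how retryable_in can be derived from those headers; this isn't our exact wrapper, so the class shape and the fallback value are illustrative only:

class GithubClient
  class ResponseError < StandardError
    def initialize(message, headers = {})
      super(message)
      @headers = headers
    end

    # Seconds to wait before retrying, based on GitHub's documented
    # Retry-After and X-RateLimit-Reset (UTC epoch seconds) headers.
    def retryable_in
      if @headers["Retry-After"]
        @headers["Retry-After"].to_i
      elsif @headers["X-RateLimit-Reset"]
        [@headers["X-RateLimit-Reset"].to_i - Time.now.to_i, 0].max
      else
        60 # illustrative fallback
      end
    end
  end
end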
@tjwallace That is nice. I would like to note, though, that this will increase the retry count and will count toward the failure total. So I would really like to see some sort of "pause job/queue for X time" API available.
https://github.com/yammer/circuitbox and https://github.com/Shopify/semian (from Shopify) are interesting approaches.