Revisiting `SKIP LOCKED` / advisory locks
TL;DR: PoC with SKIP LOCKED at the end of the thread, plus some thoughts.
TL;DR 2: this is not an adversarial fork, it really is a deliberation. We like good_job, but we are bumping into limitations.
We are approaching the connection limit for our PG instance (which is very beefy), and are seriously considering pgbouncer. However, it does seem that we would need separate connections reserved for good_job, and they would still be used as dedicated connections (that is, held from the first usage onwards until the job completes). Even if the first connection checkout happens during an enqueue inside a job being executed, this would still be problematic because of the following hypothetical scenario:
```ruby
def perform
  # ... 4 minutes of work; ideally no "connection-connection" is held and we do everything with a "transaction-connection"
  SomeOtherJob.perform_later # "connection-connection" gets checked out into the adapter and held
  # ... 5 more minutes of work, "connection-connection" still held and not available to others
end
```
To an extent this can be mitigated by checking out a connection for every enqueue and then checking it back in, but we would still lose the advantage of having the job enqueue commit transactionally with the rest of the business logic - which, so far, has allowed us to avoid spots where after_commit would otherwise be necessary. This is a very useful feature we would miss - the same goes for switching to a different AJ adapter with divergent storage, be it Sidekiq or any other. If I may say so, this is why you would want a colocated DB-based queue to begin with: so much extra work just disappears because atomic operations don't need to be as carefully orchestrated.
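To make that point concrete, a minimal sketch of the pattern we rely on (the `Order` model, `FulfillOrderJob` and the controller are hypothetical, not from our codebase):

```ruby
# With a DB-colocated queue, the job INSERT commits or rolls back together with the
# business data, so no after_commit choreography is needed.
class OrdersController < ApplicationController
  def create
    ApplicationRecord.transaction do
      order = Order.create!(order_params)  # business write
      FulfillOrderJob.perform_later(order) # job row is written inside the same transaction
      # If anything above raises, both the order and the job disappear together.
    end
  end
end
```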
I set out to build the (fabulously discussed) "imaginary rewrite" prototype of good_job using SKIP LOCKED, with the following objectives and constraints (I didn't want to rewrite the entirety of good_job, but to test a hypothesis):
- Remove usage of advisory locks entirely, as they do not multiplex over transactions with pgbouncer (well, they kinda do but it requires opening a transaction and holding it until the advisory lock can be released). pgbouncer = no advisory locks 😭 would have been lovely if they had a full SQL parser to detect at least direct lock usage and could reserve a connection but...
- Use `SKIP LOCKED`, as it is... designed for this use case? and seems to be a reasonable replacement for advisory locks here (a rough dequeue sketch follows after this list)
- Transactions are necessary during state transitions (similar to queue_classic) - we lock when we check a job out for execution, after which the job is marked as "in progress" and made unavailable to others via a state enum. This makes the workers at-most-once delivery 😄 I opted for a state enum as other convenient things can be attached to it (in lieu of how good_job uses `finished_at` now) and it is, after all, an indication of continuous "state at rest"
- Not use string operations for the locking, but try to use stored and indexed values instead
- See how much functionality we are actually using from the extensions, and only implement that with regard to concurrency controls
- Minimise per-job work when enqueueing - anything done to jobs should be done in bulk if possible. We saw good_job do extra work per job when enqueueing in bulk with concurrency keys, which is not needed if you allow the DB to toss rows that fail a constraint
- Approach the "job identities" problem a bit more holistically (or, dare I say: in a more dumb fashion)
- No dashboard, cron, batches
- No JRuby-favoring features (dequeueing inside a properly configured thread should do it anyway, and we are not using JRuby)
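In rough strokes, the checkout can look like this (a simplified sketch; the table and column names are made up rather than taken verbatim from the gist):

```ruby
# Grab one available job and flip it to "executing" in a single statement.
# FOR UPDATE SKIP LOCKED keeps concurrent workers from colliding: the row lock only
# lives for the duration of this statement, after which the state enum keeps the job
# invisible to other workers.
def checkout_one_job
  rows = ActiveRecord::Base.connection.exec_query(<<~SQL)
    UPDATE prototype_jobs
    SET state = 'executing', checked_out_at = clock_timestamp()
    WHERE id = (
      SELECT id
      FROM prototype_jobs
      WHERE state = 'enqueued' AND scheduled_at <= clock_timestamp()
      ORDER BY priority DESC, scheduled_at ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, active_job_data
  SQL
  rows.first # a Hash with the job attributes, or nil when there is nothing to do
end
```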
Identities
For the identities I've decided to take a mixed approach where anything sent to enqueue_all ends up as a separate job row, even if it is a retry or a self-reenqueue. Even though this creates more rows in the case of retries, it is easy to understand (one enqueue is always one execution, whatever it is called) and easy to build on top of. For example, it combines cleanly (without extra code, even) with enqueue concurrency. We also saw the identity question not play ball with job-iteration.
This implies that, for example, executions_count is not used either, as no job can (or at least should ever) get executed multiple times. It can be tracked elsewhere (for the dashboard to display, for example) - it could even be a view.
Concurrency keys
Concurrency keys can be applied very effectively both at enqueue and at checkout. At checkout (for execution concurrency) we can use a subselect, for which a partial index can be built (covering just the executing jobs across all workers, nothing else). At enqueue we can use the enqueue concurrency key and insert_all, which by default will DO NOTHING and toss the jobs that don't get accepted; the IDs of the rejected jobs can be recovered to set the error later. Note that there is one feature I was not able to replicate (but we do not use it): a counter of similar jobs executing / sitting in the queue. I figure this can be addressed with a second table holding just the counters, and a subselect would need to take that into account - but this is more to start a conversation. Also note that concurrency keys in this prototype are split into two columns because they get used by two different indices.
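In rough strokes, both halves (a simplified sketch; `PrototypeJob`, `job_rows` and the column names are invented for illustration):

```ruby
# Enqueue half: insert_all skips rows that violate the partial unique index on the
# enqueue concurrency key and returns only the ids that actually made it in; the
# rejected jobs can then be detected and marked as errored on the Active Job side.
result = PrototypeJob.insert_all(job_rows, returning: %w[id])
accepted_ids = result.rows.flatten

# Checkout half: a subselect backed by a partial index over just the executing jobs
# excludes candidates whose execution concurrency key is already running somewhere.
CANDIDATE_SQL = <<~SQL
  SELECT id
  FROM prototype_jobs
  WHERE state = 'enqueued'
    AND (
      execution_concurrency_key IS NULL
      OR execution_concurrency_key NOT IN (
        SELECT execution_concurrency_key
        FROM prototype_jobs
        WHERE state = 'executing' AND execution_concurrency_key IS NOT NULL
      )
    )
  FOR UPDATE SKIP LOCKED
  LIMIT 1
SQL
```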
Cron
I didn't get to implement cron entries yet; my initial plan was to see whether this would work at all.
Stale locks and heartbeats (elephant in the room)
Advisory locks have one amazing property: they get force-released if the client disconnects or crashes. In the current setup we would instead get jobs stuck in the executing state, so this has a problem similar to Redis-based locking unless some kind of heartbeat updates the job from time to time and returns jobs to the enqueued state if they linger without an update for too long. This is not very hard, but it requires a "babysitter" thread within the worker process which needs to be managed. A standard solution would be to try something with concurrent-ruby primitives, but I was reluctant to dig that deep just yet.
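Something along these lines is what I have in mind for the babysitter (a rough sketch with a plain Thread; `PrototypeJob`, the `heartbeat_at` column and `currently_executing_job_ids` are invented for illustration):

```ruby
HEARTBEAT_EVERY = 15      # seconds between heartbeat bumps
CONSIDER_DEAD_AFTER = 120 # seconds without a heartbeat before a job is reclaimed

babysitter = Thread.new do
  loop do
    # Bump the heartbeat for jobs this process is currently executing
    PrototypeJob.where(id: currently_executing_job_ids, state: "executing")
                .update_all(heartbeat_at: Time.current)

    # Return jobs whose worker presumably crashed back to the queue
    PrototypeJob.where(state: "executing")
                .where("heartbeat_at < ?", CONSIDER_DEAD_AFTER.seconds.ago)
                .update_all(state: "enqueued", heartbeat_at: nil)

    sleep HEARTBEAT_EVERY
  end
end
babysitter.abort_on_exception = true
```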
No inline
I have never seen inline execution used with success, and the good_job code is full of conditionals where inline processing is a "special case". I'm not the right person to make that call, but I wonder what use cases there are for it except development/tests - and maybe tests should use the test adapter instead if immediacy is necessary there? (That's what we have been using.)
No LISTEN/NOTIFY
Sadly this is something else that needs to go; I haven't figured out the alternative yet. What would be the impact of simply polling with an increased interval when the "dequeue 1" operation does not dequeue anything and there is no work to do? There is also a challenge with wakeups. If I understand correctly, at the moment an enqueue will NOTIFY with the queue name which received "a job" (any number of jobs, really). All workers which are notified about that queue and are servicing it will proceed to do a SELECT to execute, which is going to be a thundering herd regardless. For instance, in our case there are just 2 queues ("heavy" and "default", with the latter running most of the workers), and any enqueue into "default" would wake up most workers. One option that can be considered is "spreading" the polling between registered worker threads with some jitter, so that they do not all wake up at the same time. There will still be a pileup and some delay before execution starts, but background jobs are "background" for a reason 🤔
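A sketch of the "poll slower when idle, with jitter" idea (the intervals and the `checkout_one_job` / `perform` helpers are illustrative):

```ruby
MIN_INTERVAL = 0.5   # seconds between polls while there is work
MAX_INTERVAL = 30.0  # ceiling for the backed-off interval when idle

interval = MIN_INTERVAL
loop do
  job = checkout_one_job # the SKIP LOCKED dequeue sketched earlier
  if job
    perform(job)
    interval = MIN_INTERVAL # work found: tighten the polling again
  else
    interval = [interval * 2, MAX_INTERVAL].min # idle: back off exponentially
    sleep(interval + (rand * interval * 0.25))  # jitter keeps threads from polling in lockstep
  end
end
```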
What's curious is the perf - SKIP LOCKED is not a panacea, and gives me the following numbers:
```
Take+perform with 15 workers and 100000 jobs in queue: sample_size: 3912, mean: 0.461, max: 1.812, p90: 0.577, p95: 0.618, p99: 0.877, stddev: 0.122
```
That is with 100K jobs already in the queue and jobs being added continuously in batches of 500. I suspect this is more due to how PG uses the indexes for the query, and that the query can be optimised further.
This is more to spark a conversation than anything else. The problem I am trying to find a solution for is how to use pgbouncer without losing the commit semantics of a pg-based job queue, which is the main selling point (simplicity aside).
Possible alternatives
One approach that came to mind was splitting the consumer part (the worker) into one "producer" thread, which would be using the locks, and the "consumers", which execute the jobs. Maybe good_job already works this way and I just didn't manage to suss it out due to the somewhat complicated web of concurrent-ruby parts in use. But with one producer thread and multiple consumers, if advisory locks are absolutely necessary (or the notifier is absolutely necessary), the producer thread could do the following:
- Dequeue a job, acquire the advisory lock
- Send the job off to one of the performing threads
- Receive any jobs that completed from the performing threads, mark them completed, release the locks for them
Then you would need 1 "connection"-connection per worker process, and enqueueing could use the "transaction-connections" instead. This adds quite a bit of complexity (which I found ample in good_job already - not debating it, just trying to sketch it out), but it would allow both LISTEN/NOTIFY and advisory locks on the consuming end. But I wonder: is it that approachable, and is it desirable - given that an advisory lock is held during the entire execution of a job, while a SELECT FOR UPDATE places a row lock just for the state transition of the job? Locking just for the dequeue seems like it could be less aggressive on the DB. My test is very severe in that the actual "performage" is instant - real jobs do take some time to execute, so while the table will be under stress, the "keyspace" for locks would be smaller. With advisory locks we are looking at a minimum of 1 lock per job-in-progress.
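If that split were attempted, the producer thread could look roughly like this (purely a sketch of the idea, not how good_job is structured today; all helper methods are invented):

```ruby
require "concurrent"

done_queue = Queue.new
pool = Concurrent::FixedThreadPool.new(8) # the "consumers" which execute jobs

producer = Thread.new do
  loop do
    # Drain completions first so their advisory locks get released promptly
    until done_queue.empty?
      finished_job = done_queue.pop
      mark_finished_and_release_advisory_lock(finished_job) # uses the dedicated connection
    end

    job = dequeue_and_advisory_lock_next_job # also uses the dedicated connection
    if job
      pool.post do
        perform(job)         # run the Active Job payload on a consumer thread
        done_queue.push(job) # hand the job back so the producer can unlock it
      end
    else
      sleep 1
    end
  end
end
```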
The crazy code in question can be found here: https://gist.github.com/julik/ad3d1d47735ea04de35d63892c3a89e1 (excuse some bits from our codebase, like a custom assert and the safe-migrations part, as this is ripped out of our main app). Even though "a rewrite of good_job" sounds very extreme, it does seem doable. Could this be helpful?
❤️ Thank you for looking into this!
This matches up with my expectations of the benefit (database connections), the reality (there's no free lunch for performance), and the elephant in the room (stale locks and heartbeats).
Just on the query itself, I think priority ordering is what kills performance: Postgres either has to use an index ordered on priority and then filter on scheduled_at/created_at, or it has to use an index ordered on scheduled_at/created_at and then re-sort on priority. Either way, I don't believe it's possible to do a simple index query while supporting priority.
(I did make a change in #928 that means that scheduled_at is always populated to remove the OR part of the query).
With GoodJob, I don't think it's worthwhile to build out multiple locking strategies. So I'm working towards a heartbeat (built around the Process record). My thought is to have a lock_by_process_id and locked_at to imply a lock, and then lock_by_process_id will be joined against the Process for dealing with stale/expired.
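Roughly sketched (purely illustrative - those columns don't exist yet, `updated_at` is standing in for the Process heartbeat, and the threshold is a placeholder):

```ruby
# Treat a lock as expired when the owning Process record has not heartbeaten recently,
# and clear it so the job can be picked up again.
stale_process_ids = GoodJob::Process.where("updated_at < ?", 5.minutes.ago).pluck(:id)

GoodJob::Job
  .where(lock_by_process_id: stale_process_ids)
  .update_all(lock_by_process_id: nil, locked_at: nil)

# Also reclaim jobs whose Process row has disappeared entirely
GoodJob::Job
  .where.not(lock_by_process_id: nil)
  .where.not(lock_by_process_id: GoodJob::Process.select(:id))
  .update_all(lock_by_process_id: nil, locked_at: nil)
```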
Your experiment did remind me that Inline mode needs a solution, so that was really helpful 🙌🏻
btw, Cron doesn't do any locking, so you should be golden there.
I think your solution looks good. I can't really go line-by-line, but it seems pretty close to all of the things I'm working through (along with all the deprecation cycles involved).
I'd be happy if you wanted to Zoom pair program on it together and we could try to make the changes in GoodJob itself.
That would be awesome... could we get in touch via a more real-time medium first? My email should be in my bio. It doesn't seem to be as much about pairing as about setting the direction first, which we should be aligned on for this to succeed. 🙌
It does seem that a compound index could work here; it has to be configured just right, though. TBC.
@julik would you be able to DM me?
- Rails Link Slack (@bensheldon), or
- Rails or Ruby Discords (@bensheldon)
> Just on the query itself, I think priority ordering is what kills performance: Postgres either has to use an index ordered on priority and then filter on scheduled_at/created_at, or it has to use an index ordered on scheduled_at/created_at and then re-sort on priority. Either way, I don't believe it's possible to do a simple index query while supporting priority.
Some other peers don't actually offer priority ordering, and not everyone has this use case -- how about the ability to configure it on or off, so that those who don't need it can avoid the performance hit? Is there a way to keep the code maintainable while allowing opt-out of the performance-significant priority feature?
For many use cases, multiple queues with FIFO are enough; there's no need for priority within a queue (which, after all, strictly speaking makes it no longer exactly a "queue"!)
> Some other peers don't actually offer priority ordering, and not everyone has this use case -- how about the ability to configure it on or off, so that those who don't need it can avoid the performance hit?
I like that suggestion. It would be fairly simple to offer that as a configuration option, and then slightly change the Active Record query that dequeues jobs if it was set.
I think the most delicate part of providing different job querying patterns is database indexing: GoodJob would need to have migrations that create indexes for both patterns (over-indexing is its own performance problem), or describe that it's not as simple as only setting a configuration option. I'm approaching the "Maybe I should write a book about GoodJob"-stage 😁
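To make the trade-off concrete, the two index shapes might look something like this (illustrative only, not actual GoodJob migrations; the index names are invented):

```ruby
# Priority-aware dequeue: the index leads with priority so the ORDER BY can be
# satisfied from the index, but the scheduled_at filter has to skip past entries
# that are not yet due.
add_index :good_jobs, [:priority, :scheduled_at],
          where: "finished_at IS NULL",
          name: "index_good_jobs_on_priority_scheduled_at_unfinished"

# FIFO-only dequeue: the index leads with scheduled_at, giving a tight range scan
# in due-date order, but it cannot also serve an ORDER BY priority.
add_index :good_jobs, :scheduled_at,
          where: "finished_at IS NULL",
          name: "index_good_jobs_on_scheduled_at_unfinished"
```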