Adding "priority ordering" feature to allow users specifying the order of precedence in a queue
#181
Regardless of execution_time, among executions that are due (execution_time <= CURRENT_TIMESTAMP), the highest priority will always be executed first
e.g.
scheduler.schedule(onetimeTask.instanceBuilder("1").setPriority(100), Instant.now());
scheduler.schedule(onetimeTask.instanceBuilder("2").setPriority(200), Instant.now());
@kagkarlsson I wasn't sure how to bump a major version. Please let me know if you have any feedback!
That was fast! I don't know that I have fully wrapped my head around this feature yet.
One thing I still have not found a good solution for is how to index this design effectively. Do you have any thoughts there?
Fetch-due query:
select * from scheduled_tasks where picked = ? and execution_time <= ? order by priority desc, execution_time asc
For high-volume use-cases I have created an index like
create index on scheduled_tasks(execution_time asc)
However, adding priority to the mix, I don't think this index will work (since it now contains two "ranges"):
create index on scheduled_tasks(priority desc, execution_time asc)
I suspect the current solution will be a bit bad for high-throughput use-cases since they will not be able to index effectively. Hypothetically, let's say we have extreme cases where you have 1M executions due... 🤔
I am trying to think of a good solution here... one variant is to allow users to disable priority-sorting for high-volume cases, another is to go for a very basic priority feature (LOW, NORMAL, HIGH) and execute three different fetches, in priority order HIGH->LOW..
Just to explore some other thoughts.
- Do you require to be able to set priority on instance-level, or would task-level work? I.e., for a given task, all instances have the same priority
- What do you need in terms of cardinality for the priority? Would for example 3, 5 or 10 different values be sufficient? (I am considering if there are any interesting query/index optimizations that can be done if the cardinality of priority is low)
Do you require to be able to set priority on instance-level, or would task-level work? I.e., for a given task, all instances have the same priority
Both are useful.
Cardinality could even be 10, but it should really be up to the user. Do you think it makes a difference between 10 and 100?
I've seen that you have a pretty nice framework for testing changes in https://github.com/kagkarlsson/db-scheduler/pull/175; we might want to use that to evaluate this
Yeah that could be used to evaluate this. Though I think we could simply create a table, populate with a couple of millions of rows, create the indices and run the selects to see what query-times we are getting..
If cardinality were as low as 3 (high, normal, low), then we might just issue 3 queries
select * from scheduled_tasks where ... execution_time <= ? and priority = HIGH order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = NORMAL order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = LOW order by execution_time asc;
In this case, this index would work well, since priority is locked to a single value for each query:
create index on scheduled_tasks(priority, execution_time asc)
The downside is that we need to issue 3 selects each time we poll, but if we poll for 50+ executions each time, the overhead will still be pretty small, and all queries will be fast due to the "perfect" index
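A small sketch of the multi-fetch idea above, using Python's sqlite3 purely for illustration (the table layout and column names are simplified assumptions, not db-scheduler's actual schema). It checks that three per-priority fetches, each pinning priority to one value so the (priority, execution_time) index returns rows pre-sorted, produce the same result as the single ORDER BY query:

```python
# Sketch only: low-cardinality priority, fetched HIGH -> NORMAL -> LOW.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""create table scheduled_tasks
                (task_instance text, execution_time integer,
                 priority integer, picked integer)""")
# priority first: each per-priority query locks priority to one value,
# so the index hands back rows already ordered by execution_time.
conn.execute("create index ix on scheduled_tasks(priority, execution_time)")
rows = [(f"t{i}", 1000 + (i * 37) % 500, i % 3, 0) for i in range(200)]
conn.executemany("insert into scheduled_tasks values (?, ?, ?, ?)", rows)

NOW, LIMIT = 1400, 50

def fetch_single():
    return conn.execute(
        """select task_instance from scheduled_tasks
           where picked = 0 and execution_time <= ?
           order by priority desc, execution_time asc limit ?""",
        (NOW, LIMIT)).fetchall()

def fetch_per_priority():
    out = []
    for prio in (2, 1, 0):  # HIGH -> NORMAL -> LOW
        out += conn.execute(
            """select task_instance from scheduled_tasks
               where picked = 0 and execution_time <= ? and priority = ?
               order by execution_time asc limit ?""",
            (NOW, prio, LIMIT - len(out))).fetchall()
        if len(out) >= LIMIT:
            break
    return out

assert fetch_single() == fetch_per_priority()
```

Passing the remaining budget (`LIMIT - len(out)`) to each subsequent fetch keeps the total at the poll limit, which is why the overhead of the extra round-trips stays small when polling for 50+ executions at a time.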
I'm pretty sure that the index on two fields should work pretty well. Queries on two ranges shouldn't be a problem.
Multiple queries are for sure less performant than one query with an index, and it would also be a strong limitation for users
An index with create index on scheduled_tasks(execution_time asc, priority) would likely be better since the cardinality of execution_time is higher
For your use-case, what volumes are you expecting to use?
Also, I think we should do a local test of performance by just inserting say 5M records with slightly randomized execution-time and priority (max cardinality 10 for now) and see what query-times we are looking at to fetch say 100 executions. And try and optimize them by creating ideal indices
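A rough sketch of the kind of local test described above, using Python's sqlite3 in-memory database as a stand-in (a real test should target the actual database, e.g. Postgres, and the row count here is scaled down from 5M for illustration; schema and index names are assumptions):

```python
# Populate randomized execution_time and priority (cardinality 10),
# then time the fetch-due query under different candidate indices.
import random, sqlite3, time

random.seed(42)
conn = sqlite3.connect(":memory:")
conn.execute("""create table scheduled_tasks
                (id integer primary key, execution_time integer,
                 priority integer, picked integer)""")
conn.executemany(
    "insert into scheduled_tasks(execution_time, priority, picked) values (?, ?, 0)",
    ((random.randrange(0, 86_400), random.randrange(0, 10))
     for _ in range(200_000)))

def timed_fetch(index_ddl):
    conn.execute("drop index if exists ix")
    conn.execute(index_ddl)
    start = time.perf_counter()
    rows = conn.execute(
        """select priority, execution_time from scheduled_tasks
           where picked = 0 and execution_time <= ?
           order by priority desc, execution_time asc limit 100""",
        (43_200,)).fetchall()
    return rows, time.perf_counter() - start

for ddl in ("create index ix on scheduled_tasks(execution_time)",
            "create index ix on scheduled_tasks(priority desc, execution_time)"):
    rows, elapsed = timed_fetch(ddl)
    # the result must be correctly ordered regardless of which index is used
    assert rows == sorted(rows, key=lambda r: (-r[0], r[1]))
    print(f"{len(rows)} rows in {elapsed * 1000:.1f} ms using: {ddl}")
```

The absolute numbers from sqlite are not representative, but the harness shape (randomized population, swap index, time the select) is the experiment being proposed.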
Sure, in theory the highest cardinality one should always be first in the index, which in this case is execution_time.
If you imagine an index on (execution_time, priority), where priority has cardinality 1, the index performance should exactly match an index that just has execution_time. At that point the difference between cardinality 1 and 10, with a couple million messages, should be non-existent.
At the moment I don't have the capacity to create a full-fledged test system; would you have time to experiment? Or could we optimize later if needed?
I may get some time to run a couple of tests. I want to be sure that this feature will not make performance worse for those using it for high-throughput cases, or at least that there is an escape-hatch should they experience lower throughput.
Sure, in theory the highest cardinality one should always be first in the index, which in this case is execution_time.
I think for perfect results, the index should match the order by (pre-sorted), but that will not work for us, since we additionally have a where-condition on execution_time < now.
If you put priority as last column in the index, the database will have to read all "due" rows in order to sort by priority. So if 5M rows are due, it has to read them all before returning <limit> rows.
The nice thing about the current (master) select and index is that the database only has to read <limit> rows from the index
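The sort-step argument above can be made visible with EXPLAIN QUERY PLAN. This uses sqlite3 for convenience (Postgres plans look different, but the underlying point is the same): with the index led by execution_time, the ORDER BY on priority forces the database to materialize and sort all due rows, whereas pinning priority to a single value lets a (priority, execution_time) index return rows already in order:

```python
# Plan comparison sketch; table/index names are illustrative.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""create table scheduled_tasks
                (execution_time integer, priority integer, picked integer)""")
conn.execute("create index ix_et on scheduled_tasks(execution_time)")
conn.execute("create index ix_prio_et on scheduled_tasks(priority, execution_time)")

def plan(sql):
    return [row[-1] for row in conn.execute("explain query plan " + sql)]

# Range on execution_time plus ORDER BY priority: needs an explicit sort,
# i.e. all due rows are read before <limit> rows can be returned.
p1 = plan("""select * from scheduled_tasks where execution_time <= 100
             order by priority desc, execution_time asc""")
assert any("TEMP B-TREE" in step for step in p1)

# priority pinned to a single value: index order matches the ORDER BY,
# so no sort step appears and only <limit> rows need to be read.
p2 = plan("""select * from scheduled_tasks
             where priority = 1 and execution_time <= 100
             order by execution_time asc""")
assert not any("TEMP B-TREE" in step for step in p2)
```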
Should the results show that this feature may affect performance, then I think we could add it as an opt-in feature, where you explicitly enable it on the scheduler (e.g. enablePriority()). The getDue method would only add order by priority desc if it is enabled.
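A minimal sketch of that opt-in, in Python pseudocode for brevity (enablePriority() is only the name suggested above; this is not db-scheduler's actual implementation, just the shape of conditionally extending the fetch-due SQL):

```python
# Hypothetical: build the fetch-due query with priority ordering opt-in.
def get_due_query(priority_enabled: bool) -> str:
    base = ("select * from scheduled_tasks "
            "where picked = ? and execution_time <= ?")
    order = ("order by priority desc, execution_time asc"
             if priority_enabled else "order by execution_time asc")
    return f"{base} {order}"

print(get_due_query(False))
print(get_due_query(True))
```

With the flag off, the query (and therefore the index requirements) stays identical to the current master behavior, which is the escape-hatch for high-throughput users.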
Ok, so you would be comparing the current version with the new branch without using priorities (everything 0 by default), correct?
Yeah I suppose so.
master vs this branch where all priorities = null/0
Sorry, I haven't had time to give this PR attention yet. I really need to finish PR #175 and time is limited unfortunately
Hi, I wanted to check if this feature is still going forward? It's something that my team would also find really useful 🙂
I think it is an interesting feature, but there are a couple of things higher up on the list.
Could you describe your use-case?
Sure, the scenario is that I am using db-scheduler to publish to a few kafka topics. Some of these are just auditing topics and others are actual feature code, so I would like to prioritise feature code over auditing
Are you anticipating high volumes (how high?) such that executions will queue up?
At my company we run https://github.com/instructure/inst-jobs for rails applications, which has a similar priority feature. Its pop query is (simplified) "SELECT * FROM jobs WHERE run_at < now() ORDER BY priority, run_at, id", which uses an index on priority, run_at, id and generates a query plan on postgres of
QUERY PLAN
-------------------------------------------------------------------------------------------------------
Index Scan using get_delayed_jobs_index on delayed_jobs
Index Cond: (run_at <= '2023-02-02 01:34:59.958217'::timestamp without time zone)
so at least on postgres such an index is usable without additional filtering. This queue has scaled to millions of jobs in queue and pop-able without performance issues on the pop query, so I'm reasonably confident that such an index works right.
Hi Jacob, thanks for the input! Would you please post the index-definition and the full query-plan? I am very sceptical of postgres being able to use an index on (priority, run_at, id) here
Here's the actual pop query we run in prod. Note that we also have concepts of multiple queues in one table and of stranded jobs, both of which aren't really relevant here:
WITH limited_jobs AS (
SELECT
id,
ROW_NUMBER() OVER () AS row_number
FROM
(
SELECT
"delayed_jobs".*
FROM
"canvas"."delayed_jobs"
WHERE
(
run_at <= '2023-02-02 16:23:24.112691'
AND locked_at IS NULL
AND next_in_strand = TRUE
)
AND "delayed_jobs"."priority" BETWEEN 0
AND 1000000
AND "delayed_jobs"."queue" = 'canvas_queue'
ORDER BY
"delayed_jobs"."priority" ASC,
"delayed_jobs"."run_at" ASC,
"delayed_jobs"."id" ASC
LIMIT 160
FOR UPDATE SKIP LOCKED
) subquery
)
UPDATE
"canvas"."delayed_jobs"
SET
locked_by = CASE row_number WHEN 1 THEN '<hostname>:106379' ELSE 'prefetch:<hostname>' END,
locked_at = '2023-02-02 16:23:24.113589'
FROM
limited_jobs
WHERE
limited_jobs.id = "canvas"."delayed_jobs".id RETURNING "canvas"."delayed_jobs".*;
And here's the corresponding full plan (the innermost index scan is the real select from the jobs queue; as evidenced by the rows=160 on the other nodes, everything else is just operating on the subset of jobs plucked by the subquery):
Update on delayed_jobs (cost=78.78..525.03 rows=160 width=570)
CTE limited_jobs
-> WindowAgg (cost=0.42..78.23 rows=160 width=16)
-> Subquery Scan on subquery (cost=0.42..76.23 rows=160 width=8)
-> Limit (cost=0.42..74.63 rows=160 width=2775)
-> LockRows (cost=0.42..79565.10 rows=171541 width=2775)
-> Index Scan using get_delayed_jobs_index on delayed_jobs delayed_jobs_1 (cost=0.42..77849.69 rows=171541 width=2775)
Index Cond: ((priority >= 0) AND (priority <= 1000000) AND (run_at <= '2023-02-02 16:23:24.112691'::timestamp without time zone))
Filter: ((locked_at IS NULL) AND next_in_strand AND ((queue)::text = 'canvas_queue'::text))
-> Nested Loop (cost=0.55..446.80 rows=160 width=570)
-> CTE Scan on limited_jobs (cost=0.00..3.20 rows=160 width=56)
-> Index Scan using delayed_jobs_pkey on delayed_jobs (cost=0.55..2.77 rows=1 width=14)
Index Cond: (id = limited_jobs.id)
The index definition of the get_delayed_jobs_index:
"get_delayed_jobs_index" btree (priority, run_at, id) WHERE queue::text = 'canvas_queue'::text AND locked_at IS NULL AND next_in_strand