
Adding "priority ordering" feature to allow users specifying the order of precedence in a queue

Open · FreCap opened this issue 4 years ago • 29 comments

#181

No matter the execution_time, as long as the execution is due (execution_time <= CURRENT_TIMESTAMP), the highest-priority execution will always be executed first

e.g.

scheduler.schedule(onetimeTask.instanceBuilder("1").setPriority(100), Instant.now());
scheduler.schedule(onetimeTask.instanceBuilder("2").setPriority(200), Instant.now());

@kagkarlsson I wasn't sure how to bump a major version. Please let me know if you have any feedback!

FreCap avatar Feb 20 '21 23:02 FreCap

That was fast! I don't think I have fully wrapped my head around this feature yet.

One thing I still have not found a good solution for is how to index this design effectively. Do you have any thoughts there?

Fetch-due query:

select * from scheduled_tasks where picked = ? and execution_time <= ? order by priority desc, execution_time asc

For high-volume use-cases I have created an index like

create index on scheduled_tasks(execution_time asc)

However, adding priority to the mix, I don't think this index will work (since it now contains two "ranges"):

create index on scheduled_tasks(priority desc, execution_time asc)

I suspect the current solution will hurt high-throughput use-cases, since they will not be able to index effectively. Hypothetically, let's say we have an extreme case where 1M executions are due... 🤔

I am trying to think of a good solution here... one variant is to allow users to disable priority-sorting for high-volume cases; another is a very basic priority feature (LOW, NORMAL, HIGH) that executes three different fetches, in priority order HIGH->LOW..
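To make the indexing question concrete, here is a minimal sketch (not from this thread) that runs the fetch-due query above against an in-memory SQLite table with the candidate (priority desc, execution_time asc) index. Table and column names follow the snippets in this thread; the data is made up, and SQLite's planner of course differs from the production databases discussed here:

```python
# Sketch: the fetch-due query with priority, against a toy SQLite table.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE scheduled_tasks (
        task_instance TEXT,
        picked INTEGER,
        execution_time INTEGER,  -- epoch seconds, for simplicity
        priority INTEGER
    )""")
# The candidate index from the discussion: priority first, then execution_time.
conn.execute(
    "CREATE INDEX ix ON scheduled_tasks(priority DESC, execution_time ASC)")

rows = [("a", 0, 100, 1), ("b", 0, 50, 5), ("c", 0, 200, 9), ("d", 0, 80, 5)]
conn.executemany("INSERT INTO scheduled_tasks VALUES (?,?,?,?)", rows)

now = 150  # "c" (execution_time=200, priority=9) is not yet due
due = conn.execute("""
    SELECT task_instance FROM scheduled_tasks
    WHERE picked = 0 AND execution_time <= ?
    ORDER BY priority DESC, execution_time ASC
    """, (now,)).fetchall()
# Highest priority first; ties broken by execution_time.
print([r[0] for r in due])  # → ['b', 'd', 'a']
```

Note that the highest-priority row ("c") is skipped because it is not due yet, which is exactly why the scan over this index cannot simply stop after `limit` entries.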

kagkarlsson avatar Feb 21 '21 18:02 kagkarlsson

Just to explore some other thoughts.

  • Do you require to be able to set priority on instance-level, or would task-level work? I.e., for a given task, all instances have the same priority
  • What do you need in terms of cardinality for the priority? Would for example 3, 5 or 10 different values be sufficient? (I am considering if there are any interesting query/index optimizations that can be done if cardinality of priority is low)

kagkarlsson avatar Feb 22 '21 09:02 kagkarlsson

Do you require to be able to set priority on instance-level, or would task-level work? I.e., for a given task, all instances have the same priority

Both are useful. Cardinality could even be 10, but it should really be up to the user. Do you think it makes a difference between 10 and 100?

I've seen that you have a pretty nice framework for testing changes in https://github.com/kagkarlsson/db-scheduler/pull/175; we might want to use that to evaluate this

FreCap avatar Feb 22 '21 13:02 FreCap

Yeah, that could be used to evaluate this. Though I think we could simply create a table, populate it with a couple of million rows, create the indices, and run the selects to see what query-times we are getting..

kagkarlsson avatar Feb 22 '21 17:02 kagkarlsson

If cardinality were as low as 3 (high, normal, low), then we might just issue 3 queries

select * from scheduled_tasks where ... execution_time <= ? and priority = HIGH order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = NORMAL order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = LOW order by execution_time asc;

In this case, this index would work well, since priority is locked to a single value for each query:

create index on scheduled_tasks(priority, execution_time asc)

The downside is that we need to issue 3 selects each time we poll, but if we poll for 50+ executions each time, the overhead will still be pretty small, and every query will be fast thanks to the "perfect" index

kagkarlsson avatar Feb 22 '21 17:02 kagkarlsson

I'm pretty sure that the index on two fields should work pretty well. Queries on two ranges shouldn't be a problem.

Multiple queries are for sure less performant than one indexed query, and a fixed three-level priority would also be a strong limitation for users

An index with create index on scheduled_tasks(execution_time asc, priority) would likely be better since the cardinality of execution_time is higher

FreCap avatar Feb 22 '21 19:02 FreCap

For your use-case, what volumes are you expecting to use?

Also, I think we should do a local test of performance by just inserting say 5M records with slightly randomized execution-time and priority (max cardinality 10 for now) and see what query-times we are looking at to fetch say 100 executions. And try and optimize them by creating ideal indices

kagkarlsson avatar Feb 23 '21 07:02 kagkarlsson

Sure, in theory the highest-cardinality column should always come first in the index, which in this case is execution_time.

If you imagine an index on execution_time, priority where priority has cardinality 1, the index performance should exactly match an index on just execution_time. At that point, the difference between cardinality 1 and 10 with a couple million rows should be non-existent.

At the moment I don't have the capacity to create a full-fledged test setup. Would you have time to experiment? Or could we optimize later if needed?

FreCap avatar Feb 23 '21 14:02 FreCap

I may get some time to run a couple of tests. I want to be sure that this feature will not make performance worse for those using it for high-throughput cases, or at least that there is an escape-hatch should they experience lower throughput.

Sure, in theory the highest cardinality one should always be first in the index, which in this case is execution_time.

I think for perfect results, the index should match the order by (pre-sorted), but that will not work for us, since we additionally have a where-condition on execution_time <= now. If you put priority as the last column in the index, the database will have to read all "due" rows in order to sort by priority. So if 5M rows are due, it has to read all 5M before returning <limit> rows.
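The scan counts behind this argument can be modelled back-of-envelope in plain Python (this is an illustration, not db-scheduler code): treat each index as a sorted list of (execution_time, priority) keys and count how many entries must be read to produce `limit` results.

```python
# Model: how many index entries must be read for each index choice?
import random

random.seed(1)
now, limit = 1_000, 50
# (execution_time, priority): 10k rows, roughly half already due
rows = [(random.randint(0, 2_000), random.randint(0, 9)) for _ in range(10_000)]

# (a) Current master: index on (execution_time), no priority sort.
#     The scan stops after <limit> entries -- the <limit> oldest, all due.
idx_exec = sorted(rows)
scanned_a = limit

# (b) Index on (execution_time, priority): ORDER BY priority DESC can only
#     be satisfied after reading ALL due entries, then sorting them.
due = [r for r in idx_exec if r[0] <= now]
scanned_b = len(due)
top = sorted(due, key=lambda r: (-r[1], r[0]))[:limit]

# (c) Index on (priority DESC, execution_time): the scan walks in output
#     order but must skip high-priority entries that are not yet due.
idx_prio = sorted(rows, key=lambda r: (-r[1], r[0]))
scanned_c, picked = 0, []
for r in idx_prio:
    scanned_c += 1
    if r[0] <= now:
        picked.append(r)
        if len(picked) == limit:
            break

print(scanned_a, scanned_b, scanned_c)
```

With these numbers, (a) reads exactly 50 entries while (b) reads every due row (thousands here, 5M in the extreme case above); (c) sits in between, depending on how many not-yet-due rows share the top priorities.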

The nice thing about the current (master) select and index is that the database only has to read <limit> rows from the index

kagkarlsson avatar Feb 23 '21 15:02 kagkarlsson

Should the results show that this feature may affect performance, then I think we could add it as an opt-in feature, where you explicitly enable it on the scheduler (e.g. enablePriority()). The getDue-method would only add order by priority desc if it is enabled.
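The opt-in idea amounts to building the fetch-due SQL conditionally. A minimal sketch (names like build_fetch_due and enable_priority are illustrative, not db-scheduler's actual API):

```python
# Sketch: only add the priority sort when the feature is explicitly enabled.
def build_fetch_due(enable_priority: bool) -> str:
    order = ("priority desc, execution_time asc"
             if enable_priority else "execution_time asc")
    return ("select * from scheduled_tasks "
            "where picked = ? and execution_time <= ? "
            f"order by {order}")

print(build_fetch_due(False))  # current behaviour, index-friendly
print(build_fetch_due(True))   # opt-in priority ordering
```

Users who never call the enable switch keep today's query and index behaviour, which is the escape-hatch discussed above.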

kagkarlsson avatar Feb 24 '21 08:02 kagkarlsson

Ok, so you would be comparing the current version with the new branch without using priorities (everything 0 by default), correct?

FreCap avatar Feb 24 '21 11:02 FreCap

Yeah I suppose so.

master vs this branch where all priorities = null/0

kagkarlsson avatar Feb 25 '21 09:02 kagkarlsson

Sorry, I haven't had time to give this PR attention yet. I really need to finish PR #175 and time is limited unfortunately

kagkarlsson avatar Mar 05 '21 19:03 kagkarlsson

Hi, I wanted to check if this feature is still going forward? It's something that my team would also find really useful 🙂

tonyhkly avatar Jan 24 '23 10:01 tonyhkly

I think it is an interesting feature, but there are a couple of things higher up on the list.

Could you describe your use-case?

kagkarlsson avatar Jan 25 '23 15:01 kagkarlsson

Sure, the scenario is that I am using db-scheduler to publish to a few kafka topics. Some of these are just auditing topics and others are actual feature code, so I would like to prioritise feature code over auditing

tonyhkly avatar Jan 27 '23 20:01 tonyhkly

Are you anticipating high volumes (how high?) such that executions will queue up?

kagkarlsson avatar Jan 30 '23 11:01 kagkarlsson

At my company we run https://github.com/instructure/inst-jobs for rails applications, which has a similar priority feature. Its pop query is (simplified) "SELECT * FROM jobs WHERE run_at < now() ORDER BY priority, run_at, id", which uses an index on (priority, run_at, id) and generates this query plan on postgres:

                                              QUERY PLAN                                               
-------------------------------------------------------------------------------------------------------
 Index Scan using get_delayed_jobs_index on delayed_jobs
   Index Cond: (run_at <= '2023-02-02 01:34:59.958217'::timestamp without time zone)

so at least on postgres such an index is usable without additional filtering. This queue has scaled to millions of jobs in queue and pop-able without performance issues on the pop query, so I'm reasonably confident that such an index works right.

maths22 avatar Feb 02 '23 01:02 maths22

Hi Jacob, thanks for the input! Would you please post the index-definition and the full query-plan? I am very sceptical that postgres is able to use an index on (priority, run_at, id) here

kagkarlsson avatar Feb 02 '23 07:02 kagkarlsson

Here's the actual pop query we run in prod. Note that we also have concepts of multiple queues in one table and of stranded jobs, both of which aren't really relevant here:

WITH limited_jobs AS (
  SELECT
    id,
    ROW_NUMBER() OVER () AS row_number
  FROM
    (
      SELECT
        "delayed_jobs".*
      FROM
        "canvas"."delayed_jobs"
      WHERE
        (
          run_at <= '2023-02-02 16:23:24.112691'
          AND locked_at IS NULL
          AND next_in_strand = TRUE
        )
        AND "delayed_jobs"."priority" BETWEEN 0
        AND 1000000
        AND "delayed_jobs"."queue" = 'canvas_queue'
      ORDER BY
        "delayed_jobs"."priority" ASC,
        "delayed_jobs"."run_at" ASC,
        "delayed_jobs"."id" ASC
      LIMIT 160
      FOR UPDATE SKIP LOCKED
    ) subquery
)
UPDATE
  "canvas"."delayed_jobs"
SET
  locked_by = CASE row_number WHEN 1 THEN '<hostname>:106379' ELSE 'prefetch:<hostname>' END,
  locked_at = '2023-02-02 16:23:24.113589'
FROM
  limited_jobs
WHERE
  limited_jobs.id = "canvas"."delayed_jobs".id RETURNING "canvas"."delayed_jobs".*;

And here's the corresponding full plan (the innermost index scan is the real select from the jobs queue; as evidenced by rows=160, everything else just operates on the subset of jobs plucked by the subquery):

Update on delayed_jobs  (cost=78.78..525.03 rows=160 width=570)
   CTE limited_jobs
     ->  WindowAgg  (cost=0.42..78.23 rows=160 width=16)
           ->  Subquery Scan on subquery  (cost=0.42..76.23 rows=160 width=8)
                 ->  Limit  (cost=0.42..74.63 rows=160 width=2775)
                       ->  LockRows  (cost=0.42..79565.10 rows=171541 width=2775)
                             ->  Index Scan using get_delayed_jobs_index on delayed_jobs delayed_jobs_1  (cost=0.42..77849.69 rows=171541 width=2775)
                                   Index Cond: ((priority >= 0) AND (priority <= 1000000) AND (run_at <= '2023-02-02 16:23:24.112691'::timestamp without time zone))
                                   Filter: ((locked_at IS NULL) AND next_in_strand AND ((queue)::text = 'canvas_queue'::text))
   ->  Nested Loop  (cost=0.55..446.80 rows=160 width=570)
         ->  CTE Scan on limited_jobs  (cost=0.00..3.20 rows=160 width=56)
         ->  Index Scan using delayed_jobs_pkey on delayed_jobs  (cost=0.55..2.77 rows=1 width=14)
               Index Cond: (id = limited_jobs.id)

The index definition of the get_delayed_jobs_index:

"get_delayed_jobs_index" btree (priority, run_at, id) WHERE queue::text = 'canvas_queue'::text AND locked_at IS NULL AND next_in_strand
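The core of this pop pattern (select the next N unlocked due jobs in (priority, run_at, id) order, then mark them locked in one statement) can be re-created in miniature with SQLite, which also supports partial indexes. This is an assumption-heavy sketch, not inst-jobs code: SQLite has no FOR UPDATE SKIP LOCKED, so that part is omitted, and the strand/queue concepts are dropped:

```python
# Minimal re-creation of the pop-and-lock pattern with a partial index.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE delayed_jobs (
    id INTEGER PRIMARY KEY, priority INTEGER, run_at INTEGER,
    locked_at INTEGER, locked_by TEXT)""")
# Partial index loosely mirroring get_delayed_jobs_index above.
conn.execute("""CREATE INDEX get_delayed_jobs_index
    ON delayed_jobs(priority, run_at, id) WHERE locked_at IS NULL""")
conn.executemany("INSERT INTO delayed_jobs VALUES (?,?,?,?,?)", [
    (1, 10, 100, None, None),
    (2, 5, 120, None, None),   # lower number sorts first (ORDER BY priority ASC)
    (3, 5, 90,  None, None),
    (4, 5, 500, None, None),   # not yet due
])

def pop(now, n, worker):
    """Lock the next n due, unlocked jobs for `worker` and return their ids."""
    conn.execute("""
        UPDATE delayed_jobs
        SET locked_by = ?, locked_at = ?
        WHERE id IN (
            SELECT id FROM delayed_jobs
            WHERE run_at <= ? AND locked_at IS NULL
            ORDER BY priority, run_at, id
            LIMIT ?)""", (worker, now, now, n))
    return [r[0] for r in conn.execute(
        "SELECT id FROM delayed_jobs WHERE locked_by = ? ORDER BY id",
        (worker,))]

print(pop(200, 2, "w1"))  # → [2, 3]
```

Job 1 is due but lower-priority, so it is left for the next pop; job 4 is untouched until its run_at passes. In production the SKIP LOCKED clause is what lets many workers run this concurrently without blocking on each other's locked rows.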

maths22 avatar Feb 02 '23 16:02 maths22