Optimise DB worker queries
The initial DB worker (#3) polls for tasks every second (by default; the interval is configurable). These queries (and by extension, the underlying table) aren't especially optimised. It would be great if they were. The queries themselves ought to be fine; however, adding strategic indexes should help with performance.
You should perhaps consider making the table for active (i.e. NEW/RUNNING) tasks a separate table from that used for inactive (FAILED/COMPLETE) tasks. Indexing can only get you so far, and it's hard to beat the simplicity of keeping the short, frequently-accessed list of active tasks separate from the long, infrequently-accessed list of inactive tasks.
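For illustration, here's a minimal sketch of that split using plain sqlite3. The table and column names are hypothetical (the real model has many more fields); the point is just that terminal tasks are moved out of the small, polled table and into the large archive table in one transaction.

```python
import sqlite3

# Hypothetical two-table layout: the small, hot "active" table is polled by
# workers, and rows move to the large, cold "finished" table on completion.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE active_task (id INTEGER PRIMARY KEY, status TEXT, payload TEXT);
    CREATE TABLE finished_task (id INTEGER PRIMARY KEY, status TEXT, payload TEXT);
""")

def finish_task(task_id: int, final_status: str) -> None:
    """Move a task out of the polled table once it reaches a terminal state."""
    with conn:  # one transaction: copy then delete
        row = conn.execute(
            "SELECT payload FROM active_task WHERE id = ?", (task_id,)
        ).fetchone()
        conn.execute(
            "INSERT INTO finished_task (id, status, payload) VALUES (?, ?, ?)",
            (task_id, final_status, row[0]),
        )
        conn.execute("DELETE FROM active_task WHERE id = ?", (task_id,))

conn.execute("INSERT INTO active_task VALUES (1, 'NEW', '{}')")
finish_task(1, "COMPLETE")
```

The polling query then only ever touches `active_task`, whose size is bounded by the number of in-flight tasks.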
Another idea could be to automatically purge failed/complete tasks after a period that the user can define in a setting. Of course, this is not possible in all use cases and can not be considered as an all-in-one solution.
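As a sketch of what that purge could look like (hypothetical schema and column names, with the retention period standing in for the user-defined setting), using plain sqlite3:

```python
import datetime
import sqlite3

# Stand-in for a user-configurable retention setting.
RESULT_TTL = datetime.timedelta(days=7)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task (id INTEGER PRIMARY KEY, status TEXT, finished_at TEXT)"
)

now = datetime.datetime(2024, 10, 7)
old = (now - datetime.timedelta(days=30)).isoformat()
recent = (now - datetime.timedelta(days=1)).isoformat()
conn.executemany(
    "INSERT INTO task (status, finished_at) VALUES (?, ?)",
    [("COMPLETE", old), ("FAILED", old), ("COMPLETE", recent), ("NEW", None)],
)

# Purge only terminal tasks older than the cutoff; active tasks (and rows
# with no finish time) are untouched. ISO-8601 strings compare correctly
# as text when they share a format.
cutoff = (now - RESULT_TTL).isoformat()
with conn:
    conn.execute(
        "DELETE FROM task"
        " WHERE status IN ('FAILED', 'COMPLETE') AND finished_at < ?",
        (cutoff,),
    )
```

Run periodically (e.g. from a scheduled management command), this keeps the table bounded without ever touching NEW/RUNNING rows.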
That's discussed in #16. It has to be implemented, because otherwise you're basically creating a never-rotated log that will just keep using more and more storage space forever. But it's not great to have competing requirements ("I want to keep task results for a long time" vs "I want the system to be fast") pulling you in opposite directions on the same setting.
I think "I want to keep task results for a long time" is an anti-pattern. The result should be short-lived, and persisted to somewhere else (ie your business logic) in all cases.
Reducing the size of the table absolutely helps with performance, but database engines are well optimised for scans on large tables - far larger than this table is likely to hit. Indexing will help partition said scans down quite a lot, achieving something very similar to partitioning the table. I think a hard partition is far more effort than it's worth.
Just from looking at the QuerySet I'd say the filtering mostly happens on the status field, so adding an index there seems quite obvious.
run_after is also filtered when using the ready() method. It could be added as a second field to the index. I just wonder how much it's worth it. How many tasks do we expect to be new but not ready? Maybe hundreds? Even thousands would probably be fine and maybe wouldn't even use the index. On the other hand, disk space is cheap, so maybe it'd be worthwhile.
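The effect of a plain `status` index is easy to demonstrate even with a toy schema. This sqlite3 sketch (illustrative stand-in for the task-result table, not the real model) shows the query plan switching from a full table scan to an index search once the index exists:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task (id INTEGER PRIMARY KEY, status TEXT, run_after TEXT)"
)

def plan(sql: str) -> str:
    # The fourth column of EXPLAIN QUERY PLAN output is the human-readable detail.
    return " ".join(row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))

query = "SELECT * FROM task WHERE status = 'NEW'"

before = plan(query)  # full table scan: no index is available yet
conn.execute("CREATE INDEX task_status_idx ON task (status)")
after = plan(query)   # now a search using task_status_idx

print(before)
print(after)
```

The same `EXPLAIN QUERY PLAN` trick is a quick way to sanity-check which of the candidate indexes SQLite actually picks up.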
If there's some agreement I could prepare a PR.
That sounds like the ideal start to me. I'm not sure what the query plan would look like for combined indexes vs separate ones, but it's hard to know that without a proper scale of data.
The queue_name is also used quite often for the worker - perhaps it'd be useful to have an index there?
I will try to find some time to have a look into it and throw some data in and see when they get used.
Okay I made some time to look into this.
I mostly looked at DBTaskResult.objects.ready() as this is I think what will benefit most from the indexes. The others only filter on status so the index should also work well for those. I also checked .complete() because I made a million of those and wanted to check out how well the ordering works for that. Probably there's not much reason to get all rows, I imagine it'd be paginated. But in the admin, counts are still slow and pagination works such that looking at high page numbers is also very slow, so I thought it worth a look.
I tried at first with SQLite but the explain output wasn't useful enough, so I switched to PostgreSQL. But from my initial tests I would say the following mostly holds for both SQLite and Postgres.
I set up the test data with the following command. I'm not sure how realistic this data is, but it seemed "reasonable" to me. If we clean up this database often enough that we don't have millions of rows in it, probably none of this is necessary.
```python
import datetime

from django.core.management import BaseCommand
from django.utils import timezone

from django_tasks.backends.database.models import DBTaskResult


class Command(BaseCommand):
    def handle(self, *args, **kwargs):
        now = timezone.now()
        one_day = datetime.timedelta(days=1)
        earlier = now - one_day
        later = now + one_day

        for _ in range(1_000_000):
            DBTaskResult.objects.create(status="COMPLETE", args_kwargs="")
        for _ in range(100_000):
            DBTaskResult.objects.create(status="FAILED", args_kwargs="")
        for _ in range(10_000):
            DBTaskResult.objects.create(status="NEW", run_after=later, args_kwargs="")
        for _ in range(1_000):
            DBTaskResult.objects.create(status="NEW", run_after=earlier, args_kwargs="")
            DBTaskResult.objects.create(status="NEW", args_kwargs="")

        DBTaskResult.objects.create(status="RUNNING", args_kwargs="")
```
Explain for DBTaskResult.objects.ready() without changes:
```
Sort  (cost=21569.78..21569.79 rows=1 width=143) (actual time=83.894..84.912 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Gather  (cost=1000.00..21569.77 rows=1 width=143) (actual time=83.054..84.462 rows=2000 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Parallel Seq Scan on django_tasks_database_dbtaskresult  (cost=0.00..20569.67 rows=1 width=143) (actual time=61.924..62.123 rows=667 loops=3)
              Filter: (((run_after IS NULL) OR (run_after <= '2024-10-07 10:42:49.646184+00'::timestamp with time zone)) AND ((status)::text = 'NEW'::text))
              Rows Removed by Filter: 370333
Planning Time: 0.432 ms
Execution Time: 84.986 ms
```
A sequential scan is not amazing to see, and it's reflected in the fairly slow execution time.
DBTaskResult.objects.complete():
```
Gather Merge  (cost=125734.95..229278.01 rows=887450 width=143) (actual time=188.961..507.089 rows=1000000 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  ->  Sort  (cost=124734.93..125844.24 rows=443725 width=143) (actual time=180.958..206.857 rows=333333 loops=3)
        Sort Key: priority DESC, run_after DESC NULLS LAST
        Sort Method: external merge  Disk: 28816kB
        Worker 0:  Sort Method: external merge  Disk: 26560kB
        Worker 1:  Sort Method: external merge  Disk: 27096kB
        ->  Parallel Seq Scan on django_tasks_database_dbtaskresult  (cost=0.00..19415.06 rows=443725 width=143) (actual time=0.019..60.951 rows=333333 loops=3)
              Filter: ((status)::text = 'COMPLETE'::text)
              Rows Removed by Filter: 37667
Planning Time: 0.107 ms
Execution Time: 533.226 ms
```
The on-disk sort here is also not wonderful, and this is very slow, as you'd expect. Putting a limit on avoids the on-disk merge, but is still slow (around 100ms), which is probably what you'd see on e.g. first page of paginated results.
DBTaskResult.objects.ready() with indexes = [models.Index(fields=["status"])]:
```
Sort  (cost=4.46..4.46 rows=1 width=143) (actual time=4.328..4.548 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Index Scan using django_task_status_9e2de0_idx on django_tasks_database_dbtaskresult  (cost=0.43..4.45 rows=1 width=143) (actual time=2.639..3.365 rows=2000 loops=1)
        Index Cond: ((status)::text = 'NEW'::text)
        Filter: ((run_after IS NULL) OR (run_after <= '2024-10-07 10:51:33.499874+00'::timestamp with time zone))
        Rows Removed by Filter: 10000
Planning Time: 0.147 ms
Execution Time: 4.664 ms
```
Much better. It doesn't seem to help complete() though.
With indexes = [models.Index(fields=["status", "run_after"])]:
```
Sort  (cost=6.21..6.21 rows=1 width=143) (actual time=5.052..5.165 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Index Scan using django_task_status_845397_idx on django_tasks_database_dbtaskresult  (cost=0.43..6.20 rows=1 width=143) (actual time=0.069..4.197 rows=2000 loops=1)
        Index Cond: ((status)::text = 'NEW'::text)
        Filter: ((run_after IS NULL) OR (run_after <= '2024-10-07 10:53:13.943714+00'::timestamp with time zone))
        Rows Removed by Filter: 10000
Planning Time: 0.133 ms
Execution Time: 5.391 ms
```
Didn't seem to get used, and slows down the query slightly if anything. Probably after the first filter, there's few enough rows that it's not needed. With different data this could be faster but I'm not sure how many ready or scheduled tasks we expect to see. complete() is still not helped.
Adding a new index just for the ordering, so that we end up with the following, does work very nicely though, without impacting the other queries:

```python
indexes = [
    models.Index(fields=["status"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]
```
With explain:
```
Index Scan using idx_task_ordering on django_tasks_database_dbtaskresult  (cost=0.43..55058.56 rows=1073019 width=143) (actual time=2.926..187.273 rows=1003836 loops=1)
  Filter: ((status)::text = 'COMPLETE'::text)
  Rows Removed by Filter: 113000
Planning Time: 0.477 ms
Execution Time: 213.439 ms
```
So unless you have more details about how many rows you'd expect of which type, I think this is quite reasonable.
I also didn't check queue_name (yet!). I think this should be added to the manager, because part of me not checking it was just that I didn't notice it.
I suspect that it is only really used in combination with ready(), which is already indexed quite well. So as long as you don't have tens of thousands of tasks per queue, it might be unnecessary. On the other hand, adding this (either before or after, but I suspect after would be better for admin filtering, unless people rather filter by queue than state... 🤔) to the existing status index might be useful at a large scale. If you have some numbers in mind @RealOrangeOne, I'd be happy to test them.
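On the "before or after" question: with a composite index, only filters on a leftmost prefix of its columns can use it, which is what makes the column order matter. A toy sqlite3 demo (hypothetical schema, not the real model) of an index on (status, queue_name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# "payload" stands in for the columns not covered by the index.
conn.execute(
    "CREATE TABLE task"
    " (id INTEGER PRIMARY KEY, status TEXT, queue_name TEXT, payload TEXT)"
)
conn.execute("CREATE INDEX task_status_queue_idx ON task (status, queue_name)")

def plan(sql: str) -> str:
    # The fourth column of EXPLAIN QUERY PLAN output is the human-readable detail.
    return " ".join(row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))

# status is the leftmost column, so both of these can use the index:
print(plan("SELECT * FROM task WHERE status = 'NEW'"))
print(plan("SELECT * FROM task WHERE status = 'NEW' AND queue_name = 'q'"))

# queue_name alone is not a leftmost prefix, so this falls back to a scan:
print(plan("SELECT * FROM task WHERE queue_name = 'q'"))
```

So with (status, queue_name), the worker's status-only and status-plus-queue queries are both served, while a queue-only admin filter would need its own index (or the order flipped).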
This is fantastic!
An index on status makes sense, since most of the time that's going to cut down the number of rows quite a lot. queue_name ought to help for similar reasons. run_after and priority might need indexing too, although I'm not sure what the behaviour would be like on looking through 2 indexes vs having 1 main "find the task to execute" index which is specially designed.
I don't think this necessarily needs to be right first time. Getting the more obvious changes in first, to get most of the benefit, then the indexes can be tuned over time. You're right that cleanup should mean the table doesn't get too big, but given it's polled, any query improvements will help.
I ran some more experiments.

I added the same amount of data again, but with a different queue name. So there are now double the number of objects: half in one queue, half in the other.
- I could never get any index using `run_after` to work. The most likely reason is that the `status` part of the filter is more than enough already. If there are a lot of tasks in the NEW state, in the order of hundreds of thousands or more, maybe it would be useful to concatenate it onto the `status` index.
- I tried various forms of concatenated indexes. The only useful one was for the ordering.
- A `queue_name` index seems not to be used, again because the `status` index is good enough. It still seems useful to help with admin filtering though, and I suspect in some setups it could be more useful than the `status` index, so it seems worth keeping in case the database chooses to use it, and for the admin.
So I would suggest either:
```python
indexes = [
    models.Index(fields=["status"]),
    models.Index(fields=["queue_name"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]
```
Or if we think it's likely to be used in some setups:
```python
indexes = [
    models.Index(fields=["status", "run_after"]),
    models.Index(fields=["queue_name"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]
```
I will prepare a PR with the first option I think, we can maybe discuss more once there's an implementation to look at.
@RealOrangeOne I think the model default_ordering should be run_after ascending, rather than descending - you'd want the oldest date first, right?
Yes that does sound sensible. I'd definitely welcome a PR to fix it. You've put it your index one but the fix should be separate.
@eviljeff Looked further into this, the current functionality is exactly how you describe - ascending will give the oldest date first, which is correct and how it's implemented now.
oh, that's good. Where can I see that? https://github.com/RealOrangeOne/django-tasks/blob/master/django_tasks/backends/database/models.py#L111 reads to me as descending - is it overridden somewhere else?