dagster
dagster copied to clipboard
Unable to load schedules from Dagit due to timeout
Dagster version
1.5.6
What's the issue?
We get the following error when trying to view some schedules:
Operation name: SingleScheduleQuery
Message: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
[SQL: SELECT runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time
FROM runs JOIN (SELECT run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags) AS run_tags_subquery_1 ON runs.run_id = run_tags_subquery_1.run_id AND run_tags_subquery_1.key = %(key_1)s AND run_tags_subquery_1.value = %(value_1)s JOIN (SELECT run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags) AS run_tags_subquery_2 ON runs.run_id = run_tags_subquery_2.run_id AND run_tags_subquery_2.key = %(key_2)s AND run_tags_subquery_2.value = %(value_2)s ORDER BY runs.id DESC
LIMIT %(param_1)s]
Path: ["scheduleOrError","scheduleState","runs"]
Here is the query plan:
Limit (cost=7902.91..28782.70 rows=1 width=1079) (actual time=56399.979..56399.984 rows=1 loops=1)
-> Nested Loop (cost=7902.91..4684974.44 rows=224 width=1079) (actual time=56399.978..56399.982 rows=1 loops=1)
Join Filter: ((run_tags.run_id)::text = (runs.run_id)::text)
Rows Removed by Join Filter: 700022697
-> Index Scan Backward using runs_pkey on runs (cost=0.42..417112.91 rows=571880 width=1116) (actual time=0.033..74.907 rows=69159 loops=1)
-> Materialize (cost=7902.49..98857.55 rows=486 width=74) (actual time=0.001..0.290 rows=10122 loops=69159)
-> Hash Join (cost=7902.49..98855.12 rows=486 width=74) (actual time=33.983..190.339 rows=10122 loops=1)
Hash Cond: ((run_tags_1.run_id)::text = (run_tags.run_id)::text)
-> Bitmap Heap Scan on run_tags run_tags_1 (cost=1801.03..91423.11 rows=75754 width=37) (actual time=20.664..121.352 rows=434445 loops=1)
Recheck Cond: ((key = '.dagster/repository'::text) AND (value = '[email protected]'::text))
Heap Blocks: exact=51198
-> Bitmap Index Scan on idx_run_tags (cost=0.00..1782.10 rows=75754 width=0) (actual time=13.055..13.055 rows=434445 loops=1)
Index Cond: ((key = '.dagster/repository'::text) AND (value = '[email protected]'::text))
-> Hash (cost=6080.29..6080.29 rows=1693 width=37) (actual time=13.277..13.279 rows=10122 loops=1)
Buckets: 16384 (originally 2048) Batches: 1 (originally 1) Memory Usage: 811kB
-> Bitmap Heap Scan on run_tags (cost=41.91..6080.29 rows=1693 width=37) (actual time=2.496..11.411 rows=10122 loops=1)
Recheck Cond: ((key = 'dagster/schedule_name'::text) AND (value = 'scheduler_name'::text))
Heap Blocks: exact=9659
-> Bitmap Index Scan on idx_run_tags (cost=0.00..41.48 rows=1693 width=0) (actual time=1.276..1.276 rows=10122 loops=1)
Index Cond: ((key = 'dagster/schedule_name'::text) AND (value = 'scheduler_name'::text))
Planning Time: 0.482 ms
Execution Time: 56400.581 ms
(22 rows)
What did you expect to happen?
The schedule to load without a timeout error.
How to reproduce?
I don't have consistent steps to reproduce the issue but when it starts happening it happens consistently until I delete the schedule's runs from the database.
Deployment type
Local
Deployment details
Using postgres RDS database
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
cc @prha any ideas? Or is this a known issue that's been fixed in later releases?
Double join seems to be the slowest part:
-> Materialize (cost=7902.49..98857.55 rows=486 width=74) (actual time=0.001..0.290 rows=10122 loops=69159)
it's somewhat fast for a single row but scales linearly with each additional run.
I haven't checked the source code to see how the query is constructed, I imagine it's something you have control over.
I think the joins can be optimized such that postgres takes a more reasonable query path.
First pass attempt:
with first_join as (
SELECT runs.id as first_join_id
FROM runs
JOIN (
SELECT
run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags
) AS run_tags_subquery_1
ON
runs.run_id = run_tags_subquery_1.run_id
AND run_tags_subquery_1.key = 'dagster/schedule_name'
AND run_tags_subquery_1.value = '<schedule_name>'
)
SELECT
runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time
FROM runs
JOIN (
SELECT
run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags
) AS run_tags_subquery_2
ON
runs.run_id = run_tags_subquery_2.run_id
AND run_tags_subquery_2.key = '.dagster/repository'
AND run_tags_subquery_2.value = '<repository>'
JOIN first_join on first_join.first_join_id = runs.id
ORDER BY runs.id DESC
LIMIT 1
Anecdotally, this reduces the query time for my schedule from ~60s to ~200ms.
Here's the updated query path for the query above:
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=16876.01..30336.38 rows=1 width=1079) (actual time=183.982..185.832 rows=1 loops=1)
-> Nested Loop (cost=16876.01..3153142.15 rows=233 width=1079) (actual time=183.981..185.830 rows=1 loops=1)
Join Filter: ((runs.run_id)::text = (run_tags.run_id)::text)
Rows Removed by Join Filter: 244164
-> Nested Loop (cost=15079.80..17271.65 rows=1727 width=1116) (actual time=39.990..41.838 rows=1 loops=1)
-> Gather Merge (cost=15079.37..15276.20 rows=1727 width=4) (actual time=39.958..41.804 rows=1 loops=1)
Workers Planned: 1
Workers Launched: 1
-> Sort (cost=14079.36..14081.90 rows=1016 width=4) (actual time=38.235..38.292 rows=1024 loops=2)
Sort Key: runs_1.id DESC
Sort Method: quicksort Memory: 431kB
Worker 0: Sort Method: quicksort Memory: 429kB
-> Nested Loop (cost=42.68..14028.62 rows=1016 width=4) (actual time=1.442..37.284 rows=5061 loops=2)
-> Parallel Bitmap Heap Scan on run_tags run_tags_1 (cost=42.26..6185.24 rows=1016 width=37) (actual time=1.422..9.307 rows=5061 loops=2)
Recheck Cond: ((key = 'dagster/schedule_name'::text) AND (value = '<schedule_name>'::text))
Heap Blocks: exact=4837
-> Bitmap Index Scan on idx_run_tags (cost=0.00..41.82 rows=1727 width=0) (actual time=1.625..1.626 rows=10122 loops=1)
Index Cond: ((key = 'dagster/schedule_name'::text) AND (value = '<schedule_name>'::text))
-> Index Scan using runs_run_id_key on runs runs_1 (cost=0.42..7.72 rows=1 width=41) (actual time=0.005..0.005 rows=1 loops=10122)
Index Cond: ((run_id)::text = (run_tags_1.run_id)::text)
-> Index Scan using runs_pkey on runs (cost=0.42..1.16 rows=1 width=1116) (actual time=0.028..0.028 rows=1 loops=1)
Index Cond: (id = runs_1.id)
-> Materialize (cost=1796.21..92786.91 rows=77235 width=37) (actual time=20.020..130.225 rows=244165 loops=1)
-> Bitmap Heap Scan on run_tags (cost=1796.21..91796.74 rows=77235 width=37) (actual time=20.015..75.439 rows=244165 loops=1)
Recheck Cond: ((key = '.dagster/repository'::text) AND (value = '<repository>'::text))
Heap Blocks: exact=27909
-> Bitmap Index Scan on idx_run_tags (cost=0.00..1776.90 rows=77235 width=0) (actual time=12.514..12.514 rows=434542 loops=1)
Index Cond: ((key = '.dagster/repository'::text) AND (value = '<repository>'::text))
Planning Time: 0.621 ms
Execution Time: 188.012 ms
(30 rows)
One more report over here is about the exact same issue on the same query. In our case, this happens when opening a Single sensor page with many runs. I believe we found the culprit.
Our query looks like this:
SELECT runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time
FROM runs
JOIN (SELECT run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags ) AS run_tags_subquery_1 ON runs.run_id = run_tags_subquery_1.run_id AND run_tags_subquery_1.key = 'dagster/sensor_name' AND run_tags_subquery_1.value = 'process_pneo_sen_sensor'
JOIN (SELECT run_tags.id AS id, run_tags.run_id AS run_id, run_tags.key AS key, run_tags.value AS value
FROM run_tags) AS run_tags_subquery_2 ON runs.run_id = run_tags_subquery_2.run_id AND run_tags_subquery_2.key = '.dagster/repository' AND run_tags_subquery_2.value = 'data_ingestion@data-ingestion'
ORDER BY runs.id DESC
LIMIT 1
We looked at the query and immediately pointed our finger at the double-join, but that ended up not being the culprit. The actual culprit (surprisingly) seems to be LIMIT 1
. It looks like in certain scenarios, LIMIT 1
can make the query slower (full explanation: https://stackoverflow.com/questions/21385555/postgresql-query-very-slow-with-limit-1/ ), and that's what is happening in our case.
The query above without LIMITS 1
takes 300ms on our database. Adding LIMIT 1
pushes it to over a minute (we never let it run to completion since it takes forever).
@smackesey @prha - Any feedback on these findings?
Hey folks we think we made some progress on this issue. This is what we tried so far:
Cleaning up PG database
We run vacuum analyze
against all the tables of our database.
That helped a lot. The majority of the query which were timing out are now working fine.
Sadly some of them are still failing with the same error.
Adding an extra index
We experimented by adding an extra index in the run_tags
table:
CREATE UNIQUE INDEX run_tags_run_idx ON public.run_tags USING btree (run_id, id)
This is because the run_id field is used in the join conditions so having an index could help (at the price of a big index size) In our case, this has helped consistently with all the slow queries we had.
As of now we have decided to go for solution 1 but we'd like your feedback about adding an extra index and in case we can create a PR
I think one big original sin with the run tags join is that the join column is on run.run_id
instead of run.id
(primary key). I think probably we'll want to do some sort of schema + data migration to start including a column like run_storage_id
on the run tags table and modifying the read queries to make use of it. This adds a ton of complexity into the storage layer because we have to handle all permutations of schema/data migration state (we don't force OSS users to apply migrations from release to release).
Adding this (run_id, id) index is a good stop gap, but I'm not sure yet if we want to commit to that as the final end state we want to get to.
I'll start looking at some benchmarks to see if this helps across the board on all the run tag joins.
we don't force OSS users to apply migrations from release to release
Why not though? It's the first time I see this type of strategy and I fully understand that it can massively increase the complexity of the backend. I tried to bring up a related topic here some time ago.
I don't think anyone would be against applying all the migrations needed for Dagster to run at its best. That's pretty much the default out there. As time passes, the number of possible permutations will only increase.
I think people are generally on board with applying schema migrations. This in particular would require a data migration to populate a new column to join on, which may take a long amount of time to complete. I understand that we're erring on the side of caution here, but we want to be careful about when we enforce these constraints on the upgrade path.
We might choose to take on this migration complexity in dot releases and then do migration enforcement with major releases, but I'd like to figure out a more standard policy on this before committing.