Grid view optimization
Work in progress branch for grid optimization
I break up the monolith that is grid_data.
The headline here is: with 3k tasks in a dag, loading time for 10 runs drops from 1.5 minutes to under 10 seconds in a quick local test.
I split it into smaller, more purpose-specific requests that each do less. So we have one request for just the structure, and another one for TI states (per dag run). I also found ways to stop refreshing when there's no active dag run (or when the particular dag run is not active and its TIs don't need refreshing). I also changed the "latest dag run" query (which checks for a new run triggered externally) to be a simpler dedicated endpoint. It runs every couple of seconds even when there is nothing going on, and now it takes 10ms instead of 300ms.
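To make the refresh rule concrete, here's a rough sketch in Python. The real logic lives in the UI code, so treat this as an illustration only: the function names, the set of "active" states, and the 2-second interval are all assumptions made up for the sketch.

```python
from __future__ import annotations

# Hypothetical names for illustration; this is not the actual UI code.
# Assumption: a run counts as "active" while it is queued or running.
ACTIVE_STATES = {"queued", "running"}
REFRESH_SECONDS = 2.0  # assumed polling interval ("every couple of seconds")


def is_active(run_state: str) -> bool:
    """Return True while a dag run can still change and needs polling."""
    return run_state in ACTIVE_STATES


def structure_refresh_interval(run_states: list[str]) -> float | None:
    """Poll the structure request only while at least one run is active."""
    if any(is_active(state) for state in run_states):
        return REFRESH_SECONDS
    return None  # None means "do not refresh"


def ti_refresh_interval(run_state: str) -> float | None:
    """Poll the TI states of a single run only while that run is active."""
    return REFRESH_SECONDS if is_active(run_state) else None
```

So with runs in states `["success", "running"]`, the structure request keeps polling, but only the running run's TI request does. The finished run's TI request stops refreshing.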
In order to have the grid/structure request stop refreshing when nothing is active, I had to add a new context provider so the state could be propagated from the grid/runs request (which knows if there are active runs). There may be a better way to do this, and the linter may not like it.
Here's a dag you can test this with. Before, if you had 10 runs, it would take 1.5 minutes to load. With this change, it was 6 seconds when I tried. It's faster in non-dev mode than in dev mode, presumably because of parallelism.
from __future__ import annotations

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, TaskGroup, chain

with DAG("bighello_deeper_only_normal"):
    for i in range(10):
        with TaskGroup(f"group_{i}"):
            EmptyOperator(task_id="hello")
            with TaskGroup(f"group_{i}2"):
                chain([EmptyOperator(task_id=f"empty_{j}") for j in range(100)])
                chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(100)])
                # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(100)))
                with TaskGroup(f"group_{i}3"):
                    chain([EmptyOperator(task_id=f"empty_{j}") for j in range(100)])
                    chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(100)])
                    # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(100)))
                    with TaskGroup(f"group_{i}4"):
                        chain([EmptyOperator(task_id=f"empty_{j}") for j in range(100)])
                        chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(100)])
                        # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(100)))
before (1.6m)
after (6s)
in action
https://github.com/user-attachments/assets/54e42f62-57a2-46e4-87f2-262419a16ab9