
MySQL Not Using Correct Index for Scheduler Critical Section Query

Open michaelmicheal opened this issue 2 years ago • 3 comments

Apache Airflow version

Other Airflow 2 version

What happened

Airflow Version: 2.2.5 MySQL Version: 8.0.18

In the Scheduler, we are coming across instances where MySQL optimizes the critical section task queuing query inefficiently. When a large number of task instances are scheduled, MySQL fails to use the ti_state index to filter the task_instance table, resulting in a full table scan (about 7.3 million rows).

Normally, when the critical section query runs, the index on task_instance.state (ti_state) is used to filter scheduled task instances.

| -> Limit: 512 row(s)  (actual time=5.290..5.413 rows=205 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=5.289..5.391 rows=205 loops=1)
        -> Table scan on <temporary>  (actual time=0.003..0.113 rows=205 loops=1)
            -> Temporary table  (actual time=5.107..5.236 rows=205 loops=1)
                -> Nested loop inner join  (cost=20251.99 rows=1741) (actual time=0.100..4.242 rows=205 loops=1)
                    -> Nested loop inner join  (cost=161.70 rows=12) (actual time=0.071..2.436 rows=205 loops=1)
                        -> Index lookup on task_instance using ti_state (state='scheduled')  (cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)
                        -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.002..0.002 rows=1 loops=222)
                            -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.001..0.001 rows=1 loops=222)
                    -> Filter: ((dag.is_paused = 0) and (task_instance.dag_id = dag.dag_id))  (cost=233.52 rows=151) (actual time=0.008..0.008 rows=1 loops=205)
                        -> Index range scan on dag (re-planned for each iteration)  (cost=233.52 rows=15072) (actual time=0.008..0.008 rows=1 loops=205)
1 row in set, 1 warning (0.03 sec)

When a large number of task_instances are in the scheduled state at the same time, the index on task_instance.state is not used to filter scheduled task_instances.

| -> Limit: 512 row(s)  (actual time=12110.251..12110.573 rows=512 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=12110.250..12110.526 rows=512 loops=1)
        -> Table scan on <temporary>  (actual time=0.005..0.800 rows=1176 loops=1)
            -> Temporary table  (actual time=12109.022..12109.940 rows=1176 loops=1)
                -> Nested loop inner join  (cost=10807.83 rows=3) (actual time=1.328..12097.528 rows=1176 loops=1)
                    -> Nested loop inner join  (cost=10785.34 rows=64) (actual time=1.293..12084.371 rows=1193 loops=1)
                        -> Filter: (dag.is_paused = 0)  (cost=1371.40 rows=1285) (actual time=0.087..22.409 rows=13264 loops=1)
                            -> Table scan on dag  (cost=1371.40 rows=12854) (actual time=0.085..15.796 rows=13508 loops=1)
                        -> Filter: ((task_instance.state = 'scheduled') and (task_instance.dag_id = dag.dag_id))  (cost=0.32 rows=0) (actual time=0.907..0.909 rows=0 loops=13264)
                            -> Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id)  (cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)
                    -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.010..0.011 rows=1 loops=1193)
                        -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.009..0.010 rows=1 loops=1193)

1 row in set, 1 warning (12.14 sec)

What you think should happen instead

To resolve this, I patched scheduler_job.py to add a MySQL index hint that forces use of the ti_state index.

--- /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
+++ /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
@@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
             # and the dag is not paused
             query = (
                 session.query(TI)
+                .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
                 .join(TI.dag_run)
                 .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
                 .join(TI.dag_model)

I think it makes sense to add this index hint upstream.
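For illustration, here is a minimal, self-contained SQLAlchemy sketch (a stand-in TaskInstance model and SQLAlchemy 1.4-style imports, not Airflow's actual code) showing that a hint attached with dialect_name='mysql' is only rendered when the statement is compiled for MySQL, so it stays a no-op on other backends:

from sqlalchemy import Column, Index, String
from sqlalchemy.dialects import mysql, postgresql
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


# Stand-in model with just enough columns to illustrate the hint; the real
# model lives in airflow.models.taskinstance.
class TaskInstance(Base):
    __tablename__ = "task_instance"
    task_id = Column(String(250), primary_key=True)
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    state = Column(String(20))
    __table_args__ = (Index("ti_state", "state"),)


session = Session()

query = (
    session.query(TaskInstance)
    # The hint is attached unconditionally; dialect_name='mysql' restricts
    # rendering to the MySQL dialect.
    .with_hint(TaskInstance, "USE INDEX (ti_state)", dialect_name="mysql")
    .filter(TaskInstance.state == "scheduled")
)

# Compiled for MySQL, the FROM clause becomes
# "FROM task_instance USE INDEX (ti_state)" ...
print(query.statement.compile(dialect=mysql.dialect()))

# ... while for PostgreSQL the hint is simply omitted.
print(query.statement.compile(dialect=postgresql.dialect()))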

How to reproduce

Schedule a large number of dag runs and tasks in a short period of time.
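For example, a DAG file roughly like the following (the DAG ids, counts, and schedule are illustrative and not taken from our deployment) will flood the scheduler with task instances in the scheduled state:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # DummyOperator in Airflow 2.2.x

# Generate many small DAGs, each with many trivial tasks, so that a burst of
# task instances lands in the "scheduled" state at once.
for i in range(50):
    dag = DAG(
        dag_id=f"index_hint_load_test_{i}",
        start_date=datetime(2022, 1, 1),
        schedule_interval=timedelta(minutes=5),
        catchup=True,        # catchup quickly builds a backlog of dag runs
        max_active_runs=16,
    )
    with dag:
        for j in range(20):
            DummyOperator(task_id=f"task_{j}")
    # register each generated DAG at module level so the DagBag picks it up
    globals()[dag.dag_id] = dag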

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

Airflow 2.2.5 on Kubernetes MySQL Version: 8.0.18

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

michaelmicheal avatar Aug 09 '22 19:08 michaelmicheal

Is this specific to MySQL?

uranusjr avatar Aug 10 '22 06:08 uranusjr

Yes it is. @michaelmicheal, can you update the title to make it clearer that we are talking about MySQL?

The question for the Airflow maintainers: would you welcome a PR to add the index hint when using MySQL?

# Pseudo code
if mysql:
    query = query.with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')

j-martin avatar Aug 10 '22 22:08 j-martin

I don’t think you need the if mysql part (just adding the hint globally and the dialect_name part would make it only apply for MySQL), but yes a PR would be welcomed.

uranusjr avatar Aug 11 '22 01:08 uranusjr

Adding the hint globally and the dialect_name part would make it only apply for MySQL

Yes, you are right!

j-martin avatar Aug 11 '22 20:08 j-martin