
Each task execution time depends on the total number of tasks defined (got worse in 2.4.0)

MatrixManAtYrService opened this issue on Sep 21, 2022 · 0 comments

Apache Airflow version

2.4.0

What happened

Here are 10 DAGs with 200 tasks each (100 pairs of a decorated task plus a PythonOperator):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator

@task
def some_task():
    return 1

def dag_factory(i, task_pairs):
    with DAG(dag_id=str(i), start_date=datetime(1970, 1, 1), schedule_interval=None) as dag:
        for j in range(task_pairs):
            # Each iteration creates a task pair: some_task__j feeds operator j via XComArg.
            PythonOperator(task_id=str(j), python_callable=lambda x: print(x + 1), op_args=[some_task()])
    return dag

for i in range(10):
    locals()[f"dag_{i}"] = dag_factory(i, 100)
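For reference, each loop iteration registers two tasks (the auto-suffixed `some_task__j` plus the `PythonOperator` named `j`) and one dependency edge between them. A plain-Python sketch of the resulting counts (no Airflow needed; `count_tasks` is just an illustrative helper, not part of the repro):

```python
# Sketch: task/edge counts produced by the factory above (pure Python).
# Each of the `task_pairs` iterations registers two tasks (one @task
# instance, one PythonOperator) and one dependency edge between them.

def count_tasks(n_dags: int, task_pairs: int) -> dict:
    tasks_per_dag = 2 * task_pairs  # some_task__j plus operator j
    edges_per_dag = task_pairs      # some_task__j >> j
    return {
        "tasks": n_dags * tasks_per_dag,
        "edges": n_dags * edges_per_dag,
    }

print(count_tasks(10, 100))  # original load: 2000 tasks, 1000 edges
print(count_tasks(10, 200))  # doubled load:  4000 tasks, 2000 edges
```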

In 2.3.4, I'd see dag execution times of 1:11 and task logs like this:

[2022-09-21, 18:07:47 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', '0', 'some_task__43', 'manual__2022-09-21T18:07:17.118419+00:00', '--job-id', '84', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmp2vtzsb3m', '--error-file', '/tmp/tmp_8hsx4zi']
[2022-09-21, 18:07:47 UTC] {standard_task_runner.py:80} INFO - Job 84: Subtask some_task__43
[2022-09-21, 18:07:47 UTC] {task_command.py:371} INFO - Running <TaskInstance: 0.some_task__43 manual__2022-09-21T18:07:17.118419+00:00 [running]> on host airflow-worker-0.airflow-worker.slo-lkevabv.svc.cluster.local
[2022-09-21, 18:07:47 UTC] {taskinstance.py:1583} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=0
AIRFLOW_CTX_TASK_ID=some_task__43
AIRFLOW_CTX_EXECUTION_DATE=2022-09-21T18:07:17.118419+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-21T18:07:17.118419+00:00
[2022-09-21, 18:07:47 UTC] {python.py:173} INFO - Done. Returned value was: 1
[2022-09-21, 18:07:47 UTC] {taskinstance.py:1412} INFO - Marking task as SUCCESS. dag_id=0, task_id=some_task__43, execution_date=20220921T180717, start_date=20220921T180747, end_date=20220921T180747
[2022-09-21, 18:07:47 UTC] {local_task_job.py:156} INFO - Task exited with return code 0

In 2.4.0 I see dag execution times of 1:40 and task logs like this:

[2022-09-21, 19:11:24 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', '0', 'some_task__43', 'manual__2022-09-21T19:11:07.238726+00:00', '--job-id', '49', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmppw7q_95j']
[2022-09-21, 19:11:24 UTC] {standard_task_runner.py:83} INFO - Job 49: Subtask some_task__43
[2022-09-21, 19:11:24 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/dag.py
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task>, 0 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 0>, some_task already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task>, 0 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 0>, some_task already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task>, 0 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 0>, some_task already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__1>, 1 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 1>, some_task__1 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__1>, 1 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 1>, some_task__1 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__1>, 1 already registered for DAG: 0
[2022-09-21, 19:11:24 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 1>, some_task__1 already registered for DAG: 0

...many lines omitted...

[2022-09-21, 19:11:30 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 98>, some_task__98 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__99>, 99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 99>, some_task__99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__99>, 99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 99>, some_task__99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__99>, 99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 99>, some_task__99 already registered for DAG: 9
[2022-09-21, 19:11:31 UTC] {task_command.py:384} INFO - Running <TaskInstance: 0.some_task__43 manual__2022-09-21T19:11:07.238726+00:00 [running]> on host airflow-worker-0.airflow-worker.slo-lklomubj.svc.cluster.local
[2022-09-21, 19:11:31 UTC] {taskinstance.py:1592} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=0
AIRFLOW_CTX_TASK_ID=some_task__43
AIRFLOW_CTX_EXECUTION_DATE=2022-09-21T19:11:07.238726+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-21T19:11:07.238726+00:00
[2022-09-21, 19:11:31 UTC] {python.py:177} INFO - Done. Returned value was: 1
[2022-09-21, 19:11:31 UTC] {taskinstance.py:1406} INFO - Marking task as SUCCESS. dag_id=0, task_id=some_task__43, execution_date=20220921T191107, start_date=20220921T191124, end_date=20220921T191131
[2022-09-21, 19:11:31 UTC] {local_task_job.py:164} INFO - Task exited with return code 0

If we double the load, execution time increases by roughly 3.5x, not 2x:

for i in range(10):
    # locals()[f"dag_{i}"] = dag_factory(i,100)  # 10 * 2 * 100 = 2000 tasks
    locals()[f"dag_{i}"] = dag_factory(i,200)  # 10 * 2 * 200 = 4000 tasks 

Also, upgrading from 2.3.4 to 2.4.0 alone shows a ~1.4x increase in execution time:

| tasks | 2.3.4 execution time | 2.4.0 execution time | increase |
|---|---|---|---|
| 200 | 1:11 | 1:40 | 1.40x |
| 400 | 4:14 | 5:42 | 1.34x |
| increase | 3.58x | 3.42x | |
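The ratios follow directly from the raw m:ss timings; a quick sanity check in plain Python (matching the table up to rounding):

```python
# Verify the table's ratios from the raw m:ss timings above.

def seconds(mmss: str) -> int:
    m, s = mmss.split(":")
    return int(m) * 60 + int(s)

t234_200, t234_400 = seconds("1:11"), seconds("4:14")  # 71s, 254s
t240_200, t240_400 = seconds("1:40"), seconds("5:42")  # 100s, 342s

print(f"2.3.4 -> 2.4.0 at 200 tasks: {t240_200 / t234_200:.2f}x")  # 1.41x
print(f"2.3.4 -> 2.4.0 at 400 tasks: {t240_400 / t234_400:.2f}x")  # 1.35x
print(f"200 -> 400 tasks on 2.3.4:  {t234_400 / t234_200:.2f}x")   # 3.58x
print(f"200 -> 400 tasks on 2.4.0:  {t240_400 / t240_200:.2f}x")   # 3.42x
```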

Increasing task_pairs to 1000 caused DagBag import timeouts in the middle of the warning flood:

[2022-09-21, 19:00:03 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 262>, some_task__262 already registered for DAG: 3
[2022-09-21, 19:00:03 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): some_task__262>, 262 already registered for DAG: 3
[2022-09-21, 19:00:03 UTC] {taskmixin.py:205} WARNING - Dependency <Task(PythonOperator): 262>, some_task__262 already registered for DAG: 3
[2022-09-21, 19:00:03 UTC] {timeout.py:68} ERROR - Process timed out, PID: 68
[2022-09-21, 19:00:03 UTC] {dagbag.py:330} ERROR - Failed to import: /opt/airflow/dags/dag.py
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 326, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag.py", line 17, in <module>
    locals()[f"dag_{i}"] = dag_factory(i,1000)
  File "/opt/airflow/dags/dag.py", line 13, in dag_factory
    PythonOperator(task_id=str(j), python_callable=lambda x: print(x + 1), op_args=[some_task()])
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 415, in apply_defaults
    self.set_xcomargs_dependencies()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1106, in set_xcomargs_dependencies
    from airflow.models.xcom_arg import XComArg
  File "<frozen importlib._bootstrap>", line 1009, in _handle_fromlist
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.4.0/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.4.0/best-practices.html#reducing-dag-complexity, PID: 68
[2022-09-21, 19:00:03 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 5 for task some_task__108 (Dag '0' could not be found; either it does not exist or it failed to parse.; 68)
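As a side note, the 30-second limit in that traceback comes from Airflow's `core.dagbag_import_timeout` setting; raising it via the standard environment-variable override avoids the crash, though not the underlying slowdown. The value 120 here is just an illustrative choice:

```shell
# Raise the DagBag import timeout (default 30s) for the scheduler/workers.
# 120 is an arbitrary example value, not a recommendation.
export AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
```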

So it looks like upgrading from 2.3.4 to 2.4.0 slowed things down by a roughly constant factor (~1.4x) across load sizes, on top of execution time that already grows superlinearly with the total number of tasks.

What you think should happen instead

The content of these warnings worries me less than the fact that I see warnings about dag 9's task 99 in the logs for dag 0's task 43. If we really must re-parse the DAG file every time we run a task, can we at least short-circuit checks that don't pertain to the task at hand?

Better still would be to handle whatever this is just once instead of on every task; I'm guessing that's what's slowing things down. 2.4.0 didn't really cause it, it just made it slightly worse and much more visible.
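One way to short-circuit the repeated registrations would be to remember which dependency edges have already been seen, so each (upstream, downstream) pair is processed and warned about at most once. A minimal sketch of that idea in plain Python (not Airflow's actual implementation; `EdgeRegistry` is a hypothetical illustration):

```python
class EdgeRegistry:
    """Hypothetical: remember which dependency edges a DAG has already seen."""

    def __init__(self):
        self._seen: set[tuple[str, str]] = set()

    def register(self, upstream: str, downstream: str) -> bool:
        edge = (upstream, downstream)
        if edge in self._seen:
            # Already registered: skip silently instead of re-warning
            # on every task run that re-parses the file.
            return False
        self._seen.add(edge)
        return True

registry = EdgeRegistry()
print(registry.register("some_task__43", "43"))  # True: first registration
print(registry.register("some_task__43", "43"))  # False: duplicate, skipped
```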

How to reproduce

Run dag 0 and note the execution time. Redeploy with a heavier load and run it again. Notice the volume of dependency warnings in the task logs.

Operating System

Docker Desktop (mac os)

Versions of Apache Airflow Providers

n/a

Deployment

Official Apache Airflow Helm Chart

Deployment details

Using kubernetes in docker desktop

Dockerfile:

FROM docker.io/apache/airflow:2.4.0
COPY dags dags

deploy.sh:

#!/usr/bin/env bash
set -ex
TAG=$(openssl rand -base64 6 | tr -d '+/' | tr '[:upper:]' '[:lower:]')
NS="slo-$TAG"
IMG="img:$TAG"
docker build . -t "$IMG"
helm repo add apache-airflow https://airflow.apache.org
helm repo update
kubectl create namespace "$NS"
cat <<- 'EOF' | \
sed "s/TAG/$TAG/" | \
helm install airflow \
--namespace "$NS" \
apache-airflow/airflow \
-f -
defaultAirflowRepository: img
defaultAirflowTag: TAG
images:
  airflow:
    repository: img
logs:
  persistence:
    enabled: true
    size: 2Gi
EOF
set +ex

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct
