
Dynamically Mapped Tasks: DB performance issues

Open • VladimirYushkevich opened this issue 1 year ago • 10 comments

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We are running Airflow on Kubernetes (GCP) with a Postgres database (Cloud SQL), using pgbouncer as the DB connection pool. We have a single DAG, in a separate Airflow worker pool, that runs every hour and creates 1000+ dynamically mapped tasks. As mentioned in https://github.com/apache/airflow/issues/35267#issuecomment-2113027901, upgrading to 2.9.1 helped eliminate long-running transactions. However, it introduced another issue that we did not encounter in the previous version:

  • The Postgres instance started reporting many 'could not obtain lock on row in relation "dag_run"' errors:
      2024-05-17 09:49:19.191 UTC [3586765]: [131-1] db=airflow,[email protected] ERROR:  could not obtain lock on row in relation "dag_run"
  • We also noticed a significant CPU spike: [Screenshot 2024-05-17 at 12.32.12: CPU usage]

What you think should happen instead?

No response

How to reproduce

Create a DAG with the following tasks: [Screenshot 2024-05-17 at 12.47.25: the DAG's tasks]

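# DAG definition
import os

import pendulum
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

# send_dag_failure_message_to_slack, GCS_BUCKET, GCS_REFIX and GCS_PATH
# are defined elsewhere in the reporter's project.


@dag(
    dag_id="retryable_dag",
    schedule="@hourly",
    start_date=pendulum.today("UTC").add(hours=-1),
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        "on_failure_callback": send_dag_failure_message_to_slack,
        "pool": "retryable_pool",
        "max_active_tis_per_dagrun": 50,
    },
)
def retryable_dag() -> None:
    # Lists the config files in GCS; the returned list is pushed to XCom.
    dag_configs = PythonOperator(task_id="load_dag_configs", python_callable=list_files)

    # One mapped task-group instance per config file (1000+ per run).
    process_dag_config.expand(source=dag_configs.output)


@task_group
def process_dag_config(source: str) -> None:
    config_file = extract_dag_config(source=source)
    trigger_dag_run(config_file=config_file)
    delete_dag_config(config_file=config_file)


def list_files() -> list[str]:
    gcs_hook = GCSHook(impersonation_chain="SA")

    return gcs_hook.list(
        bucket_name=os.getenv(GCS_BUCKET),
        prefix=f"{os.getenv(GCS_REFIX, GCS_PATH)}/",
        match_glob="**/*.json",
    )


@task
def extract_dag_config...


@task
def trigger_dag_run...


@task
def delete_dag_config...


# Instantiate the DAG so the scheduler registers it.
retryable_dag()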

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.6.2 apache-airflow-providers-common-io==1.3.1 apache-airflow-providers-common-sql==1.12.0 apache-airflow-providers-datadog==3.5.1 apache-airflow-providers-fab==1.0.4 apache-airflow-providers-ftp==3.8.0 apache-airflow-providers-google==10.17.0 apache-airflow-providers-http==4.10.1 apache-airflow-providers-imap==3.5.0 apache-airflow-providers-postgres==5.10.2 apache-airflow-providers-slack==8.6.2 apache-airflow-providers-smtp==1.6.1 apache-airflow-providers-sqlite==3.7.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

VladimirYushkevich, May 17 '24

> The Postgres instance started reporting many 'could not obtain lock on row in relation "dag_run"' errors:

This is fine: Airflow tries to obtain a row-level lock using a SELECT ... FOR UPDATE SKIP LOCKED statement.

Taragolis, May 17 '24

Correction: it is SELECT ... FOR UPDATE NOWAIT that raises this error.

Taragolis, May 17 '24

But it seems this was added in https://github.com/apache/airflow/pull/38914. I'm not sure that ignoring this error can be considered a solution. If it always results in an ERROR when trying to obtain a lock, why do we even need to attempt the query?

VladimirYushkevich, May 17 '24

> if it always results in an ERROR

Are you sure that it always returns an error?

Taragolis, May 17 '24

I'm not sure. But then there should be some kind of retry mechanism (I'm also not sure whether we have one). Anyway, I just see a lot of errors in the DB, and my first guess is that something is wrong at the transaction level.

VladimirYushkevich, May 17 '24

This is used by the mini scheduler, which is allowed to fail by design: it is just an optimisation. Several mini schedulers may also run concurrently, so the idea (as far as I understand it) is to try to obtain a row-level lock, and if that fails (EAFP), that's fine because another mini scheduler is already taking care of it.

Taragolis, May 17 '24
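The EAFP pattern described above can be sketched in plain Python. This is a hypothetical illustration, not Airflow's code: a threading.Lock stands in for the row-level lock on a dag_run row, a non-blocking acquire(blocking=False) plays the role of the NOWAIT lock attempt, and the names DagRunRow and mini_schedule are invented for the example.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class DagRunRow:
    """Hypothetical stand-in for a dag_run row; `lock` models its row-level lock."""

    run_id: str
    lock: threading.Lock = field(default_factory=threading.Lock)
    scheduled_by: list = field(default_factory=list)


def mini_schedule(worker: str, row: DagRunRow) -> bool:
    """Try to schedule downstream tasks of the run; give up if the row is locked.

    Returns True if this worker did the scheduling, False if it skipped.
    """
    # Non-blocking attempt, analogous to FOR UPDATE NOWAIT / SKIP LOCKED.
    if not row.lock.acquire(blocking=False):
        # EAFP: someone else holds the lock, so this is not treated as an error.
        return False
    try:
        row.scheduled_by.append(worker)
        return True
    finally:
        row.lock.release()


row = DagRunRow("scheduled__2024-05-17T09:00:00+00:00")
row.lock.acquire()                     # another worker holds the row lock
print(mini_schedule("worker-a", row))  # -> False
row.lock.release()
print(mini_schedule("worker-b", row))  # -> True
```

Losing the race is deliberately not an error here: the mini scheduler is only an optimisation, and whatever it skips is still handled elsewhere (by the holder of the lock or by the regular scheduler loop).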

As a mitigation, we could use skip_locked instead of nowait. Both are non-blocking; the difference is how the DB backend handles a row that is already locked:

  • SKIP LOCKED: returns only the rows that are not locked
  • NOWAIT: raises an error

Taragolis, May 17 '24
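To make the difference between the two lock modes concrete, here is a small in-memory simulation. It is an assumption-laden sketch, not a database client: select_for_update and LockNotAvailable are hypothetical names, the error text mirrors the PostgreSQL message from the report, and SQLSTATE 55P03 (lock_not_available) is PostgreSQL's code for this condition. A plain FOR UPDATE would block instead; the sketch signals that case with an exception because it cannot wait.

```python
class LockNotAvailable(Exception):
    """Models PostgreSQL's 'could not obtain lock on row' error (SQLSTATE 55P03)."""


def select_for_update(rows, wanted, locked, *, nowait=False, skip_locked=False):
    """Simulate SELECT ... FOR UPDATE [NOWAIT | SKIP LOCKED] over an in-memory table.

    rows:   dict mapping primary key -> row value
    wanted: keys matched by the WHERE clause
    locked: keys currently locked by other transactions
    """
    if nowait and skip_locked:
        raise ValueError("NOWAIT and SKIP LOCKED are mutually exclusive")
    result = []
    for key in wanted:
        if key not in rows:
            continue
        if key in locked:
            if nowait:
                # NOWAIT: fail immediately, which is what shows up in the DB logs.
                raise LockNotAvailable(f"could not obtain lock on row {key!r}")
            if skip_locked:
                # SKIP LOCKED: silently leave the locked row out of the result.
                continue
            # Plain FOR UPDATE would wait for the other transaction here.
            raise RuntimeError("plain FOR UPDATE would block here")
        result.append(rows[key])
    return result


key = ("retryable_dag", "run_1")
dag_run = {key: "dag_run row"}
held = {key}  # another mini scheduler holds this row's lock

print(select_for_update(dag_run, [key], held, skip_locked=True))  # -> []
try:
    select_for_update(dag_run, [key], held, nowait=True)
except LockNotAvailable as exc:
    print("NOWAIT:", exc)
```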


Yeah, it would be great to avoid the internal DB error.

VladimirYushkevich, May 17 '24


Feel free to fix it. Apache Airflow is an open-source project, and everyone can contribute changes (fixes/features) back, especially if they know the expected outcome and benefits of those changes.

So I would recommend patching this part on your side and checking that it still works as expected, without additional error logs in the DB backend and without side effects.

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f154461a77..a08b9cd94a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3454,8 +3454,12 @@ class TaskInstance(Base, LoggingMixin):
                     run_id=ti.run_id,
                 ),
                 session=session,
-                nowait=True,
-            ).one()
+                skip_locked=True,
+            ).one_or_none()
+            if not dag_run:
+                # Need to log something?
+                session.rollback()
+                return
 
             task = ti.task
             if TYPE_CHECKING:

Taragolis, May 17 '24
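The switch from .one() to .one_or_none() in the diff matters because SKIP LOCKED turns a locked row into an empty result. In SQLAlchemy, Query.one() raises NoResultFound on an empty result, while Query.one_or_none() returns None. The sketch below reproduces those semantics with plain Python stand-ins (it does not import SQLAlchemy), and schedule_downstream is a hypothetical outline of the patched control flow, not the actual TaskInstance method.

```python
class NoResultFound(Exception):
    """Stand-in for sqlalchemy.exc.NoResultFound."""


class MultipleResultsFound(Exception):
    """Stand-in for sqlalchemy.exc.MultipleResultsFound."""


def one(rows):
    """Mimic Query.one(): return exactly one row or raise."""
    if not rows:
        raise NoResultFound("no row found, exactly one required")
    if len(rows) > 1:
        raise MultipleResultsFound("several rows found, exactly one required")
    return rows[0]


def one_or_none(rows):
    """Mimic Query.one_or_none(): the row, None if empty, raise if several."""
    if not rows:
        return None
    return one(rows)


def schedule_downstream(locked_rows_result):
    """Hypothetical outline of the patched flow: bail out quietly if the row was skipped."""
    dag_run = one_or_none(locked_rows_result)
    if dag_run is None:
        # SKIP LOCKED returned nothing because another mini scheduler holds the row;
        # in the patch, this is where session.rollback() is called before returning.
        return "skipped"
    return f"scheduling downstream tasks of {dag_run}"


print(schedule_downstream([]))         # -> skipped
print(schedule_downstream(["run_1"]))  # -> scheduling downstream tasks of run_1
```

With .one() left in place, the empty result from SKIP LOCKED would raise instead of returning early, so the two changes in the diff have to go together.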

Thanks @Taragolis, I will give it a try with my first PR :)

VladimirYushkevich, May 18 '24

@VladimirYushkevich, thanks for the PR. I have been seeing the same errors since updating to 2.9.1 when running mapped tasks with the LocalExecutor.

seeholza, May 22 '24

I tested an image built from the PR in our environment, and the error disappeared.

VladimirYushkevich, May 22 '24

@Taragolis, do I need to explicitly ask for a review? I believe the PR is ready from my side, but maybe I'm missing something.

VladimirYushkevich, May 23 '24

I've added a couple of additional reviewers who might be more familiar with the mini scheduler. It might take some time, though: all reviews are done by people in their free time, and some reviews take longer than others.

You could also ask around in #new-contributors or #contributors (pick one channel) in the Apache Airflow Community Slack workspace.

Taragolis, May 23 '24

Does https://github.com/apache/airflow/pull/39745 solve this issue, or are there additional tasks?

eladkal, May 29 '24


It does; we can close it.

VladimirYushkevich, May 29 '24

Related:

https://github.com/apache/airflow/pull/38914 https://github.com/apache/airflow/pull/39745

dstandish, Sep 03 '24