airflow
Dynamically Mapped Tasks: DB performance issues
Apache Airflow version
2.9.1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
We are running Airflow on Kubernetes (GCP) with a Postgres database (Cloud SQL). We are using pgbouncer as a DB connection pool. We have a single DAG in a separate Airflow worker pool that runs every hour and creates 1000+ Dynamically Mapped Tasks. As mentioned in https://github.com/apache/airflow/issues/35267#issuecomment-2113027901 upgrading to 2.9.1 helped to eliminate long-running transactions. However, it introduced another issue that we did not encounter in the previous version:
- The Postgres instance started reporting many 'could not obtain lock on row in relation "dag_run"' errors:
2024-05-17 09:49:19.191 UTC [3586765]: [131-1] db=airflow,[email protected] ERROR: could not obtain lock on row in relation "dag_run"
- We also noticed a significant spike in CPU:
What you think should happen instead?
No response
How to reproduce
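Create a DAG with the following tasks:
import os

import pendulum
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

# send_dag_failure_message_to_slack, GCS_BUCKET, GCS_REFIX and GCS_PATH are
# defined elsewhere in the reporter's project and are not shown here.


# DAG definition
@dag(
    dag_id="retryable_dag",
    schedule="@hourly",
    start_date=pendulum.today("UTC").add(hours=-1),
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        "on_failure_callback": send_dag_failure_message_to_slack,
        "pool": "retryable_pool",
        "max_active_tis_per_dagrun": 50,
    },
)
def retryable_dag() -> None:
    dag_configs = PythonOperator(task_id="load_dag_configs", python_callable=list_files)
    # One mapped task group instance per config file (1000+ per run).
    process_dag_config.expand(source=dag_configs.output)


@task_group
def process_dag_config(source: str) -> None:
    config_file = extract_dag_config(source=source)
    trigger_dag_run(config_file=config_file)
    delete_dag_config(config_file=config_file)


def list_files() -> list[str]:
    gcs_hook = GCSHook(
        impersonation_chain="SA",
    )
    return gcs_hook.list(
        bucket_name=os.getenv(GCS_BUCKET),
        prefix=f"{os.getenv(GCS_REFIX, GCS_PATH)}/",
        match_glob="**/*.json",
    )


@task
def extract_dag_config(source: str):
    ...  # body elided in the original report


@task
def trigger_dag_run(config_file):
    ...  # body elided in the original report


@task
def delete_dag_config(config_file):
    ...  # body elided in the original report


# Calling the decorated function registers the DAG.
retryable_dag()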
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.6.2
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.12.0
apache-airflow-providers-datadog==3.5.1
apache-airflow-providers-fab==1.0.4
apache-airflow-providers-ftp==3.8.0
apache-airflow-providers-google==10.17.0
apache-airflow-providers-http==4.10.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-slack==8.6.2
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-sqlite==3.7.1
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
The Postgres instance started reporting many could not obtain lock on row in relation "dag_run" errors:
This is pretty much fine, because Airflow tries to obtain a row-level lock using a SELECT ... FOR UPDATE SKIP LOCKED statement.
Correction: it is SELECT ... FOR UPDATE NOWAIT that raises this error.
But it seems this was added in https://github.com/apache/airflow/pull/38914. I'm not sure that ignoring this error can be considered a solution. If it always results in an ERROR when trying to obtain a lock, why do we even need to attempt the query?
if it always results in an ERROR
Are you sure that it always returns an error?
I'm not sure. But then there should be some kind of retry mechanism (I'm also not sure whether we have one). Anyway, I just see a lot of errors in the DB, and my first guess is that something is wrong at the transaction level.
This is used by the mini scheduler, which is allowed to fail by design: it is just an optimisation. Several mini schedulers may also run concurrently, so the idea (as far as I understand it) is that each one tries to obtain a row-level lock, and if that fails (EAFP), that is fine because another mini scheduler is already taking care of it.
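The EAFP idea above can be sketched in plain Python. This is a toy analogue under stated assumptions, not Airflow's code: a `threading.Lock` stands in for the `dag_run` row lock, `acquire(blocking=False)` plays the role of a non-blocking `FOR UPDATE NOWAIT`, and `mini_schedule` is a hypothetical name. Whichever worker fails to get the lock simply skips the optional work, because another worker already owns it.

```python
import threading

# Toy stand-in for the row-level lock on a single dag_run row
# (assumption: an illustration of the try-lock/EAFP idea, not Airflow code).
dag_run_row_lock = threading.Lock()


def mini_schedule(worker: str, results: dict) -> None:
    """Hypothetical mini-scheduler step: try the lock once, never wait."""
    if not dag_run_row_lock.acquire(blocking=False):
        # Lock already held: another worker is scheduling this run, so
        # skipping is safe because this step is only an optimisation.
        results[worker] = "skipped"
        return
    try:
        results[worker] = "scheduled downstream tasks"
    finally:
        dag_run_row_lock.release()


results: dict = {}
# Simulate a concurrent holder by taking the lock before worker "b" runs.
dag_run_row_lock.acquire()
mini_schedule("b", results)
dag_run_row_lock.release()
mini_schedule("a", results)
print(results)  # {'b': 'skipped', 'a': 'scheduled downstream tasks'}
```

The key design point is that a failed acquisition is treated as a normal outcome, not an error to retry.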
As a mitigation, skip_locked could be used instead of nowait. Both are non-blocking; they differ in how the DB backend handles rows that are already locked:
- SKIP LOCKED: returns only the rows that are not locked
- NOWAIT: raises an error
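The difference between the two modes can be modelled with a small in-memory sketch. Everything here is hypothetical (`select_for_update`, `LockNotAvailable` and the row dicts are illustrative names, not Airflow or SQLAlchemy APIs); the exception is only named after Postgres SQLSTATE 55P03 (lock_not_available), which is the error behind "could not obtain lock on row". The blocking default mode is deliberately not modelled.

```python
class LockNotAvailable(Exception):
    """Toy analogue of Postgres SQLSTATE 55P03 (lock_not_available)."""


def select_for_update(rows, locked_ids, *, nowait=False, skip_locked=False):
    """Toy model of SELECT ... FOR UPDATE against already-locked rows.

    rows: list of dicts with an "id" key.
    locked_ids: ids currently held by another transaction.
    """
    if nowait and skip_locked:
        raise ValueError("choose either nowait or skip_locked")
    result = []
    for row in rows:
        if row["id"] in locked_ids:
            if nowait:
                # NOWAIT: the whole statement fails, and the DB logs an ERROR.
                raise LockNotAvailable(f'could not obtain lock on row {row["id"]}')
            if skip_locked:
                # SKIP LOCKED: the locked row is silently left out.
                continue
            raise NotImplementedError("a real backend would block here")
        result.append(row)
    return result


dag_runs = [{"id": 1}, {"id": 2}]
print(select_for_update(dag_runs, {1}, skip_locked=True))  # [{'id': 2}]
try:
    select_for_update(dag_runs, {1}, nowait=True)
except LockNotAvailable as exc:
    print("error:", exc)  # error: could not obtain lock on row 1
```

In both modes the caller never waits; only NOWAIT surfaces the contention as a server-side error.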
yeah, would be great to avoid DB internal error
Feel free to fix it. Apache Airflow is an open source project, and everyone can contribute changes (fixes or features) back, especially if they know the expected outcome and benefits of the changes.
So I would recommend patching this part on your side and checking that it still works as expected, without additional error logs in the DB backend and without side effects.
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f154461a77..a08b9cd94a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3454,8 +3454,12 @@ class TaskInstance(Base, LoggingMixin):
run_id=ti.run_id,
),
session=session,
- nowait=True,
- ).one()
+ skip_locked=True,
+ ).one_or_none()
+ if not dag_run:
+ # Need to log something?
+ session.rollback()
+ return
task = ti.task
if TYPE_CHECKING:
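The important detail in this diff is the switch from `.one()` to `.one_or_none()`. With `skip_locked=True`, a `dag_run` row held by another transaction yields zero rows instead of an error, and SQLAlchemy's `one()` raises `NoResultFound` on an empty result while `one_or_none()` returns `None`. The stdlib sketch below mimics those two semantics with hypothetical helpers (`one`, `one_or_none`, `schedule_downstream`) instead of importing SQLAlchemy, so the patched control flow can be followed in isolation; the exception classes are local stand-ins, not the real `sqlalchemy.exc` types.

```python
class NoResultFound(Exception):
    """Local stand-in for sqlalchemy.exc.NoResultFound."""


class MultipleResultsFound(Exception):
    """Local stand-in for sqlalchemy.exc.MultipleResultsFound."""


def one(rows):
    # Like Result.one(): exactly one row, otherwise an exception.
    if not rows:
        raise NoResultFound("expected exactly one row, got 0")
    if len(rows) > 1:
        raise MultipleResultsFound(f"expected exactly one row, got {len(rows)}")
    return rows[0]


def one_or_none(rows):
    # Like Result.one_or_none(): None when no row came back.
    if len(rows) > 1:
        raise MultipleResultsFound(f"expected at most one row, got {len(rows)}")
    return rows[0] if rows else None


def schedule_downstream(rows_from_skip_locked_query):
    """Hypothetical outline of the patched mini-scheduler path."""
    dag_run = one_or_none(rows_from_skip_locked_query)
    if not dag_run:
        # Row was locked elsewhere and skipped; the patch also calls
        # session.rollback() here before returning.
        return "skipped"
    return f"scheduling tasks for {dag_run}"


print(schedule_downstream([]))         # skipped
print(schedule_downstream(["run_1"]))  # scheduling tasks for run_1
```

Keeping `.one()` together with `skip_locked=True` would just trade the DB-side lock error for a Python-side `NoResultFound`, which is why the patch changes both lines.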
Thanks @Taragolis, I will give it a try as my first PR :)
@VladimirYushkevich thanks for the PR, I have been seeing the same errors since updating to 2.9.1 when running mapped tasks on a LocalExecutor.
I tested the image from the PR in our environment, and this error disappeared.
@Taragolis, do I need to ask for the review explicitly? I believe the PR is ready from my side, but maybe I'm missing something.
I've added a couple of additional reviewers who might be more familiar with the mini scheduler. But it might take some time: all reviews are done by people in their free time, and some reviews require more time than others.
You could also ask around in #new-contributors or #contributors (pick one channel) in the Apache Airflow Community Slack workspace.
Does https://github.com/apache/airflow/pull/39745 solve this issue, or are there additional tasks?
It does, we can close it.
related
https://github.com/apache/airflow/pull/38914 https://github.com/apache/airflow/pull/39745