Many tasks updating the same dataset at once cause some of them to fail
Apache Airflow version
main (development)
What happened
I have 16 dags which all update the same dataset. They're set to finish at the same time (when the seconds on the clock are 00). About three quarters of them behave as expected, but the other quarter fails with errors like:
[2022-07-21, 06:06:00 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 8 for task increment_source ((psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dataset_dag_run_queue_pkey"
DETAIL: Key (dataset_id, target_dag_id)=(1, simple_dataset_sink) already exists.
[SQL: INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id, created_at) VALUES (%(dataset_id)s, %(target_dag_id)s, %(created_at)s)]
[parameters: {'dataset_id': 1, 'target_dag_id': 'simple_dataset_sink', 'created_at': datetime.datetime(2022, 7, 21, 6, 6, 0, 131730, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 375)
I've prepared a gist with the details: https://gist.github.com/MatrixManAtYrService/b5e58be0949eab9180608d0760288d4d
What you think should happen instead
All dags should succeed
How to reproduce
See this gist: https://gist.github.com/MatrixManAtYrService/b5e58be0949eab9180608d0760288d4d
Summary: Unpause all of the dags which we expect to collide, wait two minutes. Some will have collided.
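For context, here's a sketch of the kind of setup involved. This is a hypothetical reconstruction, not the gist contents: the increment_source task id and simple_dataset_sink dag id come from the error above, everything else (dag ids, the @once schedule, the pause-until-the-minute trick) is an assumption. The pause call is what the `pause` entry in requirements.txt is for.

# Hypothetical sketch only -- see the gist for the real dags.
from datetime import datetime, timedelta, timezone

import pause
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

shared = Dataset("shared")


def update_then_wait_for_minute_boundary():
    # do the "update", then block until the next whole minute so that all
    # producers finish (and try to queue the sink dag run) at the same instant
    now = datetime.now(timezone.utc)
    next_minute = (now + timedelta(minutes=1)).replace(second=0, microsecond=0)
    pause.until(next_minute)


# 16 producers, all updating the same dataset
for i in range(16):
    with DAG(
        dag_id=f"simple_dataset_source_{i}",
        start_date=datetime(1970, 1, 1),
        schedule_interval="@once",
    ) as source:
        PythonOperator(
            task_id="increment_source",
            python_callable=update_then_wait_for_minute_boundary,
            outlets=[shared],
        )
    globals()[f"source_{i}"] = source

# one sink, triggered by that dataset
with DAG(
    dag_id="simple_dataset_sink",
    start_date=datetime(1970, 1, 1),
    schedule_on=[shared],
) as sink:
    PythonOperator(task_id="consume", python_callable=lambda: None)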
Operating System
docker/debian
Versions of Apache Airflow Providers
n/a
Deployment
Astronomer
Deployment details
astro dev start targeting commit: cff7d9194f549d801947f47dfce4b5d6870bfaaa
be sure to have the `pause` package in requirements.txt
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This is described here: https://github.com/apache/airflow/issues/24974
Maybe it's ok that the update failed--given that the downstream dag ran as desired--but I don't think it should cause the task to fail.
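In other words, maybe the insert into dataset_dag_run_queue could just tolerate the race. A rough sketch of what I mean, assuming a PostgreSQL backend and that the table is backed by something like airflow.models.dataset.DatasetDagRunQueue (I haven't checked where the model lives on main, and the real fix may need a dialect-agnostic upsert instead):

# Sketch only: treat "this (dataset, dag) pair is already queued" as a no-op
# instead of letting the unique constraint bubble up as a task failure.
from sqlalchemy.dialects.postgresql import insert

from airflow.models.dataset import DatasetDagRunQueue
from airflow.utils import timezone


def queue_sink_dag_run(session, dataset_id, target_dag_id):
    stmt = (
        insert(DatasetDagRunQueue)
        .values(
            dataset_id=dataset_id,
            target_dag_id=target_dag_id,
            created_at=timezone.utcnow(),
        )
        # if another task already queued this pair, do nothing
        .on_conflict_do_nothing(index_elements=["dataset_id", "target_dag_id"])
    )
    session.execute(stmt)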
nice work!
Ran across this again (at least I think it's the same thing) with this dag:
from airflow import Dataset, DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

fan_out = Dataset("fan_out")
fan_in = Dataset("fan_in")

# the leader
with DAG(
    dag_id="momma_duck", start_date=datetime(1970, 1, 1), schedule_interval=None
) as leader:
    PythonOperator(
        task_id="has_outlet", python_callable=lambda: None, outlets=[fan_out]
    )

# the many
for i in range(1, 1000):
    with DAG(
        dag_id=f"duckling_{i}", start_date=datetime(1970, 1, 1), schedule_on=[fan_out]
    ) as duck:
        PythonOperator(
            task_id="has_outlet", python_callable=lambda: None, outlets=[fan_in]
        )
    globals()[f"duck_{i}"] = duck

# the straggler
with DAG(
    dag_id="straggler_duck", start_date=datetime(1970, 1, 1), schedule_on=[fan_in]
) as straggler:
    PythonOperator(task_id="has_outlet", python_callable=lambda: None)
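Since every duckling's outlet is fan_in, they all try to enqueue a run of straggler_duck when they finish together, so I'd expect the same dataset_dag_run_queue collision as above. Something like this can be used to watch the queue while they run (a sketch, again assuming the ORM model behind the dataset_dag_run_queue table is airflow.models.dataset.DatasetDagRunQueue):

# Sketch: dump the dataset-triggered run queue from the metadata db.
from airflow.models.dataset import DatasetDagRunQueue
from airflow.utils.session import create_session

with create_session() as session:
    for row in session.query(DatasetDagRunQueue).all():
        print(row.dataset_id, row.target_dag_id, row.created_at)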
This time I noticed that, with the LocalExecutor at least, there are messages regarding zombies in the scheduler log:
{dagrun.py:567} INFO - Marking run <DagRun duckling_855 @ 2022-08-08 21:01:33.571168+00:00: dataset_triggered__2022-08-08T21:01:33.568548+00:00, state:running, queued_at: 2022-08-08 21:01:33.569170+00:00. externally triggered: False> successful
{dagrun.py:612} INFO - DagRun Finished: dag_id=duckling_855, execution_date=2022-08-08 21:01:33.571168+00:00, run_id=dataset_triggered__2022-08-08T21:01:33.568548+00:00, run_start_date=2022-08-08 21:03:40.332173+00:00, run_end_date=2022-08-08 21:07:27.579980+00:00, run_duration=227.247807, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=None, data_interval_end=None, dag_hash=ff772192a0a9c22e28dbebf2b14f1ebc
{dag.py:3178} INFO - Setting next_dagrun for duckling_855 to None, run_after=None
{scheduler_job.py:1364} WARNING - Failing (58) jobs without heartbeat after 2022-08-08 21:02:27.737896+00:00
{scheduler_job.py:1372} ERROR - Detected zombie job: {'full_filepath': '/home/matt/2022/08/07/dags/many_to_one.py', 'msg': 'Detected <TaskInstance: duckling_289.has_outlet dataset_triggered__2022-08-08T21:01:31.846596+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f98ebffc850>, 'is_failure_callback': True}
{scheduler_job.py:1372} ERROR - Detected zombie job: {'full_filepath': '/home/matt/2022/08/07/dags/many_to_one.py', 'msg': 'Detected <TaskInstance: duckling_127.has_outlet dataset_triggered__2022-08-08T21:01:31.823203+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f98ec93f7f0>, 'is_failure_callback': True}