
Many tasks updating dataset at once causes some of them to fail

MatrixManAtYrService opened this issue 3 years ago · 3 comments

Apache Airflow version

main (development)

What happened

I have 16 DAGs which all update the same dataset. They're set to finish at the same time (when the seconds on the clock read 00). About three quarters of them behave as expected, but the other quarter fails with errors like:

[2022-07-21, 06:06:00 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 8 for task increment_source ((psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dataset_dag_run_queue_pkey"
DETAIL:  Key (dataset_id, target_dag_id)=(1, simple_dataset_sink) already exists.

[SQL: INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id, created_at) VALUES (%(dataset_id)s, %(target_dag_id)s, %(created_at)s)]
[parameters: {'dataset_id': 1, 'target_dag_id': 'simple_dataset_sink', 'created_at': datetime.datetime(2022, 7, 21, 6, 6, 0, 131730, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 375)

I've prepared a gist with the details: https://gist.github.com/MatrixManAtYrService/b5e58be0949eab9180608d0760288d4d

What you think should happen instead

All DAGs should succeed.

How to reproduce

See this gist: https://gist.github.com/MatrixManAtYrService/b5e58be0949eab9180608d0760288d4d

Summary: unpause all of the DAGs that we expect to collide, then wait two minutes. Some of them will have collided.
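
For reference, here is a minimal sketch of the kind of setup the gist describes. The task and DAG names (increment_source, simple_dataset_sink) come from the error above; the every-minute schedule, the pause-based synchronization to second 00, and the task bodies are assumptions, and on a pre-release main checkout the dataset-scheduling parameter may be spelled differently (schedule_on vs. schedule):

from datetime import datetime, timedelta, timezone

import pause  # the package mentioned under "Deployment details"
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

shared = Dataset("shared")


def finish_at_second_00():
    # Sleep until the next :00 so that all sources update the dataset
    # at (roughly) the same instant.
    now = datetime.now(timezone.utc)
    pause.until(now.replace(second=0, microsecond=0) + timedelta(minutes=1))


# 16 sources that all write to the same dataset
for i in range(16):
    with DAG(
        dag_id=f"simple_dataset_source_{i}",
        start_date=datetime(1970, 1, 1),
        schedule_interval="* * * * *",
        catchup=False,
    ) as source:
        PythonOperator(
            task_id="increment_source",
            python_callable=finish_at_second_00,
            outlets=[shared],
        )
    globals()[f"source_{i}"] = source

# one sink triggered whenever the dataset is updated
with DAG(
    dag_id="simple_dataset_sink",
    start_date=datetime(1970, 1, 1),
    schedule_on=[shared],  # spelled "schedule" in released Airflow 2.4+
) as sink:
    PythonOperator(task_id="consume", python_callable=lambda: None)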

Operating System

docker/debian

Versions of Apache Airflow Providers

n/a

Deployment

Astronomer

Deployment details

astro dev start targeting commit: cff7d9194f549d801947f47dfce4b5d6870bfaaa

Be sure to have the pause package in requirements.txt.

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

MatrixManAtYrService, Jul 21 '22 06:07

This is described here: https://github.com/apache/airflow/issues/24974

Maybe it's OK that the update failed (given that the downstream DAG ran as desired), but I don't think it should cause the task to fail.
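
For context, the UniqueViolation comes from each finishing task inserting the same (dataset_id, target_dag_id) row into dataset_dag_run_queue. Below is a sketch of the general idempotent-insert pattern that would make the second writer a no-op instead of an error; it uses plain SQLAlchemy against a stand-in table modelled on the columns in the error message, not Airflow's actual model:

from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()

# Stand-in for the queue table; the real schema lives in Airflow's models.
ddrq = Table(
    "dataset_dag_run_queue",
    metadata,
    Column("dataset_id", Integer, primary_key=True),
    Column("target_dag_id", String(250), primary_key=True),
    Column("created_at", DateTime(timezone=True)),
)


def enqueue(conn, dataset_id, target_dag_id):
    # ON CONFLICT DO NOTHING: if another task already queued this
    # (dataset, dag) pair, skip silently instead of raising UniqueViolation.
    stmt = (
        pg_insert(ddrq)
        .values(
            dataset_id=dataset_id,
            target_dag_id=target_dag_id,
            created_at=datetime.now(timezone.utc),
        )
        .on_conflict_do_nothing(index_elements=["dataset_id", "target_dag_id"])
    )
    conn.execute(stmt)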

MatrixManAtYrService, Jul 21 '22 06:07

nice work!

dstandish, Jul 22 '22 16:07

Ran across this again (at least I think it's the same thing) with this DAG:

from airflow import Dataset, DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


fan_out = Dataset("fan_out")
fan_in = Dataset("fan_in")

# the leader
with DAG(
    dag_id="momma_duck", start_date=datetime(1970, 1, 1), schedule_interval=None
) as leader:

    PythonOperator(
        task_id="has_outlet", python_callable=lambda: None, outlets=[fan_out]
    )

# the many
for i in range(1, 1000):
    with DAG(
        dag_id=f"duckling_{i}", start_date=datetime(1970, 1, 1), schedule_on=[fan_out]
    ) as duck:

        PythonOperator(
            task_id="has_outlet", python_callable=lambda: None, outlets=[fan_in]
        )
    globals()[f"duck_{i}"] = duck


# the straggler
with DAG(
    dag_id="straggler_duck", start_date=datetime(1970, 1, 1), schedule_on=[fan_in]
) as straggler:

    PythonOperator(task_id="has_outlet", python_callable=lambda: None)

This time I noticed that, with the LocalExecutor at least, there are messages about zombies in the scheduler log:

{dagrun.py:567} INFO - Marking run <DagRun duckling_855 @ 2022-08-08 21:01:33.571168+00:00: dataset_triggered__2022-08-08T21:01:33.568548+00:00, state:running, queued_at: 2022-08-08 21:01:33.569170+00:00. externally triggered: False> successful
{dagrun.py:612} INFO - DagRun Finished: dag_id=duckling_855, execution_date=2022-08-08 21:01:33.571168+00:00, run_id=dataset_triggered__2022-08-08T21:01:33.568548+00:00, run_start_date=2022-08-08 21:03:40.332173+00:00, run_end_date=2022-08-08 21:07:27.579980+00:00, run_duration=227.247807, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=None, data_interval_end=None, dag_hash=ff772192a0a9c22e28dbebf2b14f1ebc
{dag.py:3178} INFO - Setting next_dagrun for duckling_855 to None, run_after=None
{scheduler_job.py:1364} WARNING - Failing (58) jobs without heartbeat after 2022-08-08 21:02:27.737896+00:00
{scheduler_job.py:1372} ERROR - Detected zombie job: {'full_filepath': '/home/matt/2022/08/07/dags/many_to_one.py', 'msg': 'Detected <TaskInstance: duckling_289.has_outlet dataset_triggered__2022-08-08T21:01:31.846596+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f98ebffc850>, 'is_failure_callback': True}
{scheduler_job.py:1372} ERROR - Detected zombie job: {'full_filepath': '/home/matt/2022/08/07/dags/many_to_one.py', 'msg': 'Detected <TaskInstance: duckling_127.has_outlet dataset_triggered__2022-08-08T21:01:31.823203+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f98ec93f7f0>, 'is_failure_callback': True}
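
For what it's worth, the "Failing (58) jobs without heartbeat" line is the scheduler's zombie check: task instances whose last heartbeat is older than the configured threshold get reported like this. A small sketch of how to inspect that threshold, assuming the stock scheduler_zombie_task_threshold option (default 300 seconds):

from airflow.configuration import conf

# Task instances whose heartbeat is older than this many seconds are
# reported as zombies by the scheduler (300 by default).
threshold = conf.getint("scheduler", "scheduler_zombie_task_threshold", fallback=300)
print(threshold)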

MatrixManAtYrService, Aug 08 '22 21:08