
Dynamically mapped sensor with mode='reschedule' fails with violated foreign key constraint

Open · Gollum999 opened this issue 2 years ago · 1 comment

Apache Airflow version

2.3.3 (latest released)

What happened

If you use Dynamic Task Mapping to map a sensor with .partial(mode='reschedule'), and the sensor's poke condition fails even once, the whole mapped sensor task immediately dies with an error like:

```
[2022-07-14, 10:45:05 CDT] {standard_task_runner.py:92} ERROR - Failed to execute job 19 for task check_reschedule ((sqlite3.IntegrityError) FOREIGN KEY constraint failed
[SQL: INSERT INTO task_reschedule (task_id, dag_id, run_id, map_index, try_number, start_date, end_date, duration, reschedule_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('check_reschedule', 'test_dag', 'manual__2022-07-14T20:44:02.708517+00:00', -1, 1, '2022-07-14 20:45:05.874988', '2022-07-14 20:45:05.900895', 0.025907, '2022-07-14 20:45:10.898820')]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 2973372)
```

A similar error arises when using a Postgres backend:

```
[2022-07-14, 11:09:22 CDT] {standard_task_runner.py:92} ERROR - Failed to execute job 17 for task check_reschedule ((psycopg2.errors.ForeignKeyViolation) insert or update on table "task_reschedule" violates foreign key constraint "task_reschedule_ti_fkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_dag, check_reschedule, manual__2022-07-14T21:08:13.462782+00:00, -1) is not present in table "task_instance".

[SQL: INSERT INTO task_reschedule (task_id, dag_id, run_id, map_index, try_number, start_date, end_date, duration, reschedule_date) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(start_date)s, %(end_date)s, %(duration)s, %(reschedule_date)s) RETURNING task_reschedule.id]
[parameters: {'task_id': 'check_reschedule', 'dag_id': 'test_dag', 'run_id': 'manual__2022-07-14T21:08:13.462782+00:00', 'map_index': -1, 'try_number': 1, 'start_date': datetime.datetime(2022, 7, 14, 21, 9, 22, 417922, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2022, 7, 14, 21, 9, 22, 464495, tzinfo=Timezone('UTC')), 'duration': 0.046573, 'reschedule_date': datetime.datetime(2022, 7, 14, 21, 9, 27, 458623, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 2983150)
```
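Both tracebacks have the same cause. The row written to task_reschedule carries map_index = -1, which is the value Airflow uses for an unmapped task. The expanded check_reschedule task instances, however, are stored with map_index 0 through 9. As a result, the composite foreign key (dag_id, task_id, run_id, map_index) has nothing to point at.

The minimal sqlite3 sketch below reproduces that constraint failure in isolation. The two-table schema is a simplified assumption modeled only on the key columns named in the error messages. It is not Airflow's actual migration, and the helper names make_db and record_reschedule are hypothetical.

```python
import sqlite3

# Simplified, hypothetical schema: only the key columns named in the errors above.
SCHEMA = """
CREATE TABLE task_instance (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    map_index INTEGER NOT NULL DEFAULT -1,
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
);
CREATE TABLE task_reschedule (
    id INTEGER PRIMARY KEY,
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    map_index INTEGER NOT NULL,
    try_number INTEGER NOT NULL,
    FOREIGN KEY (dag_id, task_id, run_id, map_index)
        REFERENCES task_instance (dag_id, task_id, run_id, map_index)
);
"""


def make_db(num_mapped: int) -> sqlite3.Connection:
    """Create an in-memory DB holding num_mapped expanded task instances."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is enabled.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA)
    # Expanded (mapped) task instances get map_index 0..N-1; no row has -1.
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [("test_dag", "check_reschedule", "run_1", i) for i in range(num_mapped)],
    )
    return conn


def record_reschedule(conn: sqlite3.Connection, map_index: int) -> bool:
    """Try to insert a reschedule row; return False if the FK constraint rejects it."""
    try:
        conn.execute(
            "INSERT INTO task_reschedule (dag_id, task_id, run_id, map_index, try_number) "
            "VALUES (?, ?, ?, ?, ?)",
            ("test_dag", "check_reschedule", "run_1", map_index, 1),
        )
        return True
    except sqlite3.IntegrityError:
        return False


conn = make_db(num_mapped=10)
print(record_reschedule(conn, map_index=-1))  # False: no task_instance row has map_index -1
print(record_reschedule(conn, map_index=3))   # True: matches an expanded task instance
```

In this model, writing the reschedule row with the mapped instance's own map_index satisfies the constraint, and writing it with -1 does not.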

Mapping the same sensor with mode='poke' seems to behave as expected. As far as I can tell, this affects all sensor types.
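The two modes behave differently in one important way. In poke mode, a sensor re-checks its condition inside the same running task, so nothing is written to task_reschedule. In reschedule mode, a failed poke raises AirflowRescheduleException, which ends the current attempt, and Airflow records a task_reschedule row keyed by the task instance. Only that second path touches the table whose foreign key fails.

The toy model below illustrates the difference. It does not import Airflow. The names run_sensor, RescheduleKey, the in-memory list standing in for the table, and the max_pokes cut-off (a stand-in for the real sensor timeout) are all hypothetical.

```python
from typing import Callable, List, Tuple

# Hypothetical stand-in for a task_reschedule key: (dag_id, task_id, run_id, map_index).
RescheduleKey = Tuple[str, str, str, int]


def run_sensor(
    poke: Callable[[], bool],
    mode: str,
    key: RescheduleKey,
    reschedule_rows: List[RescheduleKey],
    max_pokes: int = 10,
) -> str:
    """Toy model of one sensor attempt; returns the resulting state name."""
    if mode not in ("poke", "reschedule"):
        raise ValueError(f"unknown mode: {mode}")
    for _ in range(max_pokes):
        if poke():
            return "success"
        if mode == "reschedule":
            # Give up the worker slot and record a reschedule for this instance.
            # The key must carry the mapped instance's real map_index, not -1.
            reschedule_rows.append(key)
            return "up_for_reschedule"
        # mode == "poke": keep looping in the same process (sleep omitted).
    return "failed"  # stand-in for the sensor timing out
```

For example, a poke-mode run whose condition fails twice and then succeeds returns "success" and leaves reschedule_rows empty. A reschedule-mode run whose first poke fails appends one key and returns "up_for_reschedule".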

What you think should happen instead

This combination of features should work without error.

How to reproduce

```python
#!/usr/bin/env python3
import datetime

from airflow.decorators import dag, task
from airflow.sensors.bash import BashSensor


@dag(
    start_date=datetime.datetime(2022, 7, 14),
    schedule_interval=None,
)
def test_dag():
    @task
    def get_tasks():
        # Ten bash commands that each succeed or fail at random (about 50% per poke).
        return ['(($RANDOM % 2 == 0))'] * 10

    tasks = get_tasks()
    # Works: mapped sensor in poke mode.
    BashSensor.partial(task_id='check_poke', mode='poke', poke_interval=5).expand(bash_command=tasks)
    # Fails with the FOREIGN KEY error as soon as one poke returns False.
    BashSensor.partial(task_id='check_reschedule', mode='reschedule', poke_interval=5).expand(bash_command=tasks)


dag = test_dag()


if __name__ == '__main__':
    dag.cli()
```
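To confirm the mismatch on a standalone deployment, you could list the map_index values actually stored for the mapped sensor in the SQLite metadata DB. This is a hedged diagnostic sketch. The path ~/airflow/airflow.db is an assumption (the default when AIRFLOW_HOME is unset), and map_indexes_by_run is a hypothetical helper name.

If the check_reschedule instances appear as map_index 0 through 9 with no -1 row, that is consistent with the foreign key error above: the reschedule insert targets a key that does not exist.

```python
import contextlib
import os
import sqlite3
from collections import defaultdict
from typing import Dict, List

# Assumption: default SQLite metadata DB when AIRFLOW_HOME is not set.
# Adjust this path for your installation.
DEFAULT_DB = os.path.expanduser("~/airflow/airflow.db")


def map_indexes_by_run(conn: sqlite3.Connection, dag_id: str, task_id: str) -> Dict[str, List[int]]:
    """Group the stored map_index values of one task by DAG run."""
    result: Dict[str, List[int]] = defaultdict(list)
    rows = conn.execute(
        "SELECT run_id, map_index FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? ORDER BY run_id, map_index",
        (dag_id, task_id),
    )
    for run_id, map_index in rows:
        result[run_id].append(map_index)
    return dict(result)


if __name__ == "__main__" and os.path.exists(DEFAULT_DB):
    # Open read-only so this check cannot modify the metadata DB.
    with contextlib.closing(sqlite3.connect(f"file:{DEFAULT_DB}?mode=ro", uri=True)) as conn:
        for run_id, indexes in map_indexes_by_run(conn, "test_dag", "check_reschedule").items():
            print(run_id, indexes)
```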

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

N/A

Deployment

Other

Deployment details

Standalone

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Gollum999, Jul 15 '22 13:07

Another case for map_index failure for dynamic task mapping. Not the same as https://github.com/apache/airflow/issues/25060 but likely somewhat related. @uranusjr @ashb

potiuk, Jul 16 '22 21:07