airflow
Dynamically mapped sensor with mode='reschedule' fails with violated foreign key constraint
Apache Airflow version
2.3.3 (latest released)
What happened
If you are using Dynamic Task Mapping to map a Sensor with .partial(mode='reschedule'), and if that sensor fails its poke condition even once, the whole sensor task will immediately die with an error like:
[2022-07-14, 10:45:05 CDT] {standard_task_runner.py:92} ERROR - Failed to execute job 19 for task check_reschedule ((sqlite3.IntegrityError) FOREIGN KEY constraint failed
[SQL: INSERT INTO task_reschedule (task_id, dag_id, run_id, map_index, try_number, start_date, end_date, duration, reschedule_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('check_reschedule', 'test_dag', 'manual__2022-07-14T20:44:02.708517+00:00', -1, 1, '2022-07-14 20:45:05.874988', '2022-07-14 20:45:05.900895', 0.025907, '2022-07-14 20:45:10.898820')]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 2973372)
A similar error arises when using a Postgres backend:
[2022-07-14, 11:09:22 CDT] {standard_task_runner.py:92} ERROR - Failed to execute job 17 for task check_reschedule ((psycopg2.errors.ForeignKeyViolation) insert or update on table "task_reschedule" violates foreign key constraint "task_reschedule_ti_fkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_dag, check_reschedule, manual__2022-07-14T21:08:13.462782+00:00, -1) is not present in table "task_instance".
[SQL: INSERT INTO task_reschedule (task_id, dag_id, run_id, map_index, try_number, start_date, end_date, duration, reschedule_date) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(start_date)s, %(end_date)s, %(duration)s, %(reschedule_date)s) RETURNING task_reschedule.id]
[parameters: {'task_id': 'check_reschedule', 'dag_id': 'test_dag', 'run_id': 'manual__2022-07-14T21:08:13.462782+00:00', 'map_index': -1, 'try_number': 1, 'start_date': datetime.datetime(2022, 7, 14, 21, 9, 22, 417922, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2022, 7, 14, 21, 9, 22, 464495, tzinfo=Timezone('UTC')), 'duration': 0.046573, 'reschedule_date': datetime.datetime(2022, 7, 14, 21, 9, 27, 458623, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 2983150)
mode='poke' seems to behave as expected. As far as I can tell, this affects all Sensor types.
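For anyone trying to read the tracebacks above: the INSERT into task_reschedule carries map_index = -1, while the Postgres DETAIL line says no task_instance row exists for that key. My guess is that once the sensor is expanded, its task instances only exist with map_index >= 0, so the reschedule row points at nothing. Here is a minimal sketch of that failure using only the standard-library sqlite3 module. The two tables are a heavily simplified assumption, not Airflow's real schema, and try_reschedule is a hypothetical helper made up for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled

# Simplified stand-ins for the two tables named in the error (assumed shape).
conn.executescript("""
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
);
CREATE TABLE task_reschedule (
    id INTEGER PRIMARY KEY,
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
    FOREIGN KEY (dag_id, task_id, run_id, map_index)
        REFERENCES task_instance (dag_id, task_id, run_id, map_index)
);
""")

KEY = ("test_dag", "check_reschedule", "manual__run")  # placeholder run_id

# Assumption: after expansion, only mapped instances 0..9 exist.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [KEY + (i,) for i in range(10)],
)


def try_reschedule(map_index):
    """Insert a reschedule row; return None on success or the error text."""
    try:
        conn.execute(
            "INSERT INTO task_reschedule (dag_id, task_id, run_id, map_index) "
            "VALUES (?, ?, ?, ?)",
            KEY + (map_index,),
        )
        return None
    except sqlite3.IntegrityError as exc:
        return str(exc)


print(try_reschedule(0))   # a real mapped index: the insert is accepted
print(try_reschedule(-1))  # the index from the traceback: the FK check rejects it
```

If that reading is right, the reschedule record is being written with the unmapped task's index instead of the index of the mapped instance that actually ran.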
What you think should happen instead
This combination of features should work without error.
How to reproduce
#!/usr/bin/env python3
import datetime

from airflow.decorators import dag, task
from airflow.sensors.bash import BashSensor


@dag(
    start_date=datetime.datetime(2022, 7, 14),
    schedule_interval=None,
)
def test_dag():
    @task
    def get_tasks():
        return ['(($RANDOM % 2 == 0))'] * 10

    tasks = get_tasks()
    BashSensor.partial(task_id='check_poke', mode='poke', poke_interval=5).expand(bash_command=tasks)
    BashSensor.partial(task_id='check_reschedule', mode='reschedule', poke_interval=5).expand(bash_command=tasks)


dag = test_dag()

if __name__ == '__main__':
    dag.cli()
Operating System
CentOS Stream 8
Versions of Apache Airflow Providers
N/A
Deployment
Other
Deployment details
Standalone
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Another case for map_index failure for dynamic task mapping. Not the same as https://github.com/apache/airflow/issues/25060 but likely somewhat related. @uranusjr @ashb