Deferrable sensors can timeout with no retries
Alternative to https://github.com/apache/airflow/pull/32990
resolves https://github.com/apache/airflow/issues/32638
This is a less invasive approach. Essentially, we update BaseOperator.resume_execution so that when a trigger times out, it raises a special exception, AirflowDeferralTimeout.
Then, in BaseSensorOperator.resume_execution, we re-raise AirflowDeferralTimeout as an AirflowSensorTimeout.
So, if a sensor resumes from a timed-out deferral, then it's interpreted as a sensor timeout.
All that is required is for a sensor to add a timeout to the deferral.
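To make the flow described above concrete, here is a minimal, self-contained sketch. It does not import Airflow: the classes are simplified stand-ins that only mirror the names in this description and in the traceback below. The `TRIGGER_TIMEOUT` marker and `DemoSensor` are hypothetical and exist only for illustration; how the real BaseOperator detects a timed-out trigger is not shown here.

```python
class AirflowException(Exception):
    """Stand-in for Airflow's base exception class."""


class AirflowDeferralTimeout(AirflowException):
    """Raised when a task resumes from a deferral whose trigger timed out."""


class AirflowSensorTimeout(AirflowException):
    """Sensor timeout: the task should fail immediately, with no retries."""


# Hypothetical marker used by this sketch to signal "the trigger timed out".
TRIGGER_TIMEOUT = "__trigger_timeout__"


class BaseOperator:
    def resume_execution(self, next_method, next_kwargs, context):
        # A timed-out deferral surfaces as a dedicated exception type.
        if next_method == TRIGGER_TIMEOUT:
            raise AirflowDeferralTimeout("Trigger timeout")
        return getattr(self, next_method)(context, **(next_kwargs or {}))


class BaseSensorOperator(BaseOperator):
    def resume_execution(self, next_method, next_kwargs, context):
        try:
            return super().resume_execution(next_method, next_kwargs, context)
        except AirflowDeferralTimeout as e:
            # Reinterpret the deferral timeout as a sensor timeout.
            raise AirflowSensorTimeout(*e.args) from e


class DemoSensor(BaseSensorOperator):
    """Hypothetical sensor used only to exercise the sketch."""

    def execute_complete(self, context, event=None):
        return event
```

In this sketch, a normal resume calls the named method as usual, while a resume from a timed-out deferral ends in AirflowSensorTimeout with the original AirflowDeferralTimeout attached as its cause, matching the chained traceback in the example logs.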
Example logs:
...
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1380} INFO - Executing <Task(TimeDeltaSensorAsync): delta_sensor> on 2023-08-25 07:00:32.864128+00:00
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:57} INFO - Started process 50543 to run task
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'simple2', 'delta_sensor', 'manual__2023-08-25T07:00:32.864128+00:00', '--job-id', '129', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/async_timeout.py', '--cfg-path', '/var/folders/9c/tknx7xx10qx92983y1r5djb40000gn/T/tmp72slvjz_']
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:85} INFO - Job 129: Subtask delta_sensor
[2023-08-25, 07:00:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: simple2.delta_sensor manual__2023-08-25T07:00:32.864128+00:00 [running]> on host daniels-mbp.lan
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1933} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 288, in resume_execution
return super().resume_execution(next_method, next_kwargs, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/dstandish/code/airflow/airflow/models/baseoperator.py", line 1605, in resume_execution
raise AirflowDeferralTimeout(error)
airflow.exceptions.AirflowDeferralTimeout: Trigger timeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 290, in resume_execution
raise AirflowSensorTimeout(*e.args) from e
airflow.exceptions.AirflowSensorTimeout: Trigger timeout
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1398} INFO - Immediate failure requested. Marking task as FAILED. dag_id=simple2, task_id=delta_sensor, execution_date=20230825T070032, start_date=20230825T070034, end_date=20230825T070054
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 129 for task delta_sensor (Trigger timeout; 50543)
[2023-08-25, 07:00:54 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-08-25, 07:00:54 UTC] {taskinstance.py:2774} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:611} ERROR - Trigger cancelled due to timeout
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:612} ERROR - Trigger cancelled; message=
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
@dstandish and team - I see this issue was auto-closed and now reopened. My team is struggling with the same issue, so we'd be happy to help/test in any way possible as needed - let us know!
Hi @robg-eb, thanks for your interest. Yes, your feedback / review / testing would be welcome. I just rebased it.
@dstandish - To clarify, is this PR ready to test as-is?
Right.
In short, what this does is: when a trigger times out, we now raise TaskDeferralTimeout instead of the generic TaskDeferralError. And in BaseSensorOperator, we re-raise this as AirflowSensorTimeout, which has special meaning (it results in immediate failure with no more retries). So if you inherit from BaseSensorOperator, this should just work. And if you want the trigger timeout, after multiple retries etc., to be calculated from the very first try, it's your responsibility to calculate that when deferring, as shown in the example by @hussein-awala here.
At least that's my understanding after dusting this off just now :)
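As a rough illustration of "calculating the timeout from the very first try", the sketch below computes how much of the sensor's overall timeout is left at the moment of deferring. The helper name `remaining_timeout` and its parameters are hypothetical, and how a sensor obtains the start time of its first try is assumed rather than shown. The result would be the value passed as the `timeout` argument when deferring.

```python
from datetime import datetime, timedelta, timezone


def remaining_timeout(first_try_start, sensor_timeout, now):
    """Return how much of the sensor's overall timeout budget is left.

    The budget is measured from the start of the first try, so time spent
    in earlier tries is subtracted. The result never goes below zero.
    """
    elapsed = now - first_try_start
    return max(sensor_timeout - elapsed, timedelta(0))


# Example: a 12-hour sensor timeout, deferring again 5 hours after the first try began.
first_start = datetime(2023, 8, 25, 7, 0, tzinfo=timezone.utc)
now = first_start + timedelta(hours=5)
left = remaining_timeout(first_start, timedelta(hours=12), now)
print(left)  # 7:00:00
```

Clamping at zero means a sensor that has already used up its budget defers with a zero timeout instead of a negative one.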
We want to keep this PR active - I can test this week as needed!
I just raised this issue, but it looks like it is covered by this PR.
@dstandish and team - Thank you for creating this PR, as we are currently unable to properly use retries with Deferrable (Async) sensors because of the inconsistency in retry / timeout behavior between sync and async sensors. Currently, a sync sensor will only apply retries up to the timeout, but an async sensor will apply retries * timeout as the total amount of waiting time.
We tested this PR now and found that it DOES properly begin to resolve this difference - In the case where no interruption happens, and no retry is needed, then the async sensors on this PR do only run until the timeout interval, and when the timeout period is hit, they abort immediately without retries, just like sync sensors do. This is great.
However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.
To take an example: suppose we have an async sensor with a 12-hour timeout, and 5 hours into its run it is interrupted (let's say, for example, there was a network or metadata DB issue). At this point, when it goes into retry, it resets the timeout and starts waiting another 12 hours, meaning it would wait a total of 17 hours. This is not consistent with how sync sensors work, and how they are documented here - https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts
If the sensor fails due to other reasons such as network outages during the 3600 seconds interval, it can retry up to 2 times as defined by retries. Retrying does not reset the timeout. It will still have up to 3600 seconds in total for it to succeed.
So, if we want async sensors to behave the same as sync sensors (and in line with this documentation about sync sensors), we still need to resolve this inconsistency. Thoughts on that?
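The arithmetic behind the 12-hour example can be sketched as follows. This is only an illustration of the two behaviours being compared, not Airflow code. The function `total_wait` and its parameters are hypothetical, and it assumes the sensor's condition is never met and that the interruptions all occur before the original timeout has elapsed.

```python
from datetime import timedelta


def total_wait(timeout, interruptions, reset_on_retry):
    """Worst-case total waiting time when the sensor's condition is never met.

    interruptions: how long each attempt ran before an unrelated failure
    (for example a network outage) forced a retry.
    """
    if reset_on_retry:
        # Each retry starts a fresh timeout, so the time already spent in
        # interrupted attempts is added on top of a full timeout.
        return sum(interruptions, timedelta(0)) + timeout
    # The timeout is one budget shared by all attempts (the documented sync
    # behaviour), so the total never exceeds it.
    return timeout


interrupted = [timedelta(hours=5)]
print(total_wait(timedelta(hours=12), interrupted, reset_on_retry=True))   # 17:00:00
print(total_wait(timedelta(hours=12), interrupted, reset_on_retry=False))  # 12:00:00
```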
@dstandish - I'd hate to see this PR get closed as we seemed to be on the right track here, and the issue still exists around this inconsistent behavior with deferrable sensors. Any chance you'll get to take another look?
@robg-eb were you able to test this locally?
@dstandish I had tested this out on Jan 29 2024 and left the feedback in the comment above - https://github.com/apache/airflow/pull/33718#issuecomment-1914788890. I see there's now a new commit as of 11 minutes ago - Does that commit address the issues from my comment above, so that I should test again now?
OK. Good to know. Thank you. I will review your comment soon. There are no new changes. I just did a rebase.
This is still an open issue that needs to be addressed for consistency between deferrable and non-deferrable sensors.
Ping @dstandish
We need to keep this active and gain consistency.
Ok @nathadfield I just rebased. I pinged @hussein-awala to pick the comment thread back up. If you could review the open conversations and weigh in as appropriate that could be helpful.
However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.
@robg-eb i am focused on airflow 3 things right now but if you want to pick this up and resolve this go for it. you could either suggest code change in comment, or make a PR against my branch.
@robg-eb looked at your comment
However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.
I don't understand. What do you mean "an interruption does happen"?
Is this one ready to review and we think we can merge it in?
Also, I think it relates to our discussions on making deferrable the "default" for Airflow 3, and it is part of the discussion "Do we actually pay enough attention to deferrable timeouts and failure scenarios to make it a "first-class" replacement for other types of sensors?", so maybe we should prioritize this one.
I saw a number of people commenting before, but I am not sure what the status is after so much back-and-forth and such long conversations. So maybe we can somewhat restart this one and get more of the people who commented before and understand the problems in more detail?
cc: @eladkal
Pinging those who were active here
- @uranusjr
- @robg-eb
- @hussein-awala
- @kaxil
Also @thesuperzapper -> re: https://github.com/apache/airflow/issues/36090#issuecomment-2368810776 which seems very much relevant to this one, as I think we need to agree on strategy of how to treat "exceptional" cases for deferrable operators in a consistent way.
I personally know too little in this area to give meaningful (and correct) feedback, but I have a feeling this and #36090 would need to be addressed if we want to seriously continue the discussion started here https://lists.apache.org/thread/3m7vjwcbvodnhrklo69s3j8s8pp7nm6o