airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Deferrable sensors can timeout with no retries

Open dstandish opened this issue 2 years ago • 15 comments

Alternative to https://github.com/apache/airflow/pull/32990

resolves https://github.com/apache/airflow/issues/32638

This is a less invasive approach. Essentially, what we do here is, update BaseOperator.resume_execution so that when trigger times out then it raises special exception AirflowDeferralTimeout.

Then, BaseSensorOperator.resume_execution, we reraise AirflowDeferralTimeout as a AirflowSensorTimeout.

So, if a sensor resumes from a timed-out deferral, then it's interpreted as a sensor timeout.

All that is required is for a sensor to add a timeout to the deferral.

Example logs:

...
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1380} INFO - Executing <Task(TimeDeltaSensorAsync): delta_sensor> on 2023-08-25 07:00:32.864128+00:00
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:57} INFO - Started process 50543 to run task
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'simple2', 'delta_sensor', 'manual__2023-08-25T07:00:32.864128+00:00', '--job-id', '129', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/async_timeout.py', '--cfg-path', '/var/folders/9c/tknx7xx10qx92983y1r5djb40000gn/T/tmp72slvjz_']
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:85} INFO - Job 129: Subtask delta_sensor
[2023-08-25, 07:00:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: simple2.delta_sensor manual__2023-08-25T07:00:32.864128+00:00 [running]> on host daniels-mbp.lan
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1933} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 288, in resume_execution
    return super().resume_execution(next_method, next_kwargs, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dstandish/code/airflow/airflow/models/baseoperator.py", line 1605, in resume_execution
    raise AirflowDeferralTimeout(error)
airflow.exceptions.AirflowDeferralTimeout: Trigger timeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 290, in resume_execution
    raise AirflowSensorTimeout(*e.args) from e
airflow.exceptions.AirflowSensorTimeout: Trigger timeout
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1398} INFO - Immediate failure requested. Marking task as FAILED. dag_id=simple2, task_id=delta_sensor, execution_date=20230825T070032, start_date=20230825T070034, end_date=20230825T070054
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 129 for task delta_sensor (Trigger timeout; 50543)
[2023-08-25, 07:00:54 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-08-25, 07:00:54 UTC] {taskinstance.py:2774} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:611} ERROR - Trigger cancelled due to timeout
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:612} ERROR - Trigger cancelled; message=

dstandish avatar Aug 25 '23 06:08 dstandish

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Oct 14 '23 00:10 github-actions[bot]

@dstandish and team - I see this issue was auto-closed and now reopened. My team is struggling with the same issue, so we'd be happy to help/test in any way possible as needed - let us know !

robg-eb avatar Nov 06 '23 15:11 robg-eb

@dstandish and team - I see this issue was auto-closed and now reopened. My team is struggling with the same issue, so we'd be happy to help/test in any way possible as needed - let us know !

Hi @robg-eb, thanks for your interest. Yes, your feedback / review / testing would be welcome. I just rebased it.

dstandish avatar Nov 22 '23 14:11 dstandish

@dstandish - To clarify, is this PR ready to test as-is?

robg-eb avatar Nov 22 '23 14:11 robg-eb

@dstandish - To clarify, is this PR ready to test as-is?

Right.

In short what this does is, now when trigger times out we raise TaskDeferralTimeout instead of the generic TaskDeferralError. And in BaseSensorOperator, we reraise this as AirflowSensorTimeout which has special meaning (results in immediately fail and no more retries. So if you inherit from BaseSensorOperator this should just work. And if you ant the trigger timeout, after multiple retries etc, to be calculated from the very first try, it's your responsibility to calculate that when deferring as shown in example by @hussein-awala here.

At least that's my understanding after dusting this off just now :)

dstandish avatar Nov 22 '23 14:11 dstandish

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 07 '24 00:01 github-actions[bot]

We want to keep this PR active - I can test this week as needed!

robg-eb avatar Jan 08 '24 14:01 robg-eb

I just raised this issue but, it looks like it is covered by this PR.

nathadfield avatar Jan 11 '24 14:01 nathadfield

@dstandish and team - Thank you for creating this PR, as we are currently unable to properly use retries with Deferrable (Async) sensors, because of the inconsistency in retry / timeout behavior between sync and async sensors — Currently, a sync sensor will only apply retries up to the timeout , but an async sensor will apply retries * timeout total amount of waiting time.

We tested this PR now and found that it DOES properly begin to resolve this difference - In the case where no interruption happens, and no retry is needed, then the async sensors on this PR do only run until the timeout interval, and when the timeout period is hit, they abort immediately without retries, just like sync sensors do. This is great.

However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.

To take an example - If we have an async sensor with a 12 hour timeout , and 5 hours into its run, it is interrupted (let’s say, for example, there was a network or metadata DB issue). At this point, when it goes into retry, it then resets the timeout, and starts waiting another 12 hours, meaning it would wait a total of 17 hours. This is not consistent with how sync sensors work, and how they are documented here - https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts

If the sensor fails due to other reasons such as network outages during the 3600 seconds interval, it can retry up to 2 times as defined by retries. Retrying does not reset the timeout. It will still have up to 3600 seconds in total for it to succeed.

So, if we want async sensors to behave the same as sync sensors (and in line with this documentation about sync sensors), we still need to resolve this inconsistency. Thoughts on that?

robg-eb avatar Jan 29 '24 14:01 robg-eb

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Mar 18 '24 00:03 github-actions[bot]

@dstandish - I'd hate to see this PR get closed as we seemed to be on the right track here , and the issue still exists around this inconsistent behavior with deferrable sensors. Any chance you'll get to take another look?

robg-eb avatar Mar 18 '24 01:03 robg-eb

@robg-eb were you able to test this locally?

dstandish avatar Mar 26 '24 16:03 dstandish

@dstandish I had tested this out on Jan 29 2024 and left the feedback in the comment above - https://github.com/apache/airflow/pull/33718#issuecomment-1914788890. I see there's now a new commit as of 11 minutes ago - Does that commit address the issues from my comment above, so that I should test again now?

robg-eb avatar Mar 26 '24 16:03 robg-eb

@dstandish I had tested this out on Jan 29 2024 and left the feedback in the comment above - #33718 (comment). I see there's now a new commit as of 11 minutes ago - Does that commit address the issues from my comment above, so that I should test again now?

OK. Good to know. Thank you. I will review your comment soon. There’s no new changes. I just did a rebase.

dstandish avatar Mar 26 '24 17:03 dstandish

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jun 30 '24 00:06 github-actions[bot]

This is still an open issue that needs to be addressed for consistency between deferrable and non-deferrable sensors.

robg-eb avatar Aug 07 '24 08:08 robg-eb

Ping @dstandish

kaxil avatar Aug 07 '24 22:08 kaxil

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Sep 24 '24 00:09 github-actions[bot]

We need to keep this active and gain consistency.

nathadfield avatar Sep 24 '24 07:09 nathadfield

We need to keep this active and gain consistency.

Ok @nathadfield I just rebased. I pinged @hussein-awala to pick the comment thread back up. If you could review the open conversations and weigh in as appropriate that could be helpful.

dstandish avatar Sep 24 '24 15:09 dstandish

However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.

@robg-eb i am focused on airflow 3 things right now but if you want to pick this up and resolve this go for it. you could either suggest code change in comment, or make a PR against my branch.

dstandish avatar Sep 30 '24 16:09 dstandish

@robg-eb looked at your comment

However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.

I don't understand. What do you mean "an interruption does happen"?

dstandish avatar Nov 25 '24 18:11 dstandish

Is this one ready to review and we think we can merge it in?

Also I think it relates with our discussions on making deferrable the "default" for Airflow 3 and it is part of the discussion "Do we actually pay enough attention for deferrable timeouts and faiilure scenarios to make it "first-class" replacement for other types of sensors - so maybe we should prioritize this one.

I saw a number of people commenting before, but I am not sure what the status is after so many back/forth and long conversations. .. So maybe we can somewhat restart this one and get more people who commented before and understand more in detail about the problems?

cc: @eladkal

Pinging those who were active here

  • @uranusjr
  • @robg-eb
  • @hussein-awala
  • @kaxil

Also @thesuperzapper -> re: https://github.com/apache/airflow/issues/36090#issuecomment-2368810776 which seems very much relevant to this one, as I think we need to agree on strategy of how to treat "exceptional" cases for deferrable operators in a consistent way.

I personally know too little in this area to make a meaningful (and correct) feedback, But I have a feeling this and #36090 would need to be addressed if we want to seriously continue discussion started here https://lists.apache.org/thread/3m7vjwcbvodnhrklo69s3j8s8pp7nm6o

potiuk avatar Nov 30 '24 17:11 potiuk