on_failure_callback is not called when task is terminated externally

hliu47 opened this issue 2 years ago

Apache Airflow version

2.2.5

What happened

on_failure_callback is not called when a task is terminated externally. A similar issue was reported in #14422 and fixed in #15172. However, the code from that fix was changed in a later PR, #16301, after which task_instance._run_finished_callback is no longer called when SIGTERM is received (https://github.com/apache/airflow/pull/16301/files#diff-d80fa918cc75c4d6aa582d5e29eeb812ba21371d6977fde45a4749668b79a515L85).

What you think should happen instead

on_failure_callback should be called whenever a task fails, regardless of how it fails.

How to reproduce

DAG file:

import datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
 
 
DEFAULT_ARGS = {
  'email': ['[email protected]']
}
 
 
TZ = pendulum.timezone("America/Los_Angeles")
 
test_dag = DAG(
  dag_id='test_callback_in_manually_terminated_dag',
  schedule_interval='*/10 * * * *',
  default_args=DEFAULT_ARGS,
  catchup=False,
  start_date=datetime.datetime(2022, 7, 14, 0, 0, tzinfo=TZ)
)
 
with test_dag:
  BashOperator(
    task_id='manually_terminated_task',
    bash_command='echo start; sleep 60',
    on_failure_callback=lambda context: print('This on_failure_callback should be called when the task fails.')
  )

While the task instance is running, either force-quitting the scheduler or manually updating the task instance's state to None in the database will cause the task to receive SIGTERM and terminate. In either case, the failure callback is not called, which does not match the behavior of previous Airflow versions.
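
For the second option, updating the state to None can also be done from a small script instead of raw SQL or the UI. A minimal sketch, assuming the DAG above and direct access to the Airflow metadata database from a host where Airflow is installed (illustration only; adjust dag_id/task_id as needed):

# Sketch: flip the running task instance's state to None so that the local task
# job's heartbeat check notices the change and sends SIGTERM to the task process.
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

with create_session() as session:
    running = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "test_callback_in_manually_terminated_dag",
            TaskInstance.task_id == "manually_terminated_task",
            TaskInstance.state == State.RUNNING,
        )
        .all()
    )
    for ti in running:
        ti.state = None
        session.merge(ti)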

The log with the stack trace is attached below; on_failure_callback is not called.

[2022-07-15, 02:02:24 UTC] {process_utils.py:124} INFO - Sending Signals.SIGTERM to group 10571. PIDs of all processes in the group: [10573, 10575, 10571]
[2022-07-15, 02:02:24 UTC] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 10571
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1431} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-07-15, 02:02:24 UTC] {subprocess.py:99} INFO - Sending SIGTERM signal to process group
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10575, status='terminated', started='02:02:11') (10575) terminated with exit code None
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/operators/bash.py", line 182, in execute
    cwd=self.cwd,
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/hooks/subprocess.py", line 87, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b''):
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1433, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1289} INFO - Marking task as FAILED. dag_id=test_callback_in_manually_terminated_dag, task_id=manually_terminated_task, execution_date=20220715T015100, start_date=20220715T020211, end_date=20220715T020224
[2022-07-15, 02:02:24 UTC] {logging_mixin.py:109} WARNING - /opt/python3.7/lib/python3.7/site-packages/airflow/utils/email.py:108 PendingDeprecationWarning: Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead.
[2022-07-15, 02:02:24 UTC] {configuration.py:381} WARNING - section/key [smtp/smtp_user] not found in config
[2022-07-15, 02:02:24 UTC] {email.py:214} INFO - Email alerting: attempt 1
[2022-07-15, 02:02:24 UTC] {configuration.py:381} WARNING - section/key [smtp/smtp_user] not found in config
[2022-07-15, 02:02:24 UTC] {email.py:214} INFO - Email alerting: attempt 1
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1827} ERROR - Failed to send email to: ['[email protected]']
...
OSError: [Errno 101] Network is unreachable
[2022-07-15, 02:02:24 UTC] {standard_task_runner.py:98} ERROR - Failed to execute job 159 for task manually_terminated_task (Task received SIGTERM signal; 10571)
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10571, status='terminated', exitcode=1, started='02:02:11') (10571) terminated with exit code 1
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10573, status='terminated', started='02:02:11') (10573) terminated with exit code None

Operating System

CentOS Linux 7

Deployment

Other Docker-based deployment

Anything else

This is an issue in 2.2.5. However, I notice that it appears to be fixed on the main branch by PR #21877, although that PR was not intended to fix this issue. Is there a timeline for getting it into a release? We are happy to test it out to see if it fixes the issue once it's released.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

hliu47 avatar Jul 26 '22 04:07 hliu47

Thanks for opening your first issue here! Be sure to follow the issue template!

boring-cyborg[bot] avatar Jul 26 '22 04:07 boring-cyborg[bot]

Since you know what caused it, maybe you would like to provide a fix for it @hliu47? Sounds like a nice contribution back to the free project, and you can become one of the > 2100 contributors to it.

potiuk avatar Jul 28 '22 13:07 potiuk

Since you know what caused it, maybe you would like to provide a fix for it @hliu47? Sounds like a nice contribution back to the free project, and you can become one of the > 2100 contributors to it.

@potiuk Thank you so much for replying to this issue. I think there is already a PR on the main branch, https://github.com/apache/airflow/pull/21877, that fixes the issue. So is there a timeline for getting that PR into a release?

hliu47 avatar Jul 29 '22 20:07 hliu47

Interesting - that should make it into the 2.4.0 release. The initial goal is to get it out mid-August, but we release when things are ready, so it depends on the testing phase.

potiuk avatar Jul 29 '22 20:07 potiuk

That's good to know, thanks!

hliu47 avatar Aug 01 '22 01:08 hliu47

@hliu47 can you confirm whether this is fixed in 2.4.0?

ephraimbuddy avatar Sep 21 '22 18:09 ephraimbuddy

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Dec 25 '22 00:12 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Jan 01 '23 00:01 github-actions[bot]

@potiuk Hello, do you know whether this has been resolved? There was some indication that it was destined for 2.4, but the milestone then moved to 2.5.1 and the issue was closed by github-actions. Many thanks.

ed-sparkes avatar Jan 09 '23 09:01 ed-sparkes

Like anyone else here, @ed-sparkes, I can only see the same sources as you. If it has been closed without being explicitly mentioned, the jury is still out: it may or may not have been solved. But if you would like to verify and reproduce it and see whether it has been solved - feel free, and then either comment here with evidence that it has not been solved (or better, open a new issue with reproducible details).

What is the reason you want to know if it's been fixed? Do you base some business decisions on it? Is it causing issues for you? Or is it just curiosity @ed-sparkes?

From what I see, we believed it had been fixed but the original author did not confirm it, so closing it was a pretty good idea. Now - if you would like to help and get more certainty - you could take over where the author left off and perform the verification. That would be a great contribution and a way of giving back for the free software you get :)

potiuk avatar Jan 09 '23 17:01 potiuk

This bug is still present on Airflow 2.5.1 in my experience. on_failure_callback works when the task fails "normally", but not when I mark it as failed manually.

This issue should probably be reopened?

seub avatar Jan 30 '23 22:01 seub

I think if you can provide a reproducible case with logs from the latest Airflow version, @seub, that would make a good new issue. I guess the case is different from the one originally explained in this issue, and having a reproducible case with logs would be helpful.

potiuk avatar Feb 19 '23 23:02 potiuk

@potiuk It's the same issue, and it's easy to reproduce: just interrupt a task manually (in the UI) and observe that on_failure_callback is not called. Please try and let me know if it's working for you or not.

seub avatar Feb 21 '23 03:02 seub

@seub - please provide a reproducible case and logs if you would like to report it. It might be something specific to your case, and to be brutally honest, it's much easier for the reporter of one issue to provide the evidence than for maintainers who look at hundreds of issues and PRs a day to try to reproduce every issue that someone claims is happening without showing their evidence.

Please be empathetic towards maintainers who have a lot of things to handle. If you report "it does not work for me" on an issue that is already closed, you will nearly universally be asked to report the issue and provide new evidence in a new issue.

potiuk avatar Feb 21 '23 06:02 potiuk

@potiuk This issue was automatically closed by a bot without being solved. I am confirming that it is in fact not solved as of 2.5.1, with the exact same behavior. I'm only reporting it out of courtesy; it doesn't matter to me.

seub avatar Feb 21 '23 14:02 seub

It was closed because we did not have enough evidence and could not reproduce it. And it's really strange to be so sure that it is the same bug. It might look and behave similarly yet be related to your particular deployment - Airflow has about 8 executors and can run in multiple deployments. What you observe in your deployment might have a completely different root cause and can be triggered by a number of factors. You are asking others to guess the environment you have, guess what your DAG looks like, and guess how to trigger it.

You forget that people here (including maintainers) usually try to help and investigate users' issues in their free time, and that you got the software absolutely for free. So what we are asking is that you spend a little time to help those people, so they do not have to spend hours and hours of their personal time investigating and guessing something that you are able to reproduce locally - providing the necessary information cuts out many hours of trying to guess your setup.

We are just asking for a little of your time to fill in the information, so that you can help save hours for the people who spend their own personal time trying to help users like you. But maybe it is too much of an ask, I don't know. Hard to say; for sure I would be sensitive and empathetic towards those other humans if I were asked to do it politely. But not everyone is.

potiuk avatar Feb 21 '23 14:02 potiuk

We can solve this by running the callback in the signal handler:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c74ff8b6b0..0c361d0d41 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1528,7 +1528,8 @@ class TaskInstance(Base, LoggingMixin):
                 return
             self.log.error("Received SIGTERM. Terminating subprocesses.")
             self.task.on_kill()
-            raise AirflowException("Task received SIGTERM signal")
+            if self.task.on_failure_callback:
+                self._run_finished_callback(self.task.on_failure_callback, context, "on_failure")
 
         signal.signal(signal.SIGTERM, signal_handler)

The raised exception is somehow lost, and I think the loss is similar to what is explained in this blog post: https://anonbadger.wordpress.com/2018/12/15/python-signal-handlers-and-exceptions/

ephraimbuddy avatar Feb 22 '23 10:02 ephraimbuddy

Generally, running any serious amount of code inside a signal handler should be avoided -- it can cause lots of odd, hard-to-diagnose behaviour. (The fact that we already call task.on_kill() from in there is probably not good either.) So it might fix it, but I'm worried about unpredictable side effects with bigger failure callbacks.

The way around that, in general terms (not sure it applies here), is to have the signal handler set a flag (self.terminate_requested = True, for example) and then have the "loop" (wherever it is) notice that and run the teardown code.
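
As a rough illustration (hypothetical names, not the actual Airflow task runner code), that flag-based pattern looks something like this:

import signal
import time


class Runner:
    """Sketch: the handler only records the request; the work loop notices it
    and runs the teardown/callback code outside of signal-handler context."""

    def __init__(self, on_failure_callback=None):
        self.terminate_requested = False
        self.on_failure_callback = on_failure_callback
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        # Keep the handler minimal: just set a flag.
        self.terminate_requested = True

    def run(self, context=None):
        for _ in range(60):  # stand-in for the real work loop
            if self.terminate_requested:
                if self.on_failure_callback:
                    self.on_failure_callback(context)
                raise RuntimeError("Task received SIGTERM signal")
            time.sleep(1)  # one unit of real work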

However, I'm not sure if that approach applies here. I'm also not sure which process we are in here: is this the raw/actual task execution process, or the supervisor?

ashb avatar Feb 22 '23 10:02 ashb

However, I'm not sure if that approach applies here. I'm also not sure which process we are in here: is this the raw/actual task execution process, or the supervisor?

It's the raw task execution process.

The way around that, in general terms (not sure it applies here), is to have the signal handler set a flag (self.terminate_requested = True, for example) and then have the "loop" (wherever it is) notice that and run the teardown code.

I will see if there's something I can do here

ephraimbuddy avatar Feb 22 '23 11:02 ephraimbuddy

Might an option be to throw a (new/custom) exception in the signal handler, and then catch it in BaseOperator.execute and run the on_failure callback there?
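
A rough sketch of that idea (hypothetical names and placement, not actual Airflow code): a dedicated exception type is raised by the SIGTERM handler and caught around the operator's execute, so the failure callback still runs before the failure propagates.

import signal


class TaskTerminatedExternally(Exception):
    """Raised by the SIGTERM handler instead of a generic exception."""


def install_sigterm_handler():
    def signal_handler(signum, frame):
        raise TaskTerminatedExternally("Task received SIGTERM signal")

    signal.signal(signal.SIGTERM, signal_handler)


def execute_with_failure_callback(task, context):
    # Wrap the operator's execute() so an external termination still triggers
    # the user's on_failure_callback, then re-raise to mark the task failed.
    try:
        return task.execute(context)
    except TaskTerminatedExternally:
        if task.on_failure_callback:
            task.on_failure_callback(context)
        raise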

ashb avatar Feb 24 '23 10:02 ashb

Might an option be to throw a (new/custom) exception in the signal handler, and then catch it in BaseOperator.execute and run the on_failure callback there?

It currently raises AirflowException, which was supposed to be caught by the _run_raw_task method, but it doesn't catch it.

ephraimbuddy avatar Feb 24 '23 11:02 ephraimbuddy

I'm getting the same error on Airflow 2.7.1. I saw that the code change to call the on_failure_callback in these cases is no longer present in version 2.7.1.

As you can see from the attached screenshots, the function was never called and its logs are never printed. Can anyone help me with this? Should I open a new issue?

FFCMSouza avatar Apr 11 '24 13:04 FFCMSouza