
Deferrable operator tasks do not call the `on_kill` method when failed or restarted

Open nirben82 opened this issue 1 year ago • 24 comments

### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

When marking a task that uses a deferrable operator as failed, or clearing it, the `on_kill` method is not called and the remote job is never stopped.

Task log output:

```
*** Found local files:
***   * /usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log
***   * /usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log.trigger.13880.log
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1308} INFO - Starting attempt 30 of 30
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1327} INFO - Executing <Task(BigQueryInsertJobOperator): bigquery_task> on 2023-12-06 13:22:54.698489+00:00
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:57} INFO - Started process 629728 to run task
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'bigquery_task', 'manual__2023-12-06T13:22:54.698489+00:00', '--job-id', '13896', '--raw', '--subdir', 'DAGS_FOLDER/dag_test.py', '--cfg-path', '/tmp/tmpxnqe4ysz']
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:85} INFO - Job 13896: Subtask bigquery_task
[2023-12-06, 14:14:11 UTC] {task_command.py:410} INFO - Running <TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [running]> on host nirben-ws1.tlv.corp.google.com
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='some-owner' AIRFLOW_CTX_DAG_ID='dag_test' AIRFLOW_CTX_TASK_ID='bigquery_task' AIRFLOW_CTX_EXECUTION_DATE='2023-12-06T13:22:54.698489+00:00' AIRFLOW_CTX_TRY_NUMBER='30' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-06T13:22:54.698489+00:00'
[2023-12-06, 14:14:11 UTC] {base.py:73} INFO - Using connection ID 'some-connection' for task execution.
[2023-12-06, 14:14:11 UTC] {bigquery.py:2799} INFO - Executing: {'query': {'query': 'SELECT * FROM table', 'useLegacySql': False, 'priority': 'batch', 'allowLargeResults': True, 'destinationTable': {'datasetId': 'datasetId', 'projectId': 'projectId', 'tableId': 'some_table_test'}, 'flattenResults': False, 'writeDisposition': 'WRITE_TRUNCATE', 'createDisposition': 'CREATE_IF_NEEDED'}}
[2023-12-06, 14:14:11 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-12-06, 14:14:11 UTC] {logging_mixin.py:150} WARNING - /usr/local/google/home/nirben/venv/waze-data/lib/python3.8/site-packages/google/auth/_default.py:78 UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.
[2023-12-06, 14:14:12 UTC] {bigquery.py:1596} INFO - Inserting job airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_5bf2e5098a664fd1d54ec9b9b75d077b
[2023-12-06, 14:14:13 UTC] {bigquery.py:51} INFO - Using the connection  some-connection .
[2023-12-06, 14:14:13 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=dag_test, task_id=bigquery_task, execution_date=20231206T132254, start_date=20231206T141411
[2023-12-06, 14:14:13 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_pipelines' for task execution.
[2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger cancelled; message=
```

### What you think should happen instead

I think that the trigger should be stopped and the task instance should then follow the same [behavior](https://github.com/apache/airflow/blob/d2514b408cb98f792289a5d032aaf85fe605350d/airflow/models/taskinstance.py#L2452) as any non-deferrable task.

### How to reproduce

1. Invoke a run of the DAG below and, after the task is in the `deferred` state, mark it as `failed` or clear it.

    The task log ends with the below text and the job in BQ does not stop.
    ```
    [2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_pipelines' for task execution.
    [2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
    [2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
    [2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger cancelled; message=
    ```

2. Change to `deferrable=False`, invoke a run of the DAG and, after the task is in the `running` state and the job has started in BQ, mark it as `failed` or clear it.

    The task log ends with the below text and the job in BQ stops.
    ```
    [2023-12-06, 14:13:06 UTC] {bigquery.py:1596} INFO - Inserting job airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:36 UTC] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
    [2023-12-06, 14:13:36 UTC] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 629540. PIDs of all processes in the group: [629540]
    [2023-12-06, 14:13:36 UTC] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 629540
    [2023-12-06, 14:13:36 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
    [2023-12-06, 14:13:36 UTC] {bigquery.py:1487} INFO - Attempting to cancel job : project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:37 UTC] {bigquery.py:1508} INFO - Waiting for canceled job project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985 to finish.
    [2023-12-06, 14:13:43 UTC] {bigquery.py:1499} INFO - Job successfully canceled: project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:43 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=629540, status='terminated', exitcode=0, started='16:13:05') (629540) terminated with exit code 0
    ```

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

default_args = {
    'owner': 'owner',
    'start_date': datetime(2023, 12, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG('dag_test', default_args=default_args, catchup=False)

query_that_runs_for_a_few_minutes = """
SELECT * FROM some-large-table
"""

client_driver_events_task = BigQueryInsertJobOperator(
    task_id='bigquery_task',
    gcp_conn_id='google_cloud_pipelines',
    dag=dag,
    configuration={
        'query': {
            'query': query_that_runs_for_a_few_minutes.strip(),
            'useLegacySql': False,
            'priority': 'batch',
            'allowLargeResults': True,
            'destinationTable': {
                'datasetId': 'datasetId',
                'projectId': 'projectId',
                'tableId': 'tableId',
            },
            'flattenResults': False,
            'writeDisposition': 'WRITE_TRUNCATE',
            'createDisposition': 'CREATE_IF_NEEDED',
        }
    },
    deferrable=True,
)
```


### Operating System

Debian GNU

### Versions of Apache Airflow Providers

apache-airflow==2.6.2
apache-airflow-providers-google==10.9.0

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

nirben82 avatar Dec 06 '23 14:12 nirben82

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Dec 06 '23 14:12 boring-cyborg[bot]

Related: https://github.com/apache/airflow/issues/19929. It seems this was reported earlier but was closed as a stale issue. We would also find this feature useful, to cancel remote jobs tracked by the triggerer on clear/fail.

tirkarthi avatar Dec 06 '23 17:12 tirkarthi

Yes, it should probably be changed. The problem is that this would have to bring the task back from the deferred state first to run its on_kill method, and that would require someone to implement the logic for that.

potiuk avatar Dec 11 '23 21:12 potiuk

Thanks @potiuk.

  1. Do you know if someone is currently working on this?
  2. Could you please provide some more specifics on what needs to be implemented, so we can understand if our team can prioritize adding the logic?
  3. Is it also possible to backport this to the previous minor version, 2.6? This is mainly so we can stay aligned with Cloud Composer, which is currently on 2.6.3.

nirben82 avatar Dec 13 '23 14:12 nirben82

1. Not that I am aware of. But you are asking a random maintainer who might not know everything. In OSS everything is in the open, so if it is not mentioned here, there is no other place where someone working on it would be recorded.

2. It's really part of the task to design it. This will likely require an Airflow Improvement Proposal, or at the very least a discussion on the devlist outlining the proposal. In short, you will need to add whole new internal processing to make sure the task is instantiated in a worker so it can execute on_kill there while it is deferred. Currently on_kill is run when a task is exiting the running state - so when it is already handled by the worker. A deferred task is handled by the triggerer by running triggers, not real "task" code - just the triggers the task deferred to. So what is needed is to decide who, and how, will schedule a task that should only execute the on_kill method in the worker when the task is set to a killed status while it is currently deferred to a trigger. This has not been discussed nor designed yet, so it has to start with making a design and proposal for how to implement it, leading the discussion on the devlist, and getting the proposal approved (and the person/people doing it must be capable of that). So you really need someone who will not just execute precise instructions, but someone able to propose and lead the discussion on the whole design.

3. No. This is against our versioning policies https://airflow.apache.org/docs/apache-airflow/stable/release-process.html - SemVer forbids adding features in patch releases (this is a new feature). Also, we only release new things in exactly one currently active branch. As of now we are voting on the release candidate for 2.8.0 and the current main is 2.9.0dev0, so the first release this feature could appear in is 2.9.0 - but it would require starting to prepare the design and proposal as soon as possible.

potiuk avatar Dec 13 '23 14:12 potiuk

We're working with a Slurm deferrable operator and are really waiting for this feature 🙏

pustoshilov-d avatar Apr 11 '24 10:04 pustoshilov-d

We're working with a Slurm deferrable operator and are really waiting for this feature 🙏

Just a kind reminder: if you and your company want a feature badly, the most certain way to get it is to spend engineering effort and contribute it back. Other than that, the only thing to do is to :pray: that someone will do it. Paying someone to contribute such a feature also works.

This is how open-source projects work.

potiuk avatar Apr 11 '24 10:04 potiuk

Assigning it to @sunank200 as he already created a PR, to avoid others working on it.

Lee-W avatar Apr 12 '24 15:04 Lee-W

I guess this issue needs a more general solution in the core triggerer, and the linked PR is specific to a particular operator. One of the solutions we are trying to implement is to have something like on_cancel executed on CancelledError, but a defined interface to implement for cancellation, like the run method, would help users.

cc: @andrewgodwin

tirkarthi avatar Apr 12 '24 16:04 tirkarthi

I just found out that there is already a cleanup method as part of the interface. The problem is that cleanup is also called during triggerer restarts (e.g. as part of a deployment), where we don't want to do cleanup like deleting remote jobs, since the trigger would then start tracking invalid jobs. So I thought of a change where a marker attribute is set only when the trigger is cancelled as part of the to_cancel loop, and my custom cleanup function is called in that case. This helps with executing cleanup when tasks are cleared or marked success/failure. I tried using a contextvar, but for some reason the update from triggerer_job_runner is not propagated to the trigger. In Python 3.9 the msg parameter was added to Task.cancel, with which custom messages can be propagated as part of CancelledError in Python 3.11 and above. This doesn't fully address the issue, but I thought I'd add my analysis in case someone finds the patch/approach useful.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/triggers/base/index.html#airflow.triggers.base.BaseTrigger.cleanup https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

```python
from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):

    async def cleanup(self):
        """
        cleanup is called when the trigger is cancelled by the triggerer, which can happen as part of the
        to_cancel loop and also when the triggerer exits/restarts. We only want to execute our cleanup when
        it is part of the cancel loop, because the triggerer could be restarted during deployment or marked
        as unhealthy, and in those cases we don't want to do cleanup like deleting jobs tracked upstream,
        which becomes an issue as the triggerer starts to track invalid jobs.

        If the trigger is cancelled from the to_cancel loop (e.g. a cleared task instance), the trigger is
        no longer present in the database and _cancelled_from_job_runner is set to True, in which case the
        custom cleanup is executed.
        """
        cancelled_from_runner = getattr(self, "_cancelled_from_job_runner", False)

        if cancelled_from_runner:
            self.custom_cleanup()  # user-defined cleanup, e.g. cancelling the remote job

        await super().cleanup()
```

```diff
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..e80a910008 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -497,6 +497,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                     "name": f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
                     f"(ID {trigger_id})",
                     "events": 0,
+                    "trigger": trigger_instance
                 }
             else:
                 self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
@@ -512,6 +513,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             trigger_id = self.to_cancel.popleft()
             if trigger_id in self.triggers:
                 # We only delete if it did not exit already
+                # These are tasks cancelled by the triggerer since they are not found in the database,
+                # e.g. a task instance cleared from the UI. _cancelled_from_job_runner is set so that
+                # our cleanup is executed only as needed and not during triggerer process shutdown,
+                # when cancel is called to run cleanup but this attribute is not present.
+                self.triggers[trigger_id]["trigger"]._cancelled_from_job_runner = True
                 self.triggers[trigger_id]["task"].cancel()
             await asyncio.sleep(0)
```
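
For reference, here is a minimal standalone illustration (my own assumed example, not Airflow code) of the `Task.cancel(msg=...)` mechanism mentioned above: the `msg` keyword exists since Python 3.9, and on Python 3.11+ the message is delivered as the argument of the `CancelledError` raised inside the cancelled task.

```python
import asyncio


async def worker() -> None:
    try:
        await asyncio.sleep(3600)  # stand-in for polling a remote job
    except asyncio.CancelledError as err:
        # On Python 3.11+ the message passed to Task.cancel() shows up here.
        reason = err.args[0] if err.args else None
        print(f"cancelled, reason={reason!r}")
        raise  # always re-raise CancelledError after cleanup


async def main() -> None:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel(msg="cancelled_from_to_cancel_loop")  # msg= was added in Python 3.9
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```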

tirkarthi avatar Apr 13 '24 07:04 tirkarthi

If I were feeling super-hacky in my own fork, I'd be tweaking models.Trigger.clean_unused() and adapting submit_failure to do the hilarious re-schedule. In theory, just those two would work - it's a similar idea to the check_trigger_timeouts fn that the scheduler uses to kill the tasks + triggers... Edit: the primary goal with this method being that it doesn't care about the trigger implementation and mimics an existing/proven use-path - just from the other direction. Please note, this is the result of a 30-minute explore, so excuse any massive oversights in the concept ;)

dh-racheldean avatar Apr 15 '24 20:04 dh-racheldean

WARNING

The proposed solution of capturing asyncio.CancelledError in a try/except is NOT safe! The following PRs have implemented this:

  • https://github.com/apache/airflow/pull/38912 (@sunank200)
  • https://github.com/apache/airflow/pull/39373 (@akaul)
  • https://github.com/apache/airflow/pull/39130 (@sunank200)
  • https://github.com/apache/airflow/pull/39230 (@sunank200)

These PRs will result in the external job being canceled if the triggerer itself is restarted (or crashes), not just when users set the state of a deferred task to "success", "failed", or "clear".

Also note, Airflow will be unaware that the external job has been canceled, and will reschedule the deferred operator on another triggerer instance (which could cause all kinds of strange behaviour).

It makes more sense to find a way for Airflow itself to run BaseOperator.on_kill(), even if the operator is deferred while it is killed (either manually, or by failure).

However, as I am sure vendors will want their deferred operators to work correctly (when users set deferred tasks to "clear", "success" or "failed") here is a possible workaround (which needs testing).

Possible Workaround

We can still capture asyncio.CancelledError, but ONLY cancel the external job if the TaskInstance is NOT in a ~~running~~ or deferred state. That is, if airflow still thinks the job is ~~running~~ or deferred, we probably should not kill the external job.

EDIT: the original solution did not handle the case that the task was "cleared" causing the task to be rescheduled, and be deferred again. Fixing this requires us to distinguish if the TaskInstance for the run is the same one that created the deferred operator which is being canceled. Luckily we can do this by comparing the job_id. I have updated the example below with this fix.

Here is a basic triggerer which pretends to run an external job. It shows if it has "canceled" the job, by writing to /tmp/testing/on_kill_deferred/{dag_id}/{task_id}/log_trigger.txt.

```python
import asyncio
import datetime
import os
from datetime import timedelta
from typing import Any, AsyncIterator, Dict, Optional

import pendulum
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState


# define a trigger that sleeps until a given datetime, and then sends an event
# and "handles" `asyncio.CancelledError` by writing to a file
class DateTimeTriggerWithCancel(BaseTrigger):
    def __init__(
        self,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int,
        job_id: int,
        statement_name: str,
        moment: datetime.datetime,
    ):
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id
        self.map_index = map_index
        self.job_id = job_id
        self.statement_name = statement_name

        # set and validate the moment
        if not isinstance(moment, datetime.datetime):
            raise TypeError(
                f"Expected 'datetime.datetime' type for moment. Got '{type(moment)}'"
            )
        elif moment.tzinfo is None:
            raise ValueError("You cannot pass naive datetime")
        else:
            self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)

    def serialize(self) -> tuple[str, dict[str, Any]]:
        #
        # TODO: for Airflow 2.6.0+ you can get the `TaskInstance` from `self.task_instance` which removes
        #       the need to store `dag_id`, `task_id`, `run_id`, `map_index`, and `job_id` in the trigger
        #       serialization. However, you STILL NEED TO QUERY for the latest TaskInstance state.
        #
        return (
            "test_on_kill_deferred.DateTimeTriggerWithCancel",
            {
                "dag_id": self.dag_id,
                "task_id": self.task_id,
                "run_id": self.run_id,
                "map_index": self.map_index,
                "job_id": self.job_id,
                "statement_name": self.statement_name,
                "moment": self.moment,
            },
        )

    @provide_session
    def get_task_instance(self, session: Session) -> TaskInstance:
        query = session.query(TaskInstance).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task_id,
            TaskInstance.run_id == self.run_id,
            TaskInstance.map_index == self.map_index,
        )
        task_instance = query.one_or_none()
        if task_instance is None:
            raise AirflowException(
                "TaskInstance with dag_id: %s, task_id: %s, run_id: %s and map_index: %s is not found",
                self.dag_id,
                self.task_id,
                self.run_id,
                self.map_index,
            )
        return task_instance

    def safe_to_cancel(self) -> bool:
        """
        Whether it is safe to cancel the external job which is being executed by this trigger.

        This is to avoid the case that `asyncio.CancelledError` is called because the trigger
        itself is stopped. Because in those cases, we should NOT cancel the external job.
        """
        # Database query is needed to get the latest state of the task instance.
        task_instance = self.get_task_instance()

        # If the current job_id is different from when the trigger was created,
        # then we should cancel the external job we are waiting on because the task has been
        # cleared and a new job has been created.
        if int(task_instance.job_id) != int(self.job_id):
            return True

        # If the task is not in a deferred state, then something else has happened to the task
        # since we were deferred (e.g. a manual state change), so we should cancel the external
        # job we are waiting on.
        return task_instance.state != TaskInstanceState.DEFERRED

    async def run(self) -> AsyncIterator[TriggerEvent]:
        self.log.info("trigger starting")
        try:
            # Sleep a second at a time
            while self.moment > pendulum.instance(timezone.utcnow()):
                self.log.info("sleeping 1 second...")
                await asyncio.sleep(1)

            # Send our single event and then we're done
            self.log.info("yielding event with payload %r", self.moment)
            yield TriggerEvent(
                {
                    "statement_name": self.statement_name,
                    "status": "success",
                    "moment": self.moment,
                }
            )

        except asyncio.CancelledError:
            self.log.info("asyncio.CancelledError was called")
            if self.statement_name:
                if self.safe_to_cancel():
                    self.log.warning("Cancelling query: %s", self.statement_name)

                    # Cancel the query (mock by writing to a file)
                    output_folder = (
                        f"/tmp/testing/on_kill_deferred/{self.dag_id}/{self.task_id}"
                    )
                    os.makedirs(output_folder, exist_ok=True)
                    with open(f"{output_folder}/log_trigger.txt", "a") as f:
                        f.write(
                            f"asyncio.CancelledError was called: {self.statement_name}\n"
                        )
                    yield TriggerEvent({"status": "cancelled"})
                else:
                    self.log.warning("Triggerer probably stopped, not cancelling query")
            else:
                self.log.error("self.statement_name is None")
        except Exception as e:
            self.log.exception("Exception occurred while checking for query completion")
            yield TriggerEvent({"status": "error", "message": str(e)})


# an operator that sleeps for a given number of seconds using a deferred trigger
class TestDeferredOperator(BaseOperator):
    statement_name: Optional[str]
    wait_seconds: int
    moment: Optional[datetime.datetime]

    def __init__(self, wait_seconds: int = 120, **kwargs):
        super().__init__(**kwargs)
        self.wait_seconds = wait_seconds
        self.statement_name = None
        self.moment = None

    def execute(self, context: Context) -> None:
        self.statement_name = (
            f"airflow"
            f"::{self.dag.dag_id}"
            f"::{self.task_id}"
            f"::{pendulum.now(timezone.utc).isoformat()}"
        )
        self.moment = pendulum.instance(timezone.utcnow()).add(
            seconds=self.wait_seconds
        )
        self.defer(
            trigger=DateTimeTriggerWithCancel(
                dag_id=self.dag.dag_id,
                task_id=self.task_id,
                run_id=context["run_id"],
                map_index=context["task_instance"].map_index,
                job_id=context["task_instance"].job_id,
                statement_name=self.statement_name,
                moment=self.moment,
            ),
            method_name="execute_complete",
        )

    def execute_complete(
        self,
        context: Context,
        event: Optional[Dict[str, Any]] = None,
    ) -> None:
        if event is None:
            raise AirflowException("Trigger event is None")

        if event["status"] == "error":
            msg = f"context: {context}, error message: {event['message']}"
            raise AirflowException(msg)

        if event["status"] == "cancelled":
            self.log.info(f"external job was cancelled: {self.statement_name}")
            return

        self.log.info("%s completed successfully.", self.task_id)

    def on_kill(self):
        output_folder = (
            f"/tmp/testing/on_kill_deferred/{self.dag.dag_id}/{self.task_id}"
        )
        os.makedirs(output_folder, exist_ok=True)
        with open(f"{output_folder}/log_operator.txt", "a") as f:
            f.write(f"on_kill was called: {self.statement_name}\n")


with DAG(
    dag_id="test_on_kill_deferred",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    # task 1
    task_1 = TestDeferredOperator(
        task_id="task_1",
        wait_seconds=60,
    )

thesuperzapper avatar May 05 '24 22:05 thesuperzapper

For those watching, I have done a little more testing with the workaround proposed in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855, and at least in my tests, it works correctly when execution_timeout or defer(timeout=xxxx) is reached.

That is, self.safe_to_cancel() returns True, and the external job is canceled (because the TaskInstance reaches the failed state before the deferred operator is terminated). But I would be surprised if there was not some race condition involving timeouts, so more testing is needed.
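
For anyone wanting to reproduce that timeout behaviour, here is a rough, untested sketch (assuming the example DAG file above is importable as `test_on_kill_deferred`) of the two timeout paths:

```python
from datetime import timedelta

# Assumes the earlier example DAG file is on the path as `test_on_kill_deferred`.
from test_on_kill_deferred import TestDeferredOperator, dag

# 1) Task-level timeout: when execution_timeout is reached, the TaskInstance is
#    failed before the deferred operator is terminated, so safe_to_cancel()
#    returns True and the external job is cancelled.
task_with_execution_timeout = TestDeferredOperator(
    task_id="task_with_execution_timeout",
    wait_seconds=600,
    execution_timeout=timedelta(minutes=2),
    dag=dag,
)

# 2) Defer-level timeout: inside the operator, pass timeout= to self.defer(), e.g.
#    self.defer(trigger=..., method_name="execute_complete", timeout=timedelta(minutes=2)),
#    which also ends the deferral and exercises the same cancellation path.
```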

However, my preference is still finding a way to make on_kill run even when the task is deferred, even if only so that we don't have to tell everyone to update their providers!

thesuperzapper avatar May 06 '24 19:05 thesuperzapper

Same here. I have created the PR for BigQueryInsertJobOperator based on the suggestions. Please review: https://github.com/apache/airflow/pull/39442. I have tested it and it works correctly. cc: @thesuperzapper

Currently, I have used the workaround. I will work on the general solution across core triggerer after these PRs fixed.

sunank200 avatar May 06 '24 19:05 sunank200

PR for DataprocCreateClusterOperator: https://github.com/apache/airflow/pull/39446

sunank200 avatar May 06 '24 20:05 sunank200

PR for DataprocSubmitJobOperator: https://github.com/apache/airflow/pull/39447

sunank200 avatar May 06 '24 20:05 sunank200

@thesuperzapper also note that BaseTrigger already has a task_instance attribute, which can be used directly.
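
For illustration only, here is a rough sketch (my own, untested, and not taken from any of the linked PRs) of how a safe_to_cancel() lookup could lean on self.task_instance (available on BaseTrigger from Airflow 2.6.0+) instead of serializing dag_id/task_id/run_id/map_index by hand - note it still has to re-query the database for the latest state:

```python
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState


@provide_session
def safe_to_cancel(trigger: BaseTrigger, session=None) -> bool:
    """Return True if the external job tracked by `trigger` may be cancelled."""
    # Snapshot attached by Airflow 2.6.0+ when the task deferred; it is stale,
    # so it is only used to identify the row, and the current state is re-queried.
    ti = trigger.task_instance
    current = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id == ti.task_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
        )
        .one_or_none()
    )
    # If the row is gone or no longer deferred, the CancelledError did not come
    # from a triggerer shutdown, so cancelling the external job is safe.
    return current is None or current.state != TaskInstanceState.DEFERRED
```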

sunank200 avatar May 07 '24 10:05 sunank200

Hi Everyone,

I noticed that you are making workarounds for some of the providers. Can someone say if EmrContainerOperator is also going to be fixed anytime soon? Is there a ticket for that one?

rzilkha avatar May 07 '24 11:05 rzilkha

Please re-review: https://github.com/apache/airflow/pull/39442

sunank200 avatar May 07 '24 13:05 sunank200

Please re-review: https://github.com/apache/airflow/pull/39446

sunank200 avatar May 07 '24 16:05 sunank200

Please re-review: https://github.com/apache/airflow/pull/39447

sunank200 avatar May 07 '24 16:05 sunank200

@sunank200 I see you have removed the check for a RUNNING state, and now only check if it's not in a DEFERRED state, for example:

https://github.com/apache/airflow/blob/3938f71dfae21c84a3518625543a28ad02edf641/airflow/providers/google/cloud/triggers/dataproc.py#L152

This seems reasonable, and might actually be safer, because it ensures that, if the TaskInstance has started RUNNING again (perhaps try number 2+ of the same task), external jobs triggered by older tries are killed.

thesuperzapper avatar May 09 '24 04:05 thesuperzapper

@sunank200 also note that using the self.task_instance of the BaseTrigger will require at least Airflow 2.6.0; it was added by this PR https://github.com/apache/airflow/pull/29482.

Upstream this should be fine, as all providers currently pin apache-airflow>=2.6.0:

  • https://github.com/apache/airflow/blob/2.9.1/airflow/providers/databricks/provider.yaml#L64

But for those wanting to backport this fix into Airflow 2.5.3 (and older), they will need to use something like what I have done in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855 and add the dag_id, task_id, and run_id to the serialize() method of the triggerer.

thesuperzapper avatar May 09 '24 04:05 thesuperzapper

@sunank200 also we will need to update all the other providers that set an on_kill and can be deferred (unless you are already doing this).

This search might help us find them:

  • https://github.com/search?q=repo:apache/airflow+path:/%5Eairflow%5C/providers%5C//+on_kill(self)&type=code

thesuperzapper avatar May 09 '24 05:05 thesuperzapper

@sunank200 @akaul my original solution proposed in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855 had a critical part missing, which means that the solution you implemented (in the BigQuery and DataProc operators) needs to be updated, along with the unmerged update to the Databricks operator (PR https://github.com/apache/airflow/pull/39373).

The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being set explicitly to SUCCESS or FAILED. This is because if the task is cleared, the new job will likely end up DEFERRED before the asyncio.CancelledError is even thrown.

I found a solution, which is to update the safe_to_cancel() method to also return True when the job_id of the current TaskInstance has changed since we were deferred (which only happens when the task is rescheduled because it was cleared).

For example, here is the updated safe_to_cancel() definition I am using:

    def safe_to_cancel(self) -> bool:
        """
        Whether it is safe to cancel the external job which is being executed by this trigger.

        This is to avoid the case that `asyncio.CancelledError` is called because the trigger
        itself is stopped. Because in those cases, we should NOT cancel the external job.
        """
        # Database query is needed to get the latest state of the task instance.
        task_instance = self.get_task_instance()

        # If the current job_id is different from when the trigger was created,
        # then we should cancel the external job we are waiting on because the task has been
        # cleared and a new job has been created.
        if int(task_instance.job_id) != int(self.job_id):
            return True

        # If the task is not in a deferred state, then something else has happened to the task
        # since we were deferred (e.g. a manual state change), so we should cancel the external
        # job we are waiting on.
        return task_instance.state != TaskInstanceState.DEFERRED

So people can do more testing, I have updated my reference example in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855

NOTE: my reference is designed to work on all versions of airflow with deferable operators (e.g. 2.4.0+), but can be simplified if we require 2.6.0+ like you have in the upstream providers, see the "TODO" for more context.


@potiuk we might want to consider implementing my workaround as a default method on the BaseTrigger and documenting it, or just explaining the workaround in the official docs about triggerers, because it's pretty critical that users know that on_kill is NOT called for deferred tasks when they are manually set to success/failure/clear, as this will result in external jobs not being stopped when users expect.

We also need to update all other operators that currently define on_kill and support being deferred to use the workaround, here is a search that helps us find them:

  • https://github.com/search?q=repo:apache/airflow+path:/%5Eairflow%5C/providers%5C//+on_kill(self)&type=code

thesuperzapper avatar Jun 13 '24 03:06 thesuperzapper

@sunank200 @akaul my original solution proposed in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855 had a critical part missing, which means that the solution you implemented (in the BigQuery and DataProc operators) needs to be updated, along with the unmerged update to the Databricks operator (PR https://github.com/apache/airflow/pull/39373).

The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being set explicitly to SUCCESS or FAILED. This is because if the task is cleared, the new job will likely end up DEFERRED before the asyncio.CancelledError is even thrown.

@thesuperzapper I tried clearing the task for DataprocCreateClusterOperator and it deleted the provisioning cluster for me.

sunank200 avatar Jun 14 '24 07:06 sunank200

@thesuperzapper I tried clearing the task for DataprocCreateClusterOperator and it deleted the provisioning cluster for me.

@sunank200 it's a race condition: it will only fail to cancel the external job if, after clearing, the task becomes deferred again before the exception handler on the triggerer runs.

It's more important for jobs which are very quick to start (like submitting a database query to the Redshift Data API), but you should definitely fix it in your operator too.

thesuperzapper avatar Jun 14 '24 07:06 thesuperzapper

Probably worth implementing it on the "airflow" level as you mentioned. Are you willing to submit a PR for that @thesuperzapper? Did I understand correctly?

potiuk avatar Jun 14 '24 14:06 potiuk

Probably worth implementing it on the "airflow" level as you mentioned. Are you willing to submit a PR for that @thesuperzapper? Did I understand correctly?

@potiuk I can definitely help with reviewing PRs that implement the workaround above in existing deferred operators that use on_kill, because all of them need to be updated.

In terms of implementing some kind of solution at the airflow level, I'm happy to discuss possible solutions, but since we have no idea what the implementation would be yet, I can't commit to anything.

I guess there are two approaches to a generic solution:

  1. Try and get on_kill to run for deferred operators, probably by rescheduling it or effectively just running it in the exception handler of the trigger like I'm doing above. (The problem is the on_kill method is on the operator, not the triggerer, so it's not available after being deferred)
  2. Just document the above workaround in the deferred operator docs, because it does work, and won't require us to set a new minimum airflow version for some providers. (Note, the only problem is the extremely rare case that the trigger crashes at the exact moment the exception handler is running, which might mean the external job doesn't get killed, but there are already many similar race conditions in airflow)

thesuperzapper avatar Jun 14 '24 17:06 thesuperzapper

As usual with OSS, we need someone to take the lead on that. So yeah, I see you would love to discuss the solution, but it needs someone to take the lead and create a PR for it.

potiuk avatar Jun 14 '24 17:06 potiuk