Trigger event from deferred task does not get scheduled immediately, leading to timeout.

Open Birne94 opened this issue 1 year ago • 6 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

We are using deferrable operators to execute jobs in Databricks. These jobs utilize a common database, so we use a task pool to limit the concurrency to 1 task. The pool is configured to include deferred tasks. In some cases we see task timeouts even though the deferred task finished successfully. You can see about 1.5 hours passing between the trigger event and scheduling:

[2024-12-06, 14:01:10 CET] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=my-dag, task_id=my-task, execution_date=20241205T130000, start_date=20241206T130108
[2024-12-06, 14:01:10 CET] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
[2024-12-06, 14:01:11 CET] {{base.py:83}} INFO - Using connection ID 'databricks' for task execution.
[2024-12-06, 14:01:11 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'PENDING', 'result_state': '', 'state_message': 'Waiting for cluster'}. sleeping for 30 seconds
...
[2024-12-06, 14:09:42 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:12 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:42 CET] {{triggerer_job_runner.py:602}} INFO - Trigger my-dag/scheduled__2024-12-05T13:00:00+00:00/my-task/-1/1 (ID 10030) fired: TriggerEvent<{'run_id': 847717920033451, 'run_page_url': '...', 'run_state': '{"life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": ""}'}>
[2024-12-06, 15:38:27 CET] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: my-dag.my-task scheduled__2024-12-05T13:00:00+00:00 [queued]>
...
[2024-12-06, 15:38:27 CET] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 425, in _execute_task
    raise AirflowTaskTimeout()
airflow.exceptions.AirflowTaskTimeout
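
The gap can be checked directly from the log timestamps. The following is a minimal sketch: the timestamps are copied from the log above, while the one-hour `execution_timeout` is a placeholder assumption, since the issue does not state the configured value. It also assumes that, for a task resuming after deferral, the remaining timeout is counted from the original `start_date`, which is consistent with the immediate `AirflowTaskTimeout` in the traceback.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d, %H:%M:%S"

# Timestamps copied from the log above (all CET).
trigger_fired_at = datetime.strptime("2024-12-06, 14:10:42", FMT)
resumed_at = datetime.strptime("2024-12-06, 15:38:27", FMT)
# start_date=20241206T130108 in the log is UTC, which is 14:01:08 CET.
started_at = datetime.strptime("2024-12-06, 14:01:08", FMT)

# Time the task waited between the trigger event and being picked up again.
wait_after_event = resumed_at - trigger_fired_at
# Time since the task first started, which is what the timeout is compared to
# under the assumption stated above.
elapsed_since_start = resumed_at - started_at

# ASSUMPTION: placeholder value, the real execution_timeout is not in the issue.
execution_timeout = timedelta(hours=1)
remaining = execution_timeout - elapsed_since_start
timed_out = remaining.total_seconds() <= 0

print(wait_after_event)     # 1:27:45
print(elapsed_since_start)  # 1:37:19
print(timed_out)            # True
```

With any `execution_timeout` shorter than the elapsed 1:37:19, the resumed task would have no time budget left when it finally gets a slot.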

Our assumption of what happens is the following:

  • Many tasks are waiting to be executed but are limited by the pool
  • Task starts running and is deferred (pool slot is consumed)
  • Deferred task is running in the triggerer (pool slot is consumed)
  • Deferred task emits trigger event and stops (pool slot is released)
  • As the pool slot is released, another task starts running (pool slot is consumed again)
  • The post-deferral task for our previous task is scheduled, but cannot run due to unavailable pool slots.
  • After the task that got scheduled in between finishes and the pool is released, the post-deferral task runs and times out immediately.
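
The sequence above can be sketched as a toy model. This is not Airflow's scheduler or pool implementation; the `Pool` class, the task names, and the ordering of events are all hypothetical and only mirror the assumption described in the list.

```python
from collections import deque


class Pool:
    """Toy pool with a fixed number of slots (not Airflow's implementation)."""

    def __init__(self, slots: int) -> None:
        self.free = slots

    def try_acquire(self) -> bool:
        """Take a slot if one is free and report whether that succeeded."""
        if self.free == 0:
            return False
        self.free -= 1
        return True

    def release(self) -> None:
        self.free += 1


def run_scenario() -> list[str]:
    pool = Pool(slots=1)
    waiting = deque(["task_b"])  # hypothetical task queued behind the pool
    events: list[str] = []

    # task_a starts, defers, and keeps its slot while the trigger runs.
    pool.try_acquire()
    events.append("task_a: deferred (slot held)")

    # The trigger fires and the slot is released.
    pool.release()
    events.append("task_a: trigger event fired (slot released)")

    # Assumed race: the free slot goes to the next waiting task first.
    other = waiting.popleft()
    pool.try_acquire()
    events.append(f"{other}: running (slot held)")

    # task_a's post-deferral step finds the pool full.
    if not pool.try_acquire():
        events.append("task_a: resume blocked (no free slot)")

    # Only after task_b finishes can task_a resume, possibly past its timeout.
    pool.release()
    pool.try_acquire()
    events.append("task_a: resumed after task_b finished")
    return events


for line in run_scenario():
    print(line)
```

In this model the post-deferral step of `task_a` waits for the full runtime of `task_b`, which matches the long gap seen in the log.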

What you think should happen instead?

I see multiple things that could improve this behavior:

  • Tasks waking up after deferral do not consume slots within task pools.
  • Tasks waking up have priority over other tasks when making scheduling decisions.
  • Tasks waking up have their own timeout for the post-deferral trigger.
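
As an illustration of the second idea, here is a minimal sketch of a queue in which tasks resuming from deferral are always picked before new tasks. It is a standalone example using `heapq`, not a description of how Airflow orders task instances; the function and task names are hypothetical.

```python
import heapq
from itertools import count

_arrival = count()  # tie-breaker that keeps first-in, first-out order per rank


def push(queue: list, task_id: str, resumed_from_deferral: bool) -> None:
    # Lower tuples sort first: resumed tasks get rank 0, new tasks rank 1.
    rank = 0 if resumed_from_deferral else 1
    heapq.heappush(queue, (rank, next(_arrival), task_id))


def pop(queue: list) -> str:
    return heapq.heappop(queue)[2]


queue: list = []
push(queue, "task_b", resumed_from_deferral=False)
push(queue, "task_c", resumed_from_deferral=False)
push(queue, "task_a_resume", resumed_from_deferral=True)

order = [pop(queue) for _ in range(3)]
print(order)  # ['task_a_resume', 'task_b', 'task_c']
```

Even though `task_a_resume` was enqueued last, it is handed the next free slot, so it would not wait behind tasks that have not started yet.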

How to reproduce

  • Create a DAG with many deferrable tasks sharing a single task pool.
  • Reduce pool capacity to 1 and enable Include Deferred.
  • Observe that sometimes a new task is scheduled before the post-deferral task is scheduled.

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Birne94 avatar Dec 07 '24 09:12 Birne94

If you are not doing anything in execute_complete and are on track to upgrade to 2.10.0, exiting the task directly from the trigger could help.

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers

tirkarthi avatar Dec 09 '24 14:12 tirkarthi

> If you are not doing anything in execute_complete and are on track to upgrade to 2.10.0, exiting the task directly from the trigger could help.
>
> https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers

Thank you for that hint, that sounds interesting. We need to apply a bit of logic before exiting and publish task results to XCom. From the code I see that this case would be supported, however it is not mentioned in the docs. Is that feature stable already?

Birne94 avatar Dec 09 '24 14:12 Birne94

It's added and stable in 2.10 and you can set self.xcoms which will get pushed.

https://github.com/apache/airflow/blob/cf4f2caac93c3482c4812ce0f2e81f822f323762/airflow/triggers/base.py#L229

TaskSuccessEvent and TaskFailureEvent are the two events that can be used to end the task instance directly. This marks the task with the state task_instance_state and optionally pushes xcom if applicable. Here’s an example of how to use these events:

Maybe the doc could have some examples or explain this feature better

tirkarthi avatar Dec 09 '24 15:12 tirkarthi

Thank you @tirkarthi, we will see if we can prioritize upgrading our MWAA environment to 2.10 and testing this approach.

I assume that the described behavior (post-deferral task execution being delayed) is expected in this case and changing it would be more of a QoL change rather than a bug, right?

Birne94 avatar Dec 10 '24 08:12 Birne94

Given what @tirkarthi mentioned, I am changing this to a doc-only issue. PRs to improve the docs are welcome.

eladkal avatar Dec 10 '24 08:12 eladkal

Hey @eladkal I want to contribute to the improved documentation regarding this issue. Although I'm a newbie to Apache Airflow, could you guide me on how I can get started with it? Where can I find the appropriate documentation to contribute?

Help would be much appreciated. Thanks

avyuktsoni0731 avatar Dec 21 '24 13:12 avyuktsoni0731

@avyuktsoni0731 check https://github.com/apache/airflow/blob/main/contributing-docs/README.rst

eladkal avatar Dec 21 '24 19:12 eladkal

@eladkal I've created a PR. I tried to understand the issue that has been addressed, but do let me know if any other changes are required, will look into it.

avyuktsoni0731 avatar Dec 22 '24 17:12 avyuktsoni0731

Hi, @eladkal it seems like this issue has been inactive for a while, could I help with this?

guan404ming avatar Apr 02 '25 13:04 guan404ming

Hello! Is there any solution to the original issue? I'm also planning to run deferred tasks in a pool where they are accounted for. In case of success, I immediately mark the task as successful, but in case of failure, I want the on_failure_callback to trigger, which sends notifications to Slack. Currently, it only works if the task is executed on workers.

Also, for some operators, in case of failure, I’d like to run some custom logic. Right now, there are no guarantees that the task won’t get stuck in the pool waiting for a slot and end up running only after the failure has already occurred.

Maybe a possible solution would be the ability to change the pool of a task after the deferred task has finished on a triggerer. Then I could simply have two pools: one where deferred tasks are considered as running, and one where they are not.

boltonidze avatar Apr 22 '25 00:04 boltonidze

> Hi, @eladkal it seems like this issue has been inactive for a while, could I help with this?

Feel free to raise a PR.

eladkal avatar Apr 22 '25 01:04 eladkal

It looks like we should give an infinitely high priority to tasks (with pool slots > 0) that leave the airflow_triggerer and need a final execution in an airflow_worker.

raphaelauv avatar May 14 '25 11:05 raphaelauv

Based on deeper investigation in #49535, I think we could leave this here until there is any re-implementation plan. Feel free to open another issue for re-implementation.

guan404ming avatar Oct 31 '25 02:10 guan404ming

Related issue #57210

tirkarthi avatar Oct 31 '25 14:10 tirkarthi