airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Starts execution directly from triggerer without going to worker

Open Lee-W opened this issue 1 year ago • 8 comments

Why

For some operators such as S3KeySensor with deferrable set to True, running execute method in workers might not be necessary. It would be better if we could have a way to run a task in triggerer directly without going into the worker.

In the current solution, we still need to run next_method/execute_complete in the worker. This PR serves as a POC / first step of running the whole execution in triggerer in asyncronize manner.

What

Introduce _start_trigger and _next_method to BaseOperator. If an operator defines both _start_trigger and _next_method in the __init__ method, the scheduler will directly defer the task without going to the worker.

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class StartFromTriggerOperator(BaseOperator):
    _start_trigger = SuccessTrigger()
    _next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")

These attributes can also be assigned at the instance level. However, dynamic task mapping is not supported in this setup.


^ Add meaningful description above Read the Pull Request Guidelines for more information. In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed. In case of a new dependency, check compliance with the ASF 3rd Party License Policy. In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Lee-W avatar Apr 02 '24 11:04 Lee-W

Related :

https://github.com/apache/airflow/pull/31808 https://github.com/apache/airflow/issues/31718

tirkarthi avatar Apr 06 '24 04:04 tirkarthi

I wonder if we could somehow infer starts_execution_from_triggerer from the combination of other arguments, instead of setting the flag explicitly.

uranusjr avatar Apr 08 '24 08:04 uranusjr

I wonder if we could somehow infer starts_execution_from_triggerer from the combination of other arguments, instead of setting the flag explicitly.

For this specific case, I would say yes. We could check whether start_trigger is None to decide. Not sure whether we would want to use this chance to refactor all the existing operators with deferrable support. (e.g., Moving the trigger definition to __init__ or somewhere else maybe?)

Also, the name is set to start_trigger instead trigger because https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/cloud_build.html already take this argument name. Without changing this name to something else, mypy and many tests complain. start_trigger is more like a temporary name. Suggestions are welcomed.

Lee-W avatar Apr 08 '24 09:04 Lee-W

If we also infer starts_execution_from_triggerer from it, start_trigger is not so bad to me, and actually better than trigger because it more clearly describes the attributes controls what trigger the task is started from, not the one trigger the task uses. There are other possibile names (e.g. start_from_trigger), but trigger is not a good one IMO.

uranusjr avatar Apr 09 '24 07:04 uranusjr

This comment is for the original design and is outdated


In the current design, I introduced start_trigger and next_method to the BaseOperator. When start_trigger is not None, the scheduler will call the defer_task method of the task instance object, and everything should work as it does when TaskDeferred is raised.

Developers who want to implement an operator that starts the execution in a triggerer instead of a worker can write something like the following.

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class AsyncOperator(BaseOperator):
    start_trigger = SuccessTrigger()
    next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")

However, it raises a few issues.

  1. The DAG developer can overwrite the functionality of the whole operator by passing start_trigger and next_method like the following. Imagine the user user RdsCreateDbSnapshotOperator but passing a GCSBlobTrigger. We probably don't want the DAG developer to change the behavior of the operator, so maybe we can try something like inherits_from_empty_operator
with DAG(...) as dag:
    task = SomeOperator(
        task_id="task",
        start_trigger=SuccessTrigger(),
        next_method="execute_complete",
    )
  1. If we're trying to do something mentioned in point 1, which does not allow the DAG developer to change the operator behavior, will we let them decide whether they want to run op.execute or start_trigger implemented by the operator author? In the current implementation, if start_trigger is not None, op.execute will be ignored. This raises another issue. What if the deployed airflow does not have a triggerer running? Should we try something like _run_inline_trigger, or should we just fallback to op.execute? Or maybe we should add back the flag (was introduced a few commits ago in this PR) to decide whether to start the execution from triggerer? Yet another option is to block operator author to implement start_trigger and execute at the same time.

Lee-W avatar Apr 10 '24 11:04 Lee-W

For some operators such as S3KeySensor with deferrable set to True, running execute method in workers might not be necessary.

How would that work in case user uses soft_fail? For example in case of timeout?

eladkal avatar Apr 19 '24 09:04 eladkal

For some operators such as S3KeySensor with deferrable set to True, running execute method in workers might not be necessary.

How would that work in case user uses soft_fail? For example in case of timeout?

I don't think it has anything to do with soft_fail 🤔 It just changes how a task starts but not how it ends. Or is there anything I missed?

Lee-W avatar Apr 19 '24 13:04 Lee-W

I'm planning on merging this PR later today. Please let me know if anyone has concerns. Thanks!

Lee-W avatar Apr 29 '24 06:04 Lee-W