Starts execution directly from triggerer without going to worker
Why
For some operators, such as S3KeySensor with deferrable set to True, running the execute method in a worker might not be necessary. It would be better if we had a way to run a task directly in the triggerer without going through a worker.
In the current solution, we still need to run next_method/execute_complete in the worker. This PR serves as a POC / first step toward running the whole execution in the triggerer in an asynchronous manner.
What
Introduce _start_trigger and _next_method to BaseOperator. If an operator defines both _start_trigger and _next_method (as class attributes, as in the example below), the scheduler will defer the task directly without going to a worker.
from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class StartFromTriggerOperator(BaseOperator):
    _start_trigger = SuccessTrigger()
    _next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
These attributes can also be assigned at the instance level. However, dynamic task mapping is not supported in this setup.
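For illustration, instance-level assignment might look like the sketch below. This is a hypothetical variant mirroring the class-level example above, with the attributes set in __init__ instead; the operator name is made up for the example.

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class InstanceLevelStartFromTriggerOperator(BaseOperator):
    # Hypothetical variant: assign _start_trigger/_next_method per instance
    # rather than as class attributes.
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._start_trigger = SuccessTrigger()
        self._next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")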
Related:
https://github.com/apache/airflow/pull/31808
https://github.com/apache/airflow/issues/31718
I wonder if we could somehow infer starts_execution_from_triggerer from the combination of other arguments, instead of setting the flag explicitly.
For this specific case, I would say yes. We could check whether start_trigger is None to decide. I'm not sure whether we'd want to use this chance to refactor all the existing operators with deferrable support (e.g., moving the trigger definition to __init__ or somewhere else, maybe?).
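A minimal sketch of that inference, assuming start_trigger is an attribute on BaseOperator as in this PR's design (the class here is a stand-in fragment, not the actual BaseOperator implementation):

class BaseOperatorSketch:
    # Hypothetical fragment illustrating the inference.
    start_trigger = None  # set by operators that start from the triggerer

    @property
    def starts_execution_from_triggerer(self) -> bool:
        # A non-None start_trigger implies the scheduler should defer the
        # task immediately instead of queuing it for a worker.
        return self.start_trigger is not None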
Also, the name is set to start_trigger instead of trigger because the operators at https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/cloud_build.html already take trigger as an argument name. Without renaming it, mypy and many tests complain. start_trigger is more of a temporary name; suggestions are welcome.
If we also infer starts_execution_from_triggerer from it, start_trigger is not so bad to me, and actually better than trigger because it more clearly describes that the attribute controls which trigger the task is started from, not the trigger the task uses. There are other possible names (e.g. start_from_trigger), but trigger is not a good one IMO.
This comment is for the original design and is outdated
In the current design, I introduced start_trigger and next_method to BaseOperator. When start_trigger is not None, the scheduler will call the defer_task method of the task instance object, and everything should work as it does when TaskDeferred is raised.
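Conceptually, the scheduler-side branch might look like the sketch below. This is a simplified illustration only: defer_task is named in this description, but the keyword arguments shown are illustrative rather than its actual signature, and queue_for_worker is a hypothetical stand-in for the normal scheduling path.

def queue_for_worker(task_instance):
    # Hypothetical stand-in for the normal "send to executor/worker" path.
    ...


def schedule(task_instance, operator):
    # Simplified sketch of the described behavior, not the real scheduler code.
    if getattr(operator, "start_trigger", None) is not None:
        # Skip the worker entirely: defer straight to the triggerer, as if
        # the operator had raised TaskDeferred from execute().
        task_instance.defer_task(
            trigger=operator.start_trigger,
            next_method=operator.next_method,
        )
    else:
        # Normal path: queue the task for a worker to run execute().
        queue_for_worker(task_instance)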
Developers who want to implement an operator that starts the execution in a triggerer instead of a worker can write something like the following.
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class AsyncOperator(BaseOperator):
    start_trigger = SuccessTrigger()
    next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
However, it raises a few issues.
- The DAG developer can overwrite the functionality of the whole operator by passing start_trigger and next_method like the following. Imagine the user uses RdsCreateDbSnapshotOperator but passes a GCSBlobTrigger. We probably don't want the DAG developer to change the behavior of the operator, so maybe we can try something like inherits_from_empty_operator.
with DAG(...) as dag:
    task = SomeOperator(
        task_id="task",
        start_trigger=SuccessTrigger(),
        next_method="execute_complete",
    )
- If we try to do what's mentioned in point 1, which does not allow the DAG developer to change the operator behavior, will we let them decide whether to run op.execute or the start_trigger implemented by the operator author? In the current implementation, if start_trigger is not None, op.execute will be ignored. This raises another issue: what if the deployed Airflow does not have a triggerer running? Should we try something like _run_inline_trigger, or should we just fall back to op.execute? Or maybe we should add back the flag (introduced a few commits ago in this PR) to decide whether to start the execution from the triggerer? Yet another option is to block operator authors from implementing start_trigger and execute at the same time (a sketch of such a guard follows this list).
For some operators, such as S3KeySensor with deferrable set to True, running the execute method in a worker might not be necessary.
How would that work in case the user uses soft_fail? For example, in case of a timeout?
I don't think it has anything to do with soft_fail 🤔 It just changes how a task starts but not how it ends. Or is there anything I missed?
I'm planning on merging this PR later today. Please let me know if anyone has concerns. Thanks!