Prevent start trigger initialization in scheduler
Why
In https://github.com/apache/airflow/pull/38674, I introduced logic that might run user code in the scheduler. This PR proposes new logic that avoids this.
What
- Introduce a `StartTriggerArgs` class which should contain `trigger_cls` (or maybe it should be renamed to `trigger_cls_path`?), `trigger_kwargs`, `timeout`, and `next_method`, which are what we need when we run an operator in deferrable mode. `airflow.models.abstractoperator` might not be the best module for this data class, so suggestions are welcome.
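
A minimal sketch of what such a data class could look like, using only the four fields named above. The field types, defaults, and ordering are assumptions for illustration, not the definition proposed in this PR:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import Any


@dataclass
class StartTriggerArgs:
    """Illustrative stand-in; types and defaults here are assumptions."""

    trigger_cls: str  # dotted import path, kept as a plain string
    next_method: str  # operator method to call once the trigger fires
    trigger_kwargs: dict[str, Any] | None = None
    timeout: timedelta | None = None


# Example values taken from the operator examples in this description.
args = StartTriggerArgs(
    trigger_cls="airflow.triggers.testing.SuccessTrigger",
    next_method="execute_complete",
)
```

Keeping `trigger_cls` as a string rather than a class object means that constructing this object does not by itself import the trigger module.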
Operator authors will now need to set `start_trigger_args` to start execution directly from the triggerer:

    from __future__ import annotations

    from datetime import timedelta
    from typing import Any

    from airflow.models.baseoperator import BaseOperator, StartTriggerArgs
    from airflow.triggers.temporal import TimeDeltaTrigger


    class AsyncOperator(BaseOperator):
        # Class-level description of the trigger to start from.
        start_trigger_args = StartTriggerArgs(
            trigger_cls="airflow.triggers.testing.SuccessTrigger",
            trigger_kwargs=None,
            next_method="execute_complete",
            timeout=None,
        )

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Replacing None with a dict opts in to starting from the triggerer.
            self.start_trigger_args.trigger_kwargs = {}

        def execute_complete(self, context, event=None) -> None:
            self.log.info("execute complete")

If `trigger_kwargs` is left as `None` and not updated in `__init__`, the task will not start from the triggerer. This is by design, so that an operator can decide whether to start execution from the triggerer, e.g.:

    class AsyncOperator(BaseOperator):
        start_trigger_args = StartTriggerArgs(
            trigger_cls="airflow.triggers.testing.SuccessTrigger",
            trigger_kwargs=None,
            next_method="execute_complete",
            timeout=None,
        )

        def __init__(self, *args, start_from_trigger=False, **kwargs):
            super().__init__(*args, **kwargs)
            # Only opt in to starting from the triggerer when asked to.
            if start_from_trigger is True:
                self.start_trigger_args.trigger_kwargs = {}

        def execute_complete(self, context, event=None) -> None:
            self.log.info("execute complete")

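
To illustrate why describing the trigger as plain data helps, here is a rough sketch of the hand-off. The scheduler side only reads strings and dicts, and the trigger class is imported on the triggerer side. The function names `build_trigger_request` and `load_trigger` are hypothetical, and this is not how Airflow implements it. `datetime.timedelta` stands in for a real trigger class so the snippet runs without Airflow installed:

```python
from __future__ import annotations

from importlib import import_module
from types import SimpleNamespace
from typing import Any


def build_trigger_request(start_trigger_args: Any) -> dict[str, Any] | None:
    """Scheduler side (hypothetical): read plain data only, import nothing."""
    if start_trigger_args is None or start_trigger_args.trigger_kwargs is None:
        # trigger_kwargs left as None means "do not start from the triggerer".
        return None
    return {
        "classpath": start_trigger_args.trigger_cls,  # still just a string
        "kwargs": dict(start_trigger_args.trigger_kwargs),
        "next_method": start_trigger_args.next_method,
        "timeout": start_trigger_args.timeout,
    }


def load_trigger(request: dict[str, Any]) -> Any:
    """Triggerer side (hypothetical): the only place the class is imported."""
    module_path, _, class_name = request["classpath"].rpartition(".")
    trigger_class = getattr(import_module(module_path), class_name)
    return trigger_class(**request["kwargs"])


# Stand-in for an operator's class-level start_trigger_args.
args = SimpleNamespace(
    trigger_cls="datetime.timedelta",  # placeholder for a real trigger path
    trigger_kwargs={"seconds": 5},
    next_method="execute_complete",
    timeout=None,
)

request = build_trigger_request(args)
trigger = load_trigger(request)
```

With `trigger_kwargs=None`, `build_trigger_request` returns `None` and the task would run through its normal `execute` path instead.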
maybe we should mark this experimental.
`trigger_kwargs` being `None` vs `{}` does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the `start_from_trigger` boolean?
> maybe we should mark this experimental.
I'm ok with it. maybe wait for others' comment? if we're to mark it as experimental, where should I do so?
> `trigger_kwargs` being `None` vs `{}` does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the `start_from_trigger` boolean?
In the previous PR, @uranusjr and I discussed whether we could infer `start_from_trigger` from existing args. I do not have a strong opinion on this one, but I created a commit with `start_from_trigger` so that, even if we don't want it, I can still revert it easily.