
Prevent start trigger initialization in scheduler

Open Lee-W opened this issue 1 year ago • 2 comments

Why

In https://github.com/apache/airflow/pull/38674, I introduced logic that might run user code in the scheduler. This PR proposes new logic that avoids this.

What

  • Introduce a StartTriggerArgs class containing trigger_cls (or should it be renamed trigger_cls_path?), trigger_kwargs, timeout, and next_method. These are what we need to run an operator in deferrable mode.
    • airflow.models.abstractoperator might not be the best module for this data class; suggestions are welcome.
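A minimal, self-contained sketch of such a data class, to make the split concrete. It is an illustration under assumptions, not the class from this PR: the field types, the `resolve_trigger_cls` helper, and the use of `importlib` on the triggerer side are all hypothetical. What it shows is that the scheduler side only handles plain data (a dotted path string, kwargs, a method name, a timeout), so nothing in it has to import the trigger class; only the triggerer turns the path into a class.

```python
from __future__ import annotations

import importlib
from dataclasses import dataclass
from datetime import timedelta
from typing import Any


@dataclass
class StartTriggerArgs:
    """Hypothetical sketch: plain data describing how to start a task from the triggerer."""

    trigger_cls: str  # dotted path string, so it can be stored without importing the class
    next_method: str  # operator method to call when the trigger fires
    trigger_kwargs: dict[str, Any] | None = None
    timeout: timedelta | None = None


def resolve_trigger_cls(path: str) -> type:
    """Hypothetical triggerer-side helper: import the class named by a dotted path."""
    module_name, _, class_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# The scheduler could persist `args` as-is; only the triggerer resolves the path.
args = StartTriggerArgs(
    trigger_cls="collections.OrderedDict",  # stdlib class, used only so the example runs
    next_method="execute_complete",
    trigger_kwargs={},
    timeout=timedelta(minutes=5),
)
trigger_cls = resolve_trigger_cls(args.trigger_cls)
trigger = trigger_cls(**args.trigger_kwargs)
```

Keeping trigger_cls as a string is what makes a name like trigger_cls_path arguably more accurate.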

Operator authors now need to set start_trigger_args to start execution directly from the triggerer.

from __future__ import annotations

import copy

from airflow.models.baseoperator import BaseOperator, StartTriggerArgs


class AsyncOperator(BaseOperator):
    # Class-level defaults. trigger_cls is a dotted path string, so the scheduler
    # does not need to import the trigger class.
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs=None,
        next_method="execute_complete",
        timeout=None,
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Copy before mutating so this instance does not modify the shared class attribute.
        self.start_trigger_args = copy.copy(self.start_trigger_args)
        # A non-None trigger_kwargs marks this task to start from the triggerer.
        self.start_trigger_args.trigger_kwargs = {}

    def execute_complete(self, context, event=None) -> None:
        # Called through next_method once the trigger fires.
        self.log.info("execute complete")

If trigger_kwargs is left as None (i.e., not updated in __init__), the task is not deferred at start. This is deliberate: it lets an operator decide whether to start execution from the triggerer, e.g.:

import copy


class AsyncOperator(BaseOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs=None,
        next_method="execute_complete",
        timeout=None,
    )

    def __init__(self, *args, start_from_trigger=False, **kwargs):
        super().__init__(*args, **kwargs)
        if start_from_trigger:
            # Copy first: mutating the class attribute would make every instance,
            # including ones created with start_from_trigger=False, start from the triggerer.
            self.start_trigger_args = copy.copy(self.start_trigger_args)
            self.start_trigger_args.trigger_kwargs = {}

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
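One thing to watch in this pattern: start_trigger_args is a class attribute, so assigning self.start_trigger_args.trigger_kwargs directly mutates the single object shared by every instance of the operator. The self-contained sketch below uses stand-in classes (`Args`, `SharedOp`, `CopyingOp`, and `starts_from_trigger` are hypothetical names, not Airflow API) to show the leak and one way to avoid it: give each instance its own shallow copy before mutating. It assumes the start-from-trigger check is simply `trigger_kwargs is not None`, as described above.

```python
from __future__ import annotations

import copy
from dataclasses import dataclass
from typing import Any


@dataclass
class Args:
    """Hypothetical stand-in for StartTriggerArgs."""

    trigger_cls: str
    next_method: str
    trigger_kwargs: dict[str, Any] | None = None


def starts_from_trigger(args: Args) -> bool:
    # Assumed rule: a non-None trigger_kwargs means "defer at start".
    return args.trigger_kwargs is not None


class SharedOp:
    start_trigger_args = Args(trigger_cls="pkg.MyTrigger", next_method="execute_complete")

    def __init__(self, start_from_trigger: bool = False):
        if start_from_trigger:
            # Mutates the class-level object, so every instance sees the change.
            self.start_trigger_args.trigger_kwargs = {}


class CopyingOp:
    start_trigger_args = Args(trigger_cls="pkg.MyTrigger", next_method="execute_complete")

    def __init__(self, start_from_trigger: bool = False):
        if start_from_trigger:
            # Shallow-copy first, so only this instance gets trigger_kwargs.
            self.start_trigger_args = copy.copy(self.start_trigger_args)
            self.start_trigger_args.trigger_kwargs = {}


a, b = SharedOp(start_from_trigger=True), SharedOp(start_from_trigger=False)
print(starts_from_trigger(a.start_trigger_args), starts_from_trigger(b.start_trigger_args))
# True True  (b inherits a's setting)

c, d = CopyingOp(start_from_trigger=True), CopyingOp(start_from_trigger=False)
print(starts_from_trigger(c.start_trigger_args), starts_from_trigger(d.start_trigger_args))
# True False
```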


Lee-W, May 13 '24 10:05

Maybe we should mark this experimental.

trigger_kwargs being None vs {} does not seem like an explicit or obvious enough way to decide whether to start from the trigger. Why not just look at the start_from_trigger boolean?
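For comparison, a minimal sketch of the explicit-flag alternative, under assumed names: the operator carries a plain `start_from_trigger` boolean next to its start arguments, and a hypothetical scheduler-side check `should_defer_at_start` reads that flag instead of inferring intent from `trigger_kwargs` being `None` vs `{}`. `Args`, `FlagOp`, and `should_defer_at_start` are illustrative only, not Airflow API.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Args:
    """Hypothetical stand-in for StartTriggerArgs."""

    trigger_cls: str
    next_method: str
    trigger_kwargs: dict[str, Any] = field(default_factory=dict)


class FlagOp:
    """Hypothetical operator whose intent is stated by an explicit boolean."""

    def __init__(self, start_from_trigger: bool = False, **trigger_kwargs: Any):
        self.start_from_trigger = start_from_trigger
        # Built per instance, so operators never share mutable start arguments.
        self.start_trigger_args = Args(
            trigger_cls="pkg.MyTrigger",
            next_method="execute_complete",
            trigger_kwargs=trigger_kwargs,
        )


def should_defer_at_start(op: Any) -> bool:
    # Read the flag directly; the contents of trigger_kwargs do not affect the decision.
    return bool(getattr(op, "start_from_trigger", False))


print(should_defer_at_start(FlagOp(start_from_trigger=True)))  # True
print(should_defer_at_start(FlagOp()))  # False
print(should_defer_at_start(FlagOp(moment=5)))  # False: kwargs alone do not opt in
```

The trade-off is that the flag is explicit, but it is a second attribute that has to stay consistent with the start arguments.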

dstandish, May 14 '24 16:05

> Maybe we should mark this experimental.

I'm OK with it. Maybe we should wait for others' comments? If we do mark it as experimental, where should I do so?

> trigger_kwargs being None vs {} does not seem like an explicit or obvious enough way to decide whether to start from the trigger. Why not just look at the start_from_trigger boolean?

In the previous PR, @uranusjr and I discussed whether we could infer start_from_trigger from the existing args. I don't have a strong opinion on this, but I'm adding a commit with start_from_trigger so that, if we decide we don't want it, I can easily revert it.

Lee-W, May 15 '24 03:05