
ExternalTaskSensor: An option to use the data_interval variables rather than execution_date

Open · HeshamMeneisi opened this issue 2 years ago

Description

Since execution_date is deprecated, it makes sense for this sensor, or a new version of it (for backward compatibility), to use the data_interval variables.

Use case/motivation

I want to make sure that the data interval this DAG is about to operate on is covered by another DAG. For example, data was loaded up to this DAG's data_interval_end.

I believe the sensor itself is simple to write. It just uses data_interval_end rather than execution_date, and it doesn't look for one specific task run: any external.data_interval_end > self.data_interval_end is acceptable.

This is the equivalent of ExternalTaskSensor with execution_delta set to self.interval_start - closest_external_interval.interval_start, which can be calculated manually but will break later if the external DAG's schedule is changed.
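To make the fragility concrete, here is a minimal sketch of the manual calculation described above. The schedules (06:00 and 04:00 daily) are hypothetical; the point is that execution_delta bakes in a fixed offset between the two DAGs' intervals:

```python
from datetime import datetime, timedelta

# Hypothetical schedules: this DAG runs daily at 06:00,
# the external DAG daily at 04:00.
this_interval_start = datetime(2022, 9, 16, 6, 0)
external_interval_start = datetime(2022, 9, 16, 4, 0)

# The value one would pass as ExternalTaskSensor's execution_delta:
execution_delta = this_interval_start - external_interval_start
print(execution_delta)  # 2:00:00
```

Because this offset is hard-coded into the sensor, it silently stops matching runs if either DAG's schedule changes, which is exactly what a data_interval-based comparison would avoid.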

Related issues

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

HeshamMeneisi avatar Sep 16 '22 07:09 HeshamMeneisi

Makes sense. I believe there’s some discussion somewhere, but this write-up is more complete. I think we should allow using either data_interval_start or data_interval_end; technically data_interval_start can be different from execution_date.

To simplify implementation, perhaps it’s a good idea to split the logic into three sensors, one for each value? The three sensors would inherit from the same class that implements most of the logic, but each would operate on a different value.

uranusjr avatar Sep 16 '22 08:09 uranusjr

Totally interested too! Any development on this already?

qcha41 avatar Oct 16 '22 12:10 qcha41

I assigned you too - so if you want to help @qcha41 -> talk directly to @HeshamMeneisi

potiuk avatar Oct 24 '22 02:10 potiuk

I haven't gotten to this specifically yet, but in light of the new Dataset logic, this seems like a more intuitive solution for the use case above. What do you think? I can open a PR.

import datetime
from typing import Optional

from airflow.datasets import Dataset
from airflow.models.dataset import DatasetModel
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.utils.session import provide_session


class DatasetUpToDateSensor(BaseSensorOperator):
    """
    Wait until the current data interval is covered for a specific Dataset.

    :param dataset: The target Dataset
    :param covered_to: The timestamp the dataset should be covered to
        (updated at/after); defaults to data_interval_end (templated)
    """

    template_fields = ["covered_to"]

    def __init__(
        self, dataset: Dataset, covered_to: Optional[datetime.datetime] = None, **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.dataset = dataset
        self.covered_to = covered_to

    @provide_session
    def poke(self, context: Context, session=None) -> bool:
        dataset: DatasetModel = (
            session.query(DatasetModel)
            .filter(DatasetModel.uri == self.dataset.uri)
            .one_or_none()
        )

        if not dataset:
            # DatasetNotFoundException is assumed to be defined alongside this
            # sensor; it is not part of airflow.exceptions.
            raise DatasetNotFoundException(self.dataset)

        desired_covered_to = self.covered_to or context["data_interval_end"]

        return dataset.updated_at >= desired_covered_to

In this case, covered_to can also be set to something like {{ logical_date + macros.timedelta(hours=1) }}, it's quite flexible.
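For illustration, usage in a DAG might look like the following (the sensor is the proposal above, not an existing Airflow operator, and the dataset URI is made up):

```python
# Hypothetical task declaration using the proposed sensor.
wait_for_upstream = DatasetUpToDateSensor(
    task_id="wait_for_upstream_dataset",
    dataset=Dataset("s3://example-bucket/loaded_data"),
    # Templated: require coverage one hour past this run's logical date.
    covered_to="{{ logical_date + macros.timedelta(hours=1) }}",
    poke_interval=300,
)
```

Leaving covered_to unset would fall back to the run's data_interval_end, matching the original motivation of the issue.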

HeshamMeneisi avatar Jan 24 '23 13:01 HeshamMeneisi

Opening a PR is a good idea.

potiuk avatar Feb 19 '23 13:02 potiuk