ExternalTaskSensor: An option to use the data_interval variables rather than execution_date
Description
Since execution_date is deprecated, it makes sense that this sensor, or a new version of it (for backward compatibility), uses the data_interval variables.
Use case/motivation
I want to make sure that the data interval this DAG is about to operate on is covered by another DAG. For example, data was loaded up to this DAG's data_interval_end.
I believe the sensor itself is simple to write. It just uses data_interval_end rather than execution_date, and it doesn't look for one specific task; any external.data_interval_end > self.data_interval_end is acceptable.
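Roughly, the check would be something like this (a plain-Python sketch only; interval_is_covered, this_run, and external_runs are hypothetical names, not an existing Airflow API):
def interval_is_covered(this_run, external_runs) -> bool:
    # The external DAG has covered our interval once any of its successful
    # runs has a data_interval_end past this DAG run's data_interval_end.
    return any(
        run.data_interval_end > this_run.data_interval_end
        for run in external_runs
        if run.state == "success"
    )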
This is the equivalent of ExternalTaskSensor with execution_delta set to self.interval_start - closest_external_interval.interval_start, which can be calculated manually but will fail later if the external DAG schedule is changed.
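For comparison, the workaround available today looks roughly like this; load_dag and the six-hour offset are made-up values that only hold while both schedules stay fixed:
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_load = ExternalTaskSensor(
    task_id="wait_for_load",
    external_dag_id="load_dag",  # hypothetical external DAG
    external_task_id=None,       # wait on the whole external DAG run
    # self.interval_start - closest_external_interval.interval_start,
    # computed by hand for the current schedules:
    execution_delta=timedelta(hours=6),
)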
Related issues
No response
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Makes sense. I believe there's some discussion somewhere, but this write-up is more complete. I think we should allow using either data_interval_start or data_interval_end; technically data_interval_start can be different from execution_date.
To simplify implementation, perhaps it's a good idea to split the logic into three sensors, one for each value? The three sensors would inherit from the same class that implements most of the logic, but each would apply it to a different value.
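Something along these lines, maybe (a rough sketch; the class names, the external_dag_id parameter, and logical_date as the third value are all assumptions):
from sqlalchemy import func

from airflow.models.dagrun import DagRun
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState


class _ExternalIntervalSensor(BaseSensorOperator):
    # Subclasses set this to the context key whose value must be covered.
    interval_key: str

    def __init__(self, external_dag_id: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context: Context, session=None) -> bool:
        # Latest data_interval_end among successful runs of the external DAG.
        latest = (
            session.query(func.max(DagRun.data_interval_end))
            .filter(
                DagRun.dag_id == self.external_dag_id,
                DagRun.state == DagRunState.SUCCESS,
            )
            .scalar()
        )
        return latest is not None and latest >= context[self.interval_key]


class ExternalDataIntervalStartSensor(_ExternalIntervalSensor):
    interval_key = "data_interval_start"


class ExternalDataIntervalEndSensor(_ExternalIntervalSensor):
    interval_key = "data_interval_end"


class ExternalLogicalDateSensor(_ExternalIntervalSensor):
    interval_key = "logical_date"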
Totally interested too! Any development on this already?
I assigned you too - so if you want to help @qcha41 -> talk directly to @HeshamMeneisi
I haven't gotten to this specifically yet, but in light of the new Dataset logic, this seems like a more intuitive solution for the use case above. What do you think? I can open a PR.
import datetime
from typing import Optional

from airflow.datasets import Dataset
from airflow.exceptions import AirflowException
from airflow.models.dataset import DatasetModel
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.utils.session import provide_session


class DatasetNotFoundException(AirflowException):
    """Not a built-in Airflow exception; defined here so the sketch is self-contained."""


class DatasetUpToDateSensor(BaseSensorOperator):
    """
    This sensor waits for the current data interval to be covered for a specific Dataset.

    :param dataset: The target Dataset
    :param covered_to: The timestamp the dataset is desired to be covered to
        (updated at/after), defaults to data_interval_end (templated)
    """

    template_fields = ["covered_to"]

    def __init__(
        self, dataset: Dataset, covered_to: Optional[datetime.datetime] = None, **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.dataset = dataset
        self.covered_to = covered_to

    @provide_session
    def poke(self, context: Context, session=None) -> bool:
        # Look up the Dataset row in the metadata database by URI.
        dataset: DatasetModel = (
            session.query(DatasetModel)
            .filter(DatasetModel.uri == self.dataset.uri)
            .one_or_none()
        )
        if not dataset:
            raise DatasetNotFoundException(self.dataset)
        # Succeed once the dataset's last update reaches the desired timestamp.
        desired_covered_to = self.covered_to or context["data_interval_end"]
        return dataset.updated_at >= desired_covered_to
In this case, covered_to can also be set to something like {{ logical_date + macros.timedelta(hours=1) }}; it's quite flexible.
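For example (hypothetical usage of the sensor above; the DAG id, Dataset URI, and task id are made up, and render_template_as_native_obj=True is assumed so the templated value renders to a real datetime rather than a string):
import datetime

from airflow import DAG
from airflow.datasets import Dataset

with DAG(
    dag_id="consume_events",
    schedule="@hourly",
    start_date=datetime.datetime(2023, 1, 1),
    render_template_as_native_obj=True,
) as dag:
    wait_for_events = DatasetUpToDateSensor(  # the class defined above
        task_id="wait_for_events",
        dataset=Dataset("s3://bucket/events/"),
        # Omit covered_to to default to data_interval_end, or override it:
        covered_to="{{ logical_date + macros.timedelta(hours=1) }}",
    )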
Opening a PR is a good idea.