[FEATURE] `TaskDependencySensor` improvements
Is your feature request related to a problem? Please describe.
1. A consistent approach is required across all sensors to determine the time that is used as the base for calculating whether upstream criteria are fulfilled. This approach should make sensor executions reproducible when the `brickflow_start_time`/`brickflow_start_date` custom parameters are set manually, or when the job is scheduled and the execution date is derived from the CRON expression. Currently `TaskDependencySensor` uses a `datetime.now()`-based approach (a sketch of an alternative follows this list): https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L324-L326
2. From the sensor usage perspective, the user does not care whether the upstream DAG failed or not; what matters is the ability to trigger downstream tasks once the upstream dependency is fulfilled, even if it was delayed by the need to restart the failed DAG. That means failing the task and the workflow is not a desired scenario for this operator: https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L333-L337
3. Along the same lines as (2), failing the sensor when the upstream execution is not found is not a desirable flow: https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L261-L264 The argument that the execution is always created by Airflow, even if it has not yet started, does not hold, because if the DAG parameter `depends_on_past` is used, new executions won't be created unless the older ones are successful.
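For (1), a minimal sketch of how the base time could be resolved reproducibly; the helper name, the fallback order, and the `brickflow_start_time` parameter format are assumptions for illustration, not the plugin's actual API:

```python
from datetime import datetime, timezone


def resolve_base_time(context: dict) -> datetime:
    """Hypothetical helper: pick a reproducible reference time for upstream checks.

    Prefer the Airflow-provided execution_date, then a manually supplied
    brickflow_start_time parameter, and only fall back to the wall clock
    when neither is available.
    """
    execution_date = context.get("execution_date")
    if execution_date is not None:
        return execution_date

    start_time = context.get("brickflow_start_time")
    if start_time is not None:
        # Assume an ISO-8601 string when passed as a custom job parameter.
        return datetime.fromisoformat(start_time)

    return datetime.now(timezone.utc)
```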
Cloud Information
- [x] AWS
- [x] Azure
- [x] GCP
- [x] Other
Describe the solution you'd like
- Use `context["execution_date"]`, which is available for Airflow operators, or use `brickflow_start_time` from the Brickflow context.
- Continue poking upstream.
- Log that the upstream execution is not found and continue poking.
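The last two points amount to a poke loop that logs and keeps waiting instead of raising. A minimal sketch, assuming a hypothetical `get_upstream_state` callable that returns the upstream task state (or `None` when the run does not exist yet); this is illustrative, not the plugin's actual implementation:

```python
import logging
import time

log = logging.getLogger(__name__)


def poke_until_success(get_upstream_state, poke_interval_sec: int = 60) -> None:
    """Keep polling until the upstream task succeeds.

    Treats both a failed run (it may be restarted later) and a missing run
    (e.g. blocked by depends_on_past) as reasons to wait, not to fail the sensor.
    """
    while True:
        state = get_upstream_state()
        if state == "success":
            log.info("Upstream dependency fulfilled; triggering downstream.")
            return
        if state is None:
            log.info("Upstream execution not found yet; continuing to poke.")
        elif state == "failed":
            log.info("Upstream run failed; waiting for a restarted run to succeed.")
        else:
            log.info("Upstream state is '%s'; continuing to poke.", state)
        time.sleep(poke_interval_sec)
```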