Newly created DAGs are able to see historical dataset events
Apache Airflow version
2.9.1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG sees all dataset events starting from the very first event for that dataset.
What you think should happen instead?
If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG should only see dataset events from the time the DAG was added.
How to reproduce
- Create a new DAG that produces dataset events, and enable the producer.
- Run the producer DAG multiple times.
- Create a new consumer DAG that is triggered by the same dataset, with catchup set to false.
- Enable the new DAG.
- You will see that the new DAG runs as soon as it is enabled and sees the very first event for the dataset.
- In its next scheduled run, whenever that occurs, the DAG will get all the events other than the first event it has already consumed.
import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# [START dataset_def]
tosheer_dag1_dataset = Dataset("s3://tosheer-dag1/output_1.txt", extra={"partition": "bye"})

with DAG(
    dag_id="tosheer_dataset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "dataset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[tosheer_dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]
After this, add:
# [START dag_dep]
with DAG(
    dag_id="tosheer_dataset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[tosheer_dag1_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        task_id="consuming_1",
        bash_command='echo "ti_key={{ triggering_dataset_events }}"',
    )
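To confirm which events the scheduler has actually recorded for the dataset, the metadata database can be queried directly. A minimal sketch, assuming Airflow 2.9 and direct access to the metadata DB; dump_dataset_events is a hypothetical helper, not part of the DAGs above:

# Minimal sketch (hypothetical helper): list the DatasetEvent rows stored for a
# dataset URI, oldest first, to see what a newly added consumer could observe.
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.utils.session import create_session

def dump_dataset_events(uri: str) -> None:
    with create_session() as session:
        dataset = session.query(DatasetModel).filter(DatasetModel.uri == uri).one()
        events = (
            session.query(DatasetEvent)
            .filter(DatasetEvent.dataset_id == dataset.id)
            .order_by(DatasetEvent.timestamp)
            .all()
        )
        for event in events:
            print(event.id, event.source_dag_id, event.source_run_id, event.timestamp)

dump_dataset_events("s3://tosheer-dag1/output_1.txt")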
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
Plain vanilla Airflow.
Anything else?
This issue is associated with an earlier issue, https://github.com/apache/airflow/issues/38826. The fix for that issue only covered the disabled/re-enabled and deleted/re-created cases.
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I'm not able to reproduce -- I think the context object in the post-execute function is incorrect: context["dataset_events"]["test-cluster/test-schema/test-table"] should (I think) be context["triggering_dataset_events"]["test-cluster/test-schema/test-table"]. Even with that updated, it comes back as an empty list.
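For what it's worth, the triggering events can also be read from the task context in Python rather than via the Jinja template above. A minimal sketch, assuming Airflow 2.9; print_triggering_events is an illustrative task name, not from the report:

# Minimal sketch: TaskFlow injects context entries whose names match the task
# function's parameters, so the events can be printed from Python directly.
from airflow.decorators import task

@task
def print_triggering_events(triggering_dataset_events=None):
    for uri, events in (triggering_dataset_events or {}).items():
        for event in events:
            print(uri, event.id, event.source_dag_id, event.source_run_id)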
Logs
[2024-05-09, 18:24:03 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-05-09, 18:24:04 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-05-09, 18:24:04 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "ti_key=defaultdict(<class \'list\'>, {\'s3://tosheer-dag1/output_1.txt\': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'scheduled__2024-05-08T00:00:00+00:00\', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:06.483814+00:00\', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:27.256324+00:00\', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:23:54.937647+00:00\', source_map_index=-1)]})"']
[2024-05-09, 18:24:04 UTC] {subprocess.py:86} INFO - Output:
[2024-05-09, 18:24:04 UTC] {subprocess.py:93} INFO - ti_key=defaultdict(<class 'list'>, {'s3://tosheer-dag1/output_1.txt': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='scheduled__2024-05-08T00:00:00+00:00', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:06.483814+00:00', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:27.256324+00:00', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:23:54.937647+00:00', source_map_index=-1)]})
[2024-05-09, 18:24:04 UTC] {subprocess.py:97} INFO - Command exited with return code 0