
DAGs are able to see historical dataset events when newly created

Open tosheer opened this issue 9 months ago • 3 comments

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG sees all of the dataset's events, starting from the very first one.

What you think should happen instead?

If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG should only see dataset events from the time the DAG was added onwards.
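
Conceptually, the expected behavior amounts to filtering the dataset's event history by the time the consumer DAG was registered. A minimal sketch of that intent (pure illustration, not Airflow scheduler code; dag_registered_at is a hypothetical value):

# Sketch of the expected semantics only, not actual Airflow code.
# `dag_registered_at` is hypothetical: the time the consumer DAG was
# first parsed/registered by the scheduler.

def events_visible_to_new_dag(all_events, dag_registered_at):
    # A newly added consumer DAG should only be triggered by (and see)
    # events emitted after it was added, not the full event history.
    return [e for e in all_events if e.timestamp >= dag_registered_at]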

How to reproduce

  • Create a new DAG that produces dataset events (see the producer DAG definition below) and enable it.
  • Run the producer DAG multiple times.
  • Create a new consumer DAG, with catchup=False, that is triggered by the same dataset.
  • Enable the new consumer DAG.
  • The new DAG runs as soon as it is enabled and sees the very first event on the dataset.
  • On its next scheduled run, the DAG then gets all the events other than the first one it has already consumed.

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# [START dataset_def]
tosheer_dag1_dataset = Dataset("s3://tosheer-dag1/output_1.txt", extra={"partition": "bye"})


with DAG(
    dag_id="tosheer_dataset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "dataset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[tosheer_dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

After this, add the consumer DAG:

# [START dag_dep]
with DAG(
    dag_id="tosheer_dataset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[tosheer_dag1_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        task_id="consuming_1",
        bash_command='echo "ti_key={{ triggering_dataset_events }}"',
    )
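
For reference, the same events can also be inspected from a TaskFlow task, since Airflow passes the triggering_dataset_events context variable to task callables that declare it (an illustrative sketch; this task is not part of the reproduction above):

from airflow.decorators import task

@task
def print_triggering_events(triggering_dataset_events=None):
    # Airflow injects `triggering_dataset_events`: a mapping of dataset URI
    # to the list of DatasetEvent rows that triggered this DAG run.
    for uri, events in (triggering_dataset_events or {}).items():
        print(uri, [event.source_run_id for event in events])

On the very first run of a newly added consumer DAG, this is where the full historical event list shows up (see the logs further down).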

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Plain vanilla Airflow.

Anything else?

This issue is related to an earlier issue: https://github.com/apache/airflow/issues/38826. The fix for that issue only covered the disabled/re-enabled and deleted/recreated cases.
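
For illustration, the kind of cutoff filtering the scheduler would need could look roughly like this (a sketch only, querying the internal DatasetEvent model; the timestamp-cutoff logic is an assumption about the fix, not existing behavior):

from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.utils.session import provide_session

@provide_session
def events_since(dataset_uri, cutoff, session=None):
    # Return only the dataset events recorded after `cutoff`, i.e. what a
    # newly added consumer DAG arguably should see instead of the full history.
    return (
        session.query(DatasetEvent)
        .join(DatasetModel, DatasetEvent.dataset_id == DatasetModel.id)
        .filter(DatasetModel.uri == dataset_uri)
        .filter(DatasetEvent.timestamp > cutoff)
        .all()
    )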

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

tosheer avatar May 07 '24 09:05 tosheer

I'm not able to reproduce -- I think the context object in the post execute function is incorrect. context["dataset_events"]["test-cluster/test-schema/test-table"] should be (I think) context["triggering_dataset_events"]["test-cluster/test-schema/test-table"]. Even with that updated, it comes back as an empty list.
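
One way to check which context key actually carries the events is a small debugging probe (a minimal sketch; not part of the reproduction):

from airflow.decorators import task

@task
def dump_dataset_context(**context):
    # Probe both candidate keys; .get() returns None for absent keys.
    for key in ("dataset_events", "triggering_dataset_events"):
        print(key, "=", context.get(key))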

RNHTTR avatar May 07 '24 13:05 RNHTTR

[Screenshots attached: 2024-05-09 at 11:54:49 PM and 11:53:15 PM]

@RNHTTR I have updated the DAG definition above, as I was reproducing the issue locally with a simpler DAG definition. Can you please try again and see if you can reproduce the issue? For more details, see the attached screenshots.

tosheer avatar May 09 '24 18:05 tosheer

Logs

[2024-05-09, 18:24:03 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-05-09, 18:24:04 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-05-09, 18:24:04 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "ti_key=defaultdict(<class \'list\'>, {\'s3://tosheer-dag1/output_1.txt\': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'scheduled__2024-05-08T00:00:00+00:00\', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:06.483814+00:00\', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:27.256324+00:00\', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:23:54.937647+00:00\', source_map_index=-1)]})"']
[2024-05-09, 18:24:04 UTC] {subprocess.py:86} INFO - Output:
[2024-05-09, 18:24:04 UTC] {subprocess.py:93} INFO - ti_key=defaultdict(<class 'list'>, {'s3://tosheer-dag1/output_1.txt': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='scheduled__2024-05-08T00:00:00+00:00', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:06.483814+00:00', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:27.256324+00:00', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:23:54.937647+00:00', source_map_index=-1)]})
[2024-05-09, 18:24:04 UTC] {subprocess.py:97} INFO - Command exited with return code 0

tosheer avatar May 09 '24 18:05 tosheer