
GCSObjectsWithPrefixExistenceSensor in deferrable mode does not return matched objects in the xcom in the first poke (before entering deferrable mode)

Open • hejnal opened this issue 1 year ago • 1 comment

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.14.0

Apache Airflow version

2.6.3

Operating System

MacOS

Deployment

Other

Deployment details

Local deployment of Airflow (with RabbitMQ queue). Connecting to GCS through custom connection with a SA.

What happened

On the first execution, before entering deferrable mode, the sensor (with deferrable=True) picks up the files on GCS and turns green, but it does not return the matched objects in XCom.

It works fine if the file arrives after the sensor has entered deferrable mode and is picked up by the triggerer.

The line that might cause this issue is the self.poke(context=context) call in ../site-packages/airflow/providers/google/cloud/sensors/gcs.py:

if not self.poke(context=context):
    self.defer(
        timeout=timedelta(seconds=self.timeout),
        trigger=GCSUploadSessionTrigger(
            bucket=self.bucket,
            prefix=self.prefix,
            poke_interval=self.poke_interval,
            google_cloud_conn_id=self.google_cloud_conn_id,
            inactivity_period=self.inactivity_period,
            min_objects=self.min_objects,
            previous_objects=self.previous_objects,
            allow_delete=self.allow_delete,
            hook_params=hook_params,
        ),
        method_name="execute_complete",
    )
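To illustrate the suspected control flow, here is a minimal sketch that does not use Airflow at all. SketchSensor, Deferred, execute_reported, and execute_possible_fix are hypothetical stand-ins invented for this illustration, and the assumption that the sensor keeps its matches in an attribute named _matches is mine; this is not the provider's actual code and not necessarily how a fix would be implemented.

```python
# Simplified stand-ins for illustration only; NOT the real Airflow provider code.
class Deferred(Exception):
    """Hypothetical marker for 'task handed over to the triggerer'."""


class SketchSensor:
    def __init__(self, objects_in_bucket, prefix):
        self.objects_in_bucket = objects_in_bucket
        self.prefix = prefix
        self._matches = []  # assumed attribute holding matched object names

    def poke(self, context):
        # Record every object whose name starts with the prefix.
        self._matches = [o for o in self.objects_in_bucket if o.startswith(self.prefix)]
        return bool(self._matches)

    def execute_reported(self, context):
        # Same shape as the snippet above: when the first poke succeeds,
        # the method falls through and implicitly returns None.
        if not self.poke(context=context):
            raise Deferred()

    def execute_possible_fix(self, context):
        # One possible fix: return the matches when the first poke succeeds,
        # so they become the task's return value.
        if not self.poke(context=context):
            raise Deferred()
        return self._matches


sensor = SketchSensor(["data/v1_20240220.csv", "other/file.txt"], prefix="data/v1")
reported = sensor.execute_reported(context={})
fixed = sensor.execute_possible_fix(context={})
print(reported)
print(fixed)
```

Because Airflow pushes the value returned by execute as the task's return_value XCom, an execute that returns nothing on a successful first poke would leave that XCom empty, which matches the behaviour described in this report.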

It might be related to:

https://github.com/apache/airflow/pull/30939 https://github.com/apache/airflow/commit/f89d7b98487d993387ebea4af526fe20204ce02a#diff-03d898c7c92dfd56502de1a64ff072fb8e88b57b3eac7cec7fb5ee14eb4b47d2


What you think should happen instead

No response

How to reproduce

Put the file in the bucket first, then run the DAG code. On the first execution the sensor picks up the file but does not return it in XCom.

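from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor

# The connection ID, bucket, prefix parts, and source_config are defined elsewhere in the DAG file.
wait_for_data_op = GCSObjectsWithPrefixExistenceSensor(
    task_id="00.wait_for_data_files",
    google_cloud_conn_id=GOOGLE_CLOUD_LANDING_BUCKET_CONN_ID,
    bucket=landing_bucket,
    prefix=f"{source_objects_prefix}{schema_version_suffix}"
    + ("" if not source_config.enforce_logical_date_detection else "_{{ ds_nodash }}"),
    deferrable=True,
)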

The downstream tasks fail because of that.
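Until the sensor returns its matches on the first poke, a downstream task could guard against the empty XCom. The sketch below is a possible workaround only: resolve_matched_objects and its list_objects parameter are hypothetical names introduced here, and the listing function is passed in as a plain callable so the example runs without Airflow or GCS.

```python
from typing import Callable, List, Optional


def resolve_matched_objects(
    xcom_value: Optional[List[str]],
    list_objects: Callable[[], List[str]],
) -> List[str]:
    """Return the sensor's matches, or list the bucket again if XCom is empty."""
    if xcom_value:
        # Normal case: the sensor returned the matched object names.
        return list(xcom_value)
    # Fallback for the case in this report: XCom is None on a successful first poke.
    return list(list_objects())


# Stand-in for a real bucket listing (assumption for the demo).
def fake_listing() -> List[str]:
    return ["data/v1_20240220.csv"]


from_fallback = resolve_matched_objects(None, fake_listing)
from_xcom = resolve_matched_objects(["data/v1_a.csv"], fake_listing)
print(from_fallback)
print(from_xcom)
```

In a real DAG, xcom_value would presumably come from ti.xcom_pull(task_ids="00.wait_for_data_files"), and list_objects would wrap a listing call against the same bucket and prefix, for example through the provider's GCSHook. Both of those are assumptions about the surrounding DAG rather than something shown in this report.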

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

hejnal, Feb 20 '24 14:02

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot], Feb 20 '24 14:02