GCSObjectsWithPrefixExistenceSensor in deferrable mode does not return matched objects in XCom on the first poke (before deferring)
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow-providers-google==10.14.0
Apache Airflow version
2.6.3
Operating System
MacOS
Deployment
Other
Deployment details
Local deployment of Airflow (with a RabbitMQ queue), connecting to GCS through a custom connection that uses a service account (SA).
What happened
On the first execution, before the sensor defers, the sensor (with deferrable=True) finds the files on GCS and turns green, but it does not return the matched objects in XCom.
It works as expected if the file arrives after the sensor has deferred and is picked up by the triggerer.
The line that might cause this issue is self.poke(context=context), in ../site-packages/airflow/providers/google/cloud/sensors/gcs.py:

```python
if not self.poke(context=context):
    self.defer(
        timeout=timedelta(seconds=self.timeout),
        trigger=GCSUploadSessionTrigger(
            bucket=self.bucket,
            prefix=self.prefix,
            poke_interval=self.poke_interval,
            google_cloud_conn_id=self.google_cloud_conn_id,
            inactivity_period=self.inactivity_period,
            min_objects=self.min_objects,
            previous_objects=self.previous_objects,
            allow_delete=self.allow_delete,
            hook_params=hook_params,
        ),
        method_name="execute_complete",
    )
```

When the first poke already finds the objects, the defer branch is skipped and execute() presumably finishes without returning the matched objects, so nothing is pushed to XCom.
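The effect of that branch can be modeled without Airflow. The sketch below is hypothetical: PrefixSensorSketch, the TaskDeferred stand-in, the in-memory bucket and the return_on_first_poke switch exist only for illustration. It assumes that poke() stores the matched names on self._matches and that Airflow, by default, pushes whatever execute() returns to XCom as return_value. Returning the matches right after a successful first poke is one plausible fix, not a confirmed one.

```python
from __future__ import annotations

from typing import Optional


class TaskDeferred(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""


class PrefixSensorSketch:
    """Hypothetical, Airflow-free model of the deferrable execute() flow."""

    def __init__(self, objects: list[str], prefix: str, return_on_first_poke: bool) -> None:
        self.objects = objects  # object names currently in the simulated bucket
        self.prefix = prefix
        self.return_on_first_poke = return_on_first_poke  # False = reported behavior
        self._matches: list[str] = []

    def poke(self) -> bool:
        # Record every object whose name starts with the prefix.
        self._matches = [name for name in self.objects if name.startswith(self.prefix)]
        return bool(self._matches)

    def execute(self) -> Optional[list[str]]:
        if not self.poke():
            # Airflow's self.defer() raises; the task later resumes in
            # execute_complete(), which (per the report) does reach XCom.
            raise TaskDeferred()
        if self.return_on_first_poke:
            return self._matches  # possible fix: hand poke()'s matches to XCom
        return None  # reported behavior: first-poke success returns nothing


bucket = ["landing/orders_20240101.csv", "landing/readme.txt", "other/x.csv"]
print(PrefixSensorSketch(bucket, "landing/orders", return_on_first_poke=False).execute())
print(PrefixSensorSketch(bucket, "landing/orders", return_on_first_poke=True).execute())
```

The first call prints None (nothing for XCom), the second prints the single matching object name.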
It might be related to:
https://github.com/apache/airflow/pull/30939
https://github.com/apache/airflow/commit/f89d7b98487d993387ebea4af526fe20204ce02a#diff-03d898c7c92dfd56502de1a64ff072fb8e88b57b3eac7cec7fb5ee14eb4b47d2
What you think should happen instead
No response
How to reproduce
Upload the file to the bucket first, then run the DAG. On the first execution the sensor picks up the file but does not return it in XCom:
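```python
from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor

# GOOGLE_CLOUD_LANDING_BUCKET_CONN_ID, landing_bucket, source_objects_prefix,
# schema_version_suffix and source_config are defined elsewhere in the DAG file.
wait_for_data_op = GCSObjectsWithPrefixExistenceSensor(
    task_id="00.wait_for_data_files",
    google_cloud_conn_id=GOOGLE_CLOUD_LANDING_BUCKET_CONN_ID,
    bucket=landing_bucket,
    # Optionally append the templated logical date (_YYYYMMDD) to the prefix.
    prefix=f"{source_objects_prefix}{schema_version_suffix}"
    + ("" if not source_config.enforce_logical_date_detection else "_{{ ds_nodash }}"),
    deferrable=True,
)
```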
The downstream tasks fail because of that.
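A downstream task that reads the sensor's result, for example with ti.xcom_pull(task_ids="00.wait_for_data_files"), receives None in the first-poke case instead of a list of object names. The following is a minimal sketch, assuming the downstream step expects a list of names. The helper require_matched_objects is hypothetical and not part of Airflow; it only makes the failure explicit instead of letting it surface later as an unrelated error, and it does not fix the sensor.

```python
from __future__ import annotations

from typing import Optional, Sequence


def require_matched_objects(matches: Optional[Sequence[str]]) -> list[str]:
    """Hypothetical downstream guard for the value pulled from the sensor's XCom."""
    if not matches:
        # None means no return_value was pushed (the first-poke case in this
        # report); an empty list is treated the same way.
        raise ValueError(
            "00.wait_for_data_files pushed no matched objects to XCom; "
            "got %r instead of a list of object names" % (matches,)
        )
    return sorted(matches)


print(require_matched_objects(["landing/b.csv", "landing/a.csv"]))
```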
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct