google provider: GCP credentials are not passed into transfer job polling when executing S3ToGCSOperator
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==10.9.0
Apache Airflow version
2.7.2
Operating System
Linux
Deployment
Amazon (AWS) MWAA
Deployment details
No response
What happened
When executing S3ToGCSOperator in deferrable mode, the operator creates the data transfer jobs successfully, but the trigger fails to poll the job status because it looks for GCP Application Default Credentials instead of using the credentials from the provided gcp_conn_id.
[2024-02-11, 12:48:21 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:21 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:23 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:23 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:24 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:24 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:26 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:26 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 530 files
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:330}} INFO - Overall submitted 10 jobs to transfer 9530 files
[2024-02-11, 12:48:27 UTC] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=transfer_files, task_id=s3_to_gcs_example, execution_date=20240211T124244, start_date=20240211T124816
[2024-02-11, 12:48:27 UTC] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
[2024-02-11, 12:48:28 UTC] {{cloud_storage_transfer_service.py:64}} INFO - Attempting to request jobs statuses
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_default.py:338}} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1359}} INFO - Resuming after deferral
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1382}} INFO - Executing <Task(S3ToGCSOperator): s3_to_gcs_example> on 2024-02-11 12:42:44+00:00
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:57}} INFO - Started process 24431 to run task
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'transfer_files', 's3_to_gcs_example', 'manual__2024-02-11T12:42:44+00:00', '--job-id', '170059', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpr6aa0mal']
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:85}} INFO - Job 170059: Subtask s3_to_gcs_example
[2024-02-11, 12:48:32 UTC] {{baseoperator.py:1600}} ERROR - Trigger failed:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 526, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 598, in run_trigger
async for event in trigger.run():
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py", line 67, in run
jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 520, in get_jobs
client = self.get_conn()
^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 510, in get_conn
self._client = StorageTransferServiceAsyncClient()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/async_client.py", line 225, in __init__
self._client = StorageTransferServiceClient(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/client.py", line 441, in __init__
self._transport = Transport(
^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/grpc_asyncio.py", line 198, in __init__
super().__init__(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/base.py", line 99, in __init__
credentials, _ = google.auth.default(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/auth/_default.py", line 691, in default
raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
What you think should happen instead
I think S3ToGCSOperator should create the Storage Transfer Service client with the credentials from the provided gcp_conn_id, including in the deferrable path where the trigger polls the job status.
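For illustration, a minimal sketch of the kind of change that would address it on the async hook side; the class here is a hypothetical stand-in, and obtaining credentials from gcp_conn_id (e.g. via GoogleBaseHook.get_credentials()) is an assumption, not the actual provider code:

from google.cloud.storage_transfer_v1 import StorageTransferServiceAsyncClient


class AsyncTransferHookSketch:
    """Hypothetical stand-in for the provider's async transfer hook."""

    def __init__(self, credentials):
        # `credentials` would be derived from the Airflow connection (gcp_conn_id),
        # e.g. via GoogleBaseHook.get_credentials(); here it is simply passed in.
        self._credentials = credentials
        self._client = None

    def get_conn(self) -> StorageTransferServiceAsyncClient:
        if self._client is None:
            # Pass the connection's credentials explicitly instead of letting the
            # client library fall back to Application Default Credentials, which
            # are not available on MWAA workers.
            self._client = StorageTransferServiceAsyncClient(credentials=self._credentials)
        return self._client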
How to reproduce
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

default_args = {
    'owner': 'myself',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

my_dag = DAG(
    's3_to_gcs_transfer',
    default_args=default_args,
    description='Transfer files from S3 to GCS',
    schedule_interval=None,
    catchup=False,
)

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="my-prefix",
    apply_gcs_prefix=True,
    gcp_conn_id="my-gcp-conn-id",
    aws_conn_id="my-aws-conn-id",
    dest_gcs="gs://my-gcs-bucket/",
    replace=False,
    deferrable=True,
    dag=my_dag,
)

s3_to_gcs_op
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.
Looks like it affects the deferrable mode case: CloudStorageTransferServiceCreateJobsTrigger does not have an option to provide a connection id.
https://github.com/apache/airflow/blob/90e2b12d6b99d2f7db43e45f5e8b97d3b8a43b36/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py#L33-L40
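For example, a rough sketch of how the trigger could accept and forward the connection id; the job_names/project_id/poll_interval parameters are taken from the traceback and the linked source, while gcp_conn_id and the serialization shown here are illustrative assumptions, not the final fix:

from airflow.triggers.base import BaseTrigger


class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
    def __init__(self, job_names, project_id=None, poll_interval=10, gcp_conn_id="google_cloud_default"):
        super().__init__()
        self.job_names = job_names
        self.project_id = project_id
        self.poll_interval = poll_interval
        # New: keep the connection id so the async hook can build an
        # authenticated client instead of relying on ADC.
        self.gcp_conn_id = gcp_conn_id

    def serialize(self):
        # The connection id must survive serialization so the triggerer
        # process can rebuild the trigger with it.
        return (
            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service"
            ".CloudStorageTransferServiceCreateJobsTrigger",
            {
                "job_names": self.job_names,
                "project_id": self.project_id,
                "poll_interval": self.poll_interval,
                "gcp_conn_id": self.gcp_conn_id,
            },
        )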
Hello! I am now investigating this issue and will try to prepare a fix for it.
Assigned to you @korolkevich :)
The remaining task on this issue is explained in: https://github.com/apache/airflow/pull/37518#discussion_r1541078941