BigQuery task decorated functions failing in Airflow 2.9.1
Apache Airflow version
2.9.1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
After upgrading to Airflow 2.9.1, @task decorated functions that use the BigQuery hook no longer submit jobs successfully and instead return an error such as the following:
```
google.api_core.exceptions.NotFound: 404 GET https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0?location=EU&prettyPrint=false: Not found: Job my-project:EU.airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0
```
I have replicated this issue against Airflow 2.9.1 and against main, but it does not seem to be related to the Google provider, because using either 10.17 or 10.16 results in this error. Using either Google provider version with Airflow 2.8.4 does not cause this error.
What you think should happen instead?
No response
How to reproduce
Here's a simple DAG that will replicate the issue using Breeze. The bq_hook_test task will fail, but bq_insert_job_test, based on BigQueryInsertJobOperator with the same configuration, will succeed.
```bash
breeze --python 3.10 --backend postgres start-airflow --forward-credentials ${HOME}/.config/gcloud
```
```python
from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with models.DAG(
    dag_id='bq_hook_test',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:
    configuration = {
        'query': {
            'query': 'SELECT 1;',
            'useLegacySql': False,
        },
    }

    @task
    def bq_test():
        hook = BigQueryHook()
        hook.insert_job(
            location='EU',
            configuration=configuration,
        )

    bq_hook_test = bq_test()

    test = BigQueryInsertJobOperator(
        task_id='bq_insert_job_test',
        location='EU',
        configuration=configuration,
    )
```
We are using Google default credentials for authentication with the following environment variables:
```
GOOGLE_CLOUD_PROJECT=my-project
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT=google-cloud-platform://
```
Operating System
n/a
Versions of Apache Airflow Providers
apache-airflow-providers-google==10.17.0
Deployment
Astronomer
Deployment details
No response
Anything else?
Full exception log:
```
[2024-05-10, 08:36:29 UTC] {taskinstance.py:2910} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 478, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 441, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
    return_value = super().execute(context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/files/dags/test.py", line 17, in bq_test
    hook.insert_job(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 524, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 1681, in insert_job
    job_api_repr.result(timeout=timeout, retry=retry)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1626, in result
    while not is_job_done():
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1551, in is_job_done
    if self.done(retry=retry, timeout=timeout):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 938, in done
    self.reload(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 828, in reload
    api_response = client._call_api(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 831, in _call_api
    return call()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
```
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?
2.8.4:
```
google-api-core==2.17.1
google-api-python-client==2.122.0
google-auth==2.28.2
```
2.9.1:
```
google-api-core==2.19.0
google-api-python-client==2.127.0
google-auth==2.29.0
```
Although I'm confused why this would affect @task decorated functions?
> Although I'm confused why this would affect @task decorated functions?
You are using BQHook directly, so it does not matter whether the task is decorated or not.
> Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?
Can you downgrade the libraries and check if it fixes the issue?
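For example, something along these lines (a sketch only, pinning back to the versions that shipped with Airflow 2.8.4, as listed above):
```bash
pip install \
  "google-api-core==2.17.1" \
  "google-api-python-client==2.122.0" \
  "google-auth==2.28.2"
```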
@VladaZakharova - you might want to take a look at this one
@potiuk Yes, I guess my confusion is more about why the hook gives the error in this context but the operator does not, especially as it doesn't seem to be caused by the provider. I downgraded the libraries but the issue persists.
Looks like this is related to google-cloud-bigquery; downgrading it to google-cloud-bigquery<3.21.0 fixes it for me.
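That is, roughly:
```bash
pip install "google-cloud-bigquery<3.21.0"
```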
@pankajastro Interesting. Downgrading does work, but I'm curious to know what the issue is and why the hook method gives the error but the operator does not.
It is because of the way we pass the nowait param to the insert_job method. When you use @task, nowait is False, but when you use the operator it is True:
https://github.com/apache/airflow/blob/79042cff416296c8bdbd64f2e0a5f4d46c93e882/airflow/providers/google/cloud/operators/bigquery.py#L2899-L2899
The error comes from https://github.com/apache/airflow/blob/79042cff416296c8bdbd64f2e0a5f4d46c93e882/airflow/providers/google/cloud/hooks/bigquery.py#L1674-L1676
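So until a fix lands, a possible workaround is for the decorated task to pass nowait=True itself, mirroring what the operator does (a sketch only, assuming insert_job keeps exposing that parameter):
```python
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


@task
def bq_test():
    hook = BigQueryHook()
    # nowait=True makes the hook only submit the job instead of
    # polling job.result(), which is the call that raises the 404
    # in the traceback above.
    hook.insert_job(
        location='EU',
        configuration={'query': {'query': 'SELECT 1;', 'useLegacySql': False}},
        nowait=True,
    )
```
Note that with nowait=True the task returns as soon as the job is submitted, without waiting for the query to finish.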
I have drafted a PR to pin the lib version until we fix it: https://github.com/apache/airflow/pull/39583
I see. Thanks for the info.
Closing this task as we have pinned the version, and we have a TODO in the code that references this issue.
I read the thread, but this also happens with the official Google Cloud BigQuery operators, which use BQHook under the hood. Thanks for the info!
Could you also update the constraint file?
> Could you also update the constraint file?
Constraints are updated only when they prevent things from being built. We never update constraints with new providers - they are generally frozen in time. The new provider will be in the constraints of 2.9.2.
Nothing prevents you from installing the new provider without constraints - this is described in the "upgrade scenario" documentation (see the installation documentation) - you should follow that rather than stick to constraints in this case.
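For reference, the upgrade scenario looks roughly like this (the constraints URL must match your Airflow and Python versions; the pinned bigquery version is just the workaround from this thread):
```bash
# Reproducible base install, pinned by the published constraints.
pip install "apache-airflow==2.9.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.1/constraints-3.10.txt"

# Then adjust individual packages without passing --constraint.
pip install "google-cloud-bigquery==3.20.1"
```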
Oh, I thought constraints were the recommended way to always install Airflow. I forced the installation of the google-cloud-bigquery package at 3.20.1 without constraints as you mentioned.