
BigQuery task decorated functions failing in Airflow 2.9.1

Open nathadfield opened this issue 3 months ago • 7 comments

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

After upgrading to Airflow 2.9.1, @task-decorated functions that use BigQuery hooks fail to submit jobs successfully and instead return an error such as the following:

google.api_core.exceptions.NotFound: 404 GET https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0?location=EU&prettyPrint=false: Not found: Job my-project:EU.airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0

I have replicated this issue on Airflow 2.9.1 and on main, but it does not seem to be related to the Google provider, because both provider versions 10.17 and 10.16 produce this error.

Using either of these Google provider versions with Airflow 2.8.4 does not cause this error.

What you think should happen instead?

No response

How to reproduce

Here's a simple DAG that replicates the issue using Breeze. The task created from bq_test (assigned to bq_hook_test) fails, but bq_insert_job_test, which uses BigQueryInsertJobOperator with the same configuration, succeeds.

breeze --python 3.10 --backend postgres start-airflow --forward-credentials ${HOME}/.config/gcloud

from datetime import datetime
from airflow import models

from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with models.DAG(
    dag_id='bq_hook_test',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:

    configuration={
        'query': {
            'query': 'SELECT 1;',
            'useLegacySql': False
        },
    }

    @task
    def bq_test():
        hook = BigQueryHook()
        hook.insert_job(
            location='EU',
            configuration=configuration,
        )


    bq_hook_test = bq_test()

    test = BigQueryInsertJobOperator(
        task_id='bq_insert_job_test',
        location='EU',
        configuration=configuration,
    )
[Screenshot 2024-05-10 at 10:38:21]

We use Google default credentials for authentication, with the following environment variables:

GOOGLE_CLOUD_PROJECT=my-project
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT=google-cloud-platform://

Operating System

n/a

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.17.0

Deployment

Astronomer

Deployment details

No response

Anything else?

Full exception log:

[2024-05-10, 08:36:29 UTC] {taskinstance.py:2910} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 478, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 441, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
    return_value = super().execute(context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/files/dags/test.py", line 17, in bq_test
    hook.insert_job(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 524, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 1681, in insert_job
    job_api_repr.result(timeout=timeout, retry=retry)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1626, in result
    while not is_job_done():
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1551, in is_job_done
    if self.done(retry=retry, timeout=timeout):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 938, in done
    self.reload(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 828, in reload
    api_response = client._call_api(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 831, in _call_api
    return call()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

nathadfield, May 10 '24 10:05

Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?

Airflow 2.8.4:

google-api-core==2.17.1
google-api-python-client==2.122.0
google-auth==2.28.2

Airflow 2.9.1:

google-api-core==2.19.0
google-api-python-client==2.127.0
google-auth==2.29.0

Although I'm confused why this would affect @task decorated functions?

nathadfield, May 10 '24 10:05

Although I'm confused why this would affect @task decorated functions?

You are using BQHook, so it does not matter whether the task is decorated.

Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?

Can you downgrade the libraries and check if it fixes the issue?

@VladaZakharova - you might want to take a look at this one

potiuk, May 10 '24 11:05

@potiuk Yes, I guess my confusion is more about why the hook gives the error in this context while the operator does not, especially as it doesn't seem to be caused by the provider. I downgraded the libraries, but the issue persists.

nathadfield, May 10 '24 12:05

Looks like this is related to google-cloud-bigquery; downgrading it to google-cloud-bigquery<3.21.0 fixes it for me.
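The usual way to apply this workaround is a constraint such as google-cloud-bigquery<3.21.0 in the deployment's requirements. If you also want a startup script or DAG file to report which side of that bound the installed client is on, a stdlib-only sketch could look like this. The helper names are hypothetical, and the version parsing is deliberately rough: it compares only the leading numeric release, not full PEP 440 ordering.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version

# Bound reported in this thread: google-cloud-bigquery below 3.21.0 did not show the 404.
KNOWN_GOOD_BELOW = (3, 21, 0)


def parse_release(ver: str) -> tuple[int, ...]:
    """Return the leading numeric release of a version string, e.g. '3.20.1' -> (3, 20, 1).

    Stops at the first component with a non-numeric suffix, so '3.21.0rc1' -> (3, 21, 0).
    Rough helper only, not a PEP 440 implementation.
    """
    parts: list[int] = []
    for piece in ver.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) != len(piece):
            break  # suffix such as 'rc1' or '+local': ignore everything after it
    return tuple(parts)


def is_below_known_good_bound(ver: str) -> bool:
    """True if the leading numeric release of ver is below (3, 21, 0)."""
    release = parse_release(ver)
    # Pad with zeros so that '3.21' compares equal to (3, 21, 0) rather than below it.
    release += (0,) * max(0, len(KNOWN_GOOD_BELOW) - len(release))
    return release < KNOWN_GOOD_BELOW


def installed_bigquery_version() -> str | None:
    """Installed google-cloud-bigquery version, or None if the package is absent."""
    try:
        return version("google-cloud-bigquery")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    ver = installed_bigquery_version()
    if ver is None:
        print("google-cloud-bigquery is not installed")
    elif is_below_known_good_bound(ver):
        print(f"google-cloud-bigquery {ver}: below 3.21.0")
    else:
        print(f"google-cloud-bigquery {ver}: 3.21.0 or newer, hook.insert_job may hit the 404")
```

A pip constraint remains the real fix for the environment; a check like this only makes an accidental upgrade visible.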

pankajastro, May 13 '24 06:05

@pankajastro Interesting. Downgrading does work, but I'm curious to know what the issue is and why the hook method gives the error but the operator does not.

nathadfield, May 13 '24 08:05

It is because of the way we pass the nowait param to the insert_job method. When you call the hook from a @task, nowait is False, but when you use the operator it is True.

The operator passes nowait=True here: https://github.com/apache/airflow/blob/79042cff416296c8bdbd64f2e0a5f4d46c93e882/airflow/providers/google/cloud/operators/bigquery.py#L2899-L2899

The error comes from here in the hook: https://github.com/apache/airflow/blob/79042cff416296c8bdbd64f2e0a5f4d46c93e882/airflow/providers/google/cloud/hooks/bigquery.py#L1674-L1676
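To make the nowait explanation concrete, here is a toy, dependency-free model of the two paths. Everything in it is hypothetical: FakeBackend, FakeQueryJob and insert_job are stand-ins, not the real BigQueryHook or google-cloud-bigquery classes. It encodes one assumption that fits the traceback but is not confirmed in this thread: with the affected client version, result() goes straight to polling (reload, i.e. GET jobs/<id>) without first creating the job, so a job that was never inserted comes back as a 404, while the nowait=True path creates the job up front.

```python
from __future__ import annotations


class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


class FakeBackend:
    """In-memory stand-in for the BigQuery jobs API (hypothetical)."""

    def __init__(self) -> None:
        self.jobs: dict[str, str] = {}

    def insert(self, job_id: str) -> None:
        # Like POST .../jobs: the job now exists server-side.
        # The toy query finishes instantly.
        self.jobs[job_id] = "DONE"

    def get(self, job_id: str) -> str:
        # Like GET .../jobs/<id>: unknown job IDs are a 404.
        if job_id not in self.jobs:
            raise NotFound(f"404 GET jobs/{job_id}: Not found")
        return self.jobs[job_id]


class FakeQueryJob:
    """Hypothetical job object with a begin/reload/result shape."""

    def __init__(self, job_id: str, backend: FakeBackend) -> None:
        self.job_id = job_id
        self.backend = backend
        self.state: str | None = None  # None means "never started"

    def _begin(self) -> None:
        self.backend.insert(self.job_id)
        self.state = "RUNNING"

    def reload(self) -> None:
        self.state = self.backend.get(self.job_id)

    def result(self) -> str:
        # ASSUMPTION: the affected client polls immediately and never calls
        # _begin() here, so an unstarted job fails on the first reload().
        while self.state != "DONE":
            self.reload()
        return self.state


def insert_job(job_id: str, backend: FakeBackend, nowait: bool = False) -> FakeQueryJob:
    """Simplified model of the nowait branch described above."""
    job = FakeQueryJob(job_id, backend)
    if nowait:
        job._begin()  # create the job and return without waiting
    else:
        job.result()  # rely on result() alone, which fails under the assumption
    return job


if __name__ == "__main__":
    backend = FakeBackend()
    # Operator-style path: create the job up front, then wait on it explicitly.
    job = insert_job("job_a", backend, nowait=True)
    print(job.result())  # DONE
    # @task-style path: nowait=False, so the job is never created.
    try:
        insert_job("job_b", backend)
    except NotFound as exc:
        print("failed:", exc)  # failed: 404 GET jobs/job_b: Not found
```

If that reading is right, a workaround inside a @task would be to call hook.insert_job(..., nowait=True) and then wait on the returned job with its result() method, which is roughly what the operator path does; this has not been tested here.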

I have drafted a PR to pin the library version until we fix it: https://github.com/apache/airflow/pull/39583

pankajastro, May 13 '24 10:05

I see. Thanks for the info.

nathadfield, May 13 '24 10:05