
Bug: `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`

Open tatiana opened this issue 1 year ago • 1 comment

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

Relevant provider:

apache-airflow-providers-google==10.17.0

All providers installed:

apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-celery==3.6.2
apache-airflow-providers-cncf-kubernetes==8.0.1
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.12.0
apache-airflow-providers-datadog==3.5.1
apache-airflow-providers-elasticsearch==5.3.4
apache-airflow-providers-fab==1.0.4
apache-airflow-providers-ftp==3.8.0
apache-airflow-providers-google==10.17.0
apache-airflow-providers-http==4.10.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==9.0.1
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-openlineage==1.7.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-redis==3.6.1
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-sqlite==3.7.1

Apache Airflow version

apache-airflow==2.9.0

Operating System

macOS 14.2.1

Deployment

Astronomer

Deployment details

Reproducible locally by using:

  • Astro CLI Version: 1.26.0
  • Base docker image: quay.io/astronomer/astro-runtime:11.2.0

What happened

When trying to run the example_dataproc_batch.py DAG locally, some of the tasks failed (screenshot: 2024-05-03 at 12:16:52).

An example of failure was create_batch_2:

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
    from google.api_core.retry_async import AsyncRetry

    create_batch_2 = DataprocCreateBatchOperator(
        task_id="create_batch_2",
        project_id=PROJECT_ID,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID_2,
        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
    )

Which raised the error:

[2024-05-03, 11:03:17 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 264, in wait_for_operation
    return operation.result(timeout=timeout, retry=result_retry)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 256, in result
    self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 174, in done
    self._refresh_and_update(retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 163, in _refresh_and_update
    self._set_result_from_operation()
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 130, in _set_result_from_operation
    if not self._operation.done or self._result_set:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 434, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2537, in execute
    result = hook.wait_for_operation(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 266, in wait_for_operation
    error = operation.exception(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 282, in exception
    self._blocking_poll(timeout=timeout)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 174, in done
    self._refresh_and_update(retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 161, in _refresh_and_update
    if not self._operation.done:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
[2024-05-03, 11:03:17 UTC] {taskinstance.py:1139} INFO - Marking task as FAILED. dag_id=dataproc_batch, task_id=create_batch_2, execution_date=20240503T110313, start_date=20240503T110314, end_date=20240503T110317
[2024-05-03, 11:03:17 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 49516 for task create_batch_2 ('coroutine' object has no attribute 'done'; 7995)
[2024-05-03, 11:03:17 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-05-03, 11:03:17 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check

The difference between this task and the other instances of DataprocCreateBatchOperator that worked is the use of the result_retry argument.
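The traceback hints at the root cause: somewhere in the sync polling path, an async refresh call ends up invoked without being awaited, so `self._operation` is a bare coroutine object, which has no `done` attribute. A minimal sketch of that failure mode (hypothetical stub, not the actual google-api-core code):

```python
# Hypothetical stand-in for an async refresh call that a synchronous
# polling loop invokes without awaiting it.
async def get_operation():
    return {"done": True}

op = get_operation()  # called, not awaited: this returns a coroutine object

try:
    op.done  # coroutine objects have no such attribute
except AttributeError as exc:
    print(exc)  # 'coroutine' object has no attribute 'done'

op.close()  # suppress the "coroutine was never awaited" warning
```

This is consistent with the error only appearing when result_retry (an AsyncRetry) is passed into the otherwise synchronous `wait_for_operation` path.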

What you think should happen instead

Running the DAG should not raise an exception when DataprocCreateBatchOperator is used with the result_retry argument.

How to reproduce

Run example_dataproc_batch.py

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

tatiana avatar May 03 '24 11:05 tatiana

Are you sure this is not a problem in the dependency package google-api-core? Could you provide the versions of the installed packages?

Taragolis avatar May 03 '24 12:05 Taragolis

Hi @Taragolis, thanks for the quick feedback!

Here is a complete requirements.txt that reproduces the original problem.

You're right. Upgrading google-api-core from 2.18.0 to 2.19.0 solves the problem, as can be seen in the screenshot below:

(screenshot: 2024-05-03 at 23:03:20)

I also tested downgrading google-api-core to 2.17.0, which works.
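Until a constraint ships in a provider release, one interim workaround (assuming a pip-style requirements file; the exclusion mirrors the versions reported to work above) would be to keep the affected release out:

```text
# requirements.txt fragment: exclude the google-api-core release that
# triggers the coroutine AttributeError (2.17.0 and 2.19.0 both work)
google-api-core!=2.18.0
```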

As for the fix, I opened a PR that prevents this version of google-api-core from being installed: https://github.com/apache/airflow/pull/39462

What are your thoughts?

tatiana avatar May 07 '24 12:05 tatiana