Bug: `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
Relevant provider:
apache-airflow-providers-google==10.17.0
All providers installed:
apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-celery==3.6.2
apache-airflow-providers-cncf-kubernetes==8.0.1
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.12.0
apache-airflow-providers-datadog==3.5.1
apache-airflow-providers-elasticsearch==5.3.4
apache-airflow-providers-fab==1.0.4
apache-airflow-providers-ftp==3.8.0
apache-airflow-providers-google==10.17.0
apache-airflow-providers-http==4.10.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==9.0.1
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-openlineage==1.7.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-redis==3.6.1
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-sqlite==3.7.1
Apache Airflow version
apache-airflow==2.9.0
Operating System
macOS 14.2.1
Deployment
Astronomer
Deployment details
Reproducible locally by using:
- Astro CLI Version: 1.26.0
- Base docker image: quay.io/astronomer/astro-runtime:11.2.0
What happened
When I ran the example_dataproc_batch.py DAG locally, some of the tasks failed.
One example of a failing task was create_batch_2:
create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)
Which raised the error:
[2024-05-03, 11:03:17 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 264, in wait_for_operation
return operation.result(timeout=timeout, retry=result_retry)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 256, in result
self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
polling(self._done_or_raise)(retry=retry)
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
return retry_target(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
result = target()
^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
if not self.done(retry=retry):
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 174, in done
self._refresh_and_update(retry)
File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 163, in _refresh_and_update
self._set_result_from_operation()
File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 130, in _set_result_from_operation
if not self._operation.done or self._result_set:
^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 434, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2537, in execute
result = hook.wait_for_operation(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 266, in wait_for_operation
error = operation.exception(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 282, in exception
self._blocking_poll(timeout=timeout)
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
polling(self._done_or_raise)(retry=retry)
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
return retry_target(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
result = target()
^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
if not self.done(retry=retry):
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 174, in done
self._refresh_and_update(retry)
File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 161, in _refresh_and_update
if not self._operation.done:
^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
[2024-05-03, 11:03:17 UTC] {taskinstance.py:1139} INFO - Marking task as FAILED. dag_id=dataproc_batch, task_id=create_batch_2, execution_date=20240503T110313, start_date=20240503T110314, end_date=20240503T110317
[2024-05-03, 11:03:17 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 49516 for task create_batch_2 ('coroutine' object has no attribute 'done'; 7995)
[2024-05-03, 11:03:17 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-05-03, 11:03:17 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
The difference between this task instance and the other DataprocCreateBatchOperator instances that succeeded is the use of the result_retry argument.
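For readers unfamiliar with the message in the traceback, here is a minimal sketch of how an AttributeError on a 'coroutine' object arises in plain Python. It is not the google-api-core code path; FakeOperation and fetch_operation are hypothetical names used only for illustration, on the assumption that something similar happens when a coroutine ends up where a finished operation object is expected.

```python
import asyncio


class FakeOperation:
    """Hypothetical stand-in for an operation object exposing `done`."""

    done = True


async def fetch_operation() -> FakeOperation:
    """Hypothetical async call that returns an operation."""
    return FakeOperation()


# Calling an async function without awaiting it returns a coroutine object,
# not the FakeOperation it would eventually produce.
coro = fetch_operation()
try:
    coro.done  # attribute lookup happens on the coroutine itself
except AttributeError as exc:
    message = str(exc)
finally:
    coro.close()  # avoid a "coroutine was never awaited" warning

# Running the coroutine to completion yields the real object.
op = asyncio.run(fetch_operation())

print(message)
print(op.done)
```

The first print shows the same kind of message as the traceback above; the second shows that the attribute exists once the coroutine has actually been run.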
What you think should happen instead
Running the DAG should not raise exceptions when using DataprocCreateBatchOperator with the argument result_retry.
How to reproduce
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Are you sure this is not a problem in the dependency package google-api-core?
Could you provide the versions of the installed packages?
Hi @Taragolis, thanks for the quick feedback!
This is a complete requirements.txt that can reproduce the original problem.
You're right. Upgrading google-api-core from 2.18.0 to 2.19.0 solves the problem.
I also tested downgrading google-api-core to 2.17.0, which works.
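Based on these observations, a quick local check for the affected version might look like the sketch below. It assumes that 2.18.0 is the only affected release, since that is the only version reported as failing in this thread; AFFECTED_VERSIONS, is_affected, and installed_google_api_core are illustrative names, not part of Airflow or google-api-core.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

# Assumption: only 2.18.0 is treated as affected, because it is the only
# version reported as failing here (2.17.0 and 2.19.0 both worked).
AFFECTED_VERSIONS = frozenset({"2.18.0"})


def is_affected(installed: str) -> bool:
    """Return True if the given version string is one reported as failing."""
    return installed.strip() in AFFECTED_VERSIONS


def installed_google_api_core() -> Optional[str]:
    """Return the installed google-api-core version, or None if absent."""
    try:
        return version("google-api-core")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_google_api_core()
    if found is None:
        print("google-api-core is not installed")
    elif is_affected(found):
        print(f"google-api-core {found} is the version reported as failing here")
    else:
        print(f"google-api-core {found} is not the version reported as failing here")
```

In a requirements.txt, the same exclusion could be written with the standard pip specifier google-api-core!=2.18.0.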
As for the fix, I made a PR to avoid this version of google-api-core being installed: https://github.com/apache/airflow/pull/39462
What are your thoughts?