prefect
Cached task raising tenacity exception crashes
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
I have a task that calls a library function, and the library uses tenacity for its own retries. When tenacity raises its retry exception (RetryError), the task crashes instead of executing its own retries.
Reproduction
import tenacity
from prefect import task
from prefect.tasks import task_input_hash


@tenacity.retry(stop=tenacity.stop_after_attempt(3), reraise=False)
def library_function_with_its_own_retries():
    raise ValueError("Simulated library function failure")


@task(
    cache_key_fn=task_input_hash,
    retries=1,
    log_prints=True,
)
def call_library_function():
    print("You should see this print statement 2 times")
    library_function_with_its_own_retries()


if __name__ == "__main__":
    call_library_function()
Error
14:43:18.918 | INFO | prefect.engine - Created task run 'call_library_function-83f0e823' for task 'call_library_function'
14:43:18.980 | INFO | Task run 'call_library_function-83f0e823' - You should see this print statement 2 times
14:43:18.982 | ERROR | Task run 'call_library_function-83f0e823' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
library_function_with_its_own_retries()
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
return copy(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]
14:43:18.985 | ERROR | Task run 'call_library_function-83f0e823' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: cannot pickle '_thread.RLock' object
Traceback (most recent call last):
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
library_function_with_its_own_retries()
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
return copy(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 40, in <module>
call_library_function()
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1392, in enter_task_run_engine
return submit_autonomous_task_run_to_engine(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
return call()
^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
return self.result()
^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_engine.py", line 67, in submit_autonomous_task_run_to_engine
future_result_or_state = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_runners.py", line 231, in submit
result = await call()
^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1806, in begin_task_run
state = await orchestrate_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2124, in orchestrate_task_run
terminal_state = await exception_to_failed_state(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 201, in exception_to_failed_state
data = await result_factory.create_result(exc)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 453, in create_result
return await PersistedResult.create(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 681, in create
data = serializer.dumps(obj)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/serializers.py", line 148, in dumps
blob = pickler.dumps(obj)
^^^^^^^^^^^^^^^^^^
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.RLock' object
Versions (prefect version output)
Version: 2.19.6
API version: 0.8.4
Python version: 3.11.6
Git commit: 9d938fe7
Built: Mon, Jun 24, 2024 10:23 AM
OS/Arch: darwin/arm64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.43.2
Additional context
This is only a problem if a cache_key_fn is included in the task decorator.
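The traceback hints at the mechanism: the crash happens in serializer.dumps while the failed state's exception is being persisted, and the RetryError in the log wraps a Future. Below is a minimal standard-library sketch of that failure mode. StandInRetryError is a hypothetical stand-in written for illustration, not tenacity's class, and the sketch uses the stdlib pickle module where the traceback shows cloudpickle.

```python
import pickle
from concurrent.futures import Future


class StandInRetryError(Exception):
    """Hypothetical stand-in: an exception that carries a Future, like the
    RetryError[<Future ...>] shown in the log."""

    def __init__(self, last_attempt: Future) -> None:
        self.last_attempt = last_attempt
        super().__init__(last_attempt)


def is_picklable(obj: object) -> bool:
    """Return True if the standard-library pickler can serialize obj."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


# A finished Future holding the original failure.
attempt: Future = Future()
attempt.set_exception(ValueError("Simulated library function failure"))

# The plain ValueError serializes; the wrapper does not, because a Future
# holds a threading.Condition, which in turn holds an RLock.
plain_error_ok = is_picklable(ValueError("Simulated library function failure"))
wrapped_error_ok = is_picklable(StandInRetryError(attempt))
print(plain_error_ok, wrapped_error_ok)  # True False
```

If this matches what happens inside Prefect, it would explain why reraise=True avoids the crash: the task then raises the plain ValueError, which serializes without trouble.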
Easy workarounds exist:
- If you own the library, you can set reraise=True in the tenacity retry.
- You can catch tenacity exceptions in the task and raise some other exception type.
I'll be implementing one of these workarounds, but it would be nice if the core library included handling for tenacity exceptions by default.