prefect icon indicating copy to clipboard operation
prefect copied to clipboard

Cached task raising tenacity exception crashes

Open thundercat1 opened this issue 7 months ago • 5 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I have a task that calls a library function. Inside the library, tenacity is used for retries. But when a tenacity retry exception is raised, the task crashes without executing its own retries.

Reproduction

import tenacity
from prefect import task
from prefect.tasks import task_input_hash

@tenacity.retry(stop=tenacity.stop_after_attempt(3), reraise=False)
def library_function_with_its_own_retries():
    raise ValueError("Simulated library function failure")


@task(
    cache_key_fn=task_input_hash,
    retries=1,
    log_prints=True,
)
def call_library_function():
    print("You should see this print statement 2 times")
    library_function_with_its_own_retries()


if __name__ == "__main__":
    call_library_function()

Error

14:43:18.918 | INFO    | prefect.engine - Created task run 'call_library_function-83f0e823' for task 'call_library_function'
14:43:18.980 | INFO    | Task run 'call_library_function-83f0e823' - You should see this print statement 2 times
14:43:18.982 | ERROR   | Task run 'call_library_function-83f0e823' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
    raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
    library_function_with_its_own_retries()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]
14:43:18.985 | ERROR   | Task run 'call_library_function-83f0e823' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: cannot pickle '_thread.RLock' object
Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
    raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
    library_function_with_its_own_retries()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 40, in <module>
    call_library_function()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1392, in enter_task_run_engine
    return submit_autonomous_task_run_to_engine(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_engine.py", line 67, in submit_autonomous_task_run_to_engine
    future_result_or_state = from_sync.wait_for_call_in_loop_thread(
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1806, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2124, in orchestrate_task_run
    terminal_state = await exception_to_failed_state(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 201, in exception_to_failed_state
    data = await result_factory.create_result(exc)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 453, in create_result
    return await PersistedResult.create(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 681, in create
    data = serializer.dumps(obj)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/serializers.py", line 148, in dumps
    blob = pickler.dumps(obj)
           ^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.RLock' object

Versions (prefect version output)

Version:             2.19.6
API version:         0.8.4
Python version:      3.11.6
Git commit:          9d938fe7
Built:               Mon, Jun 24, 2024 10:23 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.43.2

Additional context

This only is a problem if a cache_key_fn is included in the task decorator.

Easy workarounds exist -

  1. If you own the library, you can set reraise=True in the tenacity retry
  2. You can catch tenacity exceptions in the task, and raise some other exception type

I'll be implementing one of these workarounds, but it would be nice if the core library included handling for tenacity exceptions by default.

thundercat1 avatar Jun 28 '24 20:06 thundercat1