retry_condition_fn doesn't work on Failed state

Open ihor-ramskyi-globallogic opened this issue 1 year ago • 3 comments

Bug summary

From the docstring of `def task`:

retry_condition_fn: An optional callable run when a task run returns a Failed state. Should
    return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task
    should end as failed. Defaults to `None`, indicating the task should always continue
    to its retry policy.

When I return a Failed state instead of raising an error, retry_condition_fn is not triggered. Minimal reproducible example:

from prefect import flow, task, get_run_logger
from prefect.states import Failed


def cond(task, task_run, state) -> bool:
    # Retry only when the failed state's message asks for it.
    try:
        state.result()
    except Exception:
        return "please_retry" in (state.message or "")
    return False


@task(retries=1, retry_condition_fn=cond)
def task_a():
    logger = get_run_logger()
    logger.info("Inside task_a")
    return Failed(message="please_retry")


@task(retries=1, retry_condition_fn=cond)
def task_b():
    logger = get_run_logger()
    logger.info("Inside task_b")
    raise Exception("please_retry")


@flow()
def generic_flow():
    task_a()
    task_b()


if __name__ == "__main__":
    generic_flow()

Logs and traceback:

16:34:49.831 | INFO    | prefect.engine - Created flow run 'warm-moth' for flow 'generic-flow'
16:34:49.835 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/113d3b35-c54f-4d83-9b7e-a91a6818380b
16:34:49.981 | INFO    | Task run 'task_a-4ee' - Inside task_a
16:34:49.990 | ERROR   | Task run 'task_a-4ee' - Finished in state Failed('please_retry')
16:34:50.009 | INFO    | Task run 'task_b-53a' - Inside task_b
16:34:50.010 | INFO    | Task run 'task_b-53a' - Task run failed with exception: Exception('please_retry') - Retry 1/1 will start immediately
16:34:50.016 | INFO    | Task run 'task_b-53a' - Inside task_b
16:34:50.017 | ERROR   | Task run 'task_b-53a' - Task run failed with exception: Exception('please_retry') - Retries are exhausted
Traceback (most recent call last):
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry
16:34:50.028 | ERROR   | Task run 'task_b-53a' - Finished in state Failed('Task run encountered an exception Exception: please_retry')
16:34:50.029 | ERROR   | Flow run 'warm-moth' - Encountered exception during execution: Exception('please_retry')
Traceback (most recent call last):
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 655, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 30, in generic_flow
    task_b()
  File "My_Path\venv\lib\site-packages\prefect\tasks.py", line 1002, in __call__
    return run_task(
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1507, in run_task
    return run_task_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1320, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 475, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry
16:34:50.089 | ERROR   | Flow run 'warm-moth' - Finished in state Failed('Flow run encountered an exception: Exception: please_retry')
Traceback (most recent call last):
  File "My_Path\test_retries.py", line 34, in <module>
    generic_flow()
  File "My_Path\venv\lib\site-packages\prefect\flows.py", line 1355, in __call__
    return run_flow(
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 255, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 655, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 30, in generic_flow
    task_b()
  File "My_Path\venv\lib\site-packages\prefect\tasks.py", line 1002, in __call__
    return run_task(
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1507, in run_task
    return run_task_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1320, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 475, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry

Version info

Version:             3.0.11
API version:         0.8.4
Python version:      3.10.11
Git commit:          a17ccfcf
Built:               Thu, Oct 24, 2024 5:36 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server
Pydantic version:    2.7.1

Additional context

No response

Thank you for the issue!

The issue makes a lot of sense given the wording of the docstring:

An optional callable run when a task run returns a Failed state

However, I think there's an argument to be made (as suggested by @desertaxle) that returning Failed as a literal value from a task is a shortcut for forcing the task to fail; returning State objects as literal values is a bit of an escape hatch as it is.

If you don't mind, @ihor-ramskyi-globallogic, can you explain a bit about the motivation for retrying tasks where you explicitly return a Failed state instead of letting an exception raise?

zzstoatzz, Oct 28 '24 18:10

Thank you for the reply, @zzstoatzz! We want the visualizer to mark a task red when it ran without raising but returned wrong data. When this was first implemented, it made a lot of sense to return a Failed state carrying the incorrect data, possibly continue with the rest of the flow using task_name.submit().result(raise_on_failure=False), and leave the task marked as failed for further investigation without disrupting the flow itself. So we agreed on a convention: return a Failed state from a try/except instead of letting the error raise.
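For readers with the same convention, one possible workaround (not confirmed by the maintainers in this thread) is to raise a dedicated exception that carries the suspect data instead of returning Failed, since the logs above show that raised exceptions do go through the retry policy. The sketch below is plain Python so it runs without Prefect. `BadDataError`, `should_retry`, and `FakeFailedState` are hypothetical names introduced only for illustration, and `FakeFailedState` merely imitates a failed state whose `result()` re-raises the task's exception.

```python
class BadDataError(Exception):
    """Hypothetical exception: the task ran, but its output failed validation."""

    def __init__(self, message, data=None):
        super().__init__(message)
        self.data = data  # keep the suspect payload for later investigation


def should_retry(task, task_run, state) -> bool:
    """Same signature as `cond` in the report: retry only flagged bad-data failures."""
    try:
        state.result()  # assumed to re-raise the task's exception on failure
    except BadDataError as exc:
        return "please_retry" in str(exc)
    except Exception:
        return False  # any other failure ends the task without a retry
    return False  # state was not a failure, nothing to retry


class FakeFailedState:
    """Stand-in for a failed state, used only to exercise should_retry here."""

    def __init__(self, exc):
        self._exc = exc

    def result(self):
        raise self._exc


# Quick check of the predicate without running any flow.
retryable = FakeFailedState(BadDataError("please_retry", data=[1, 2]))
unrelated = FakeFailedState(ValueError("boom"))
print(should_retry(None, None, retryable))
print(should_retry(None, None, unrelated))
```

In a real flow, the assumption is that the task would `raise BadDataError("please_retry", data=...)` and be declared with `@task(retries=1, retry_condition_fn=should_retry)`, as in the report's `task_b`. The caller could then keep using `.submit().result(raise_on_failure=False)` so the flow continues while the task run is still shown as failed.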

@zzstoatzz is there any chance this bug will be fixed in the foreseeable future? It is blocking our upgrade from Prefect 2.

@zzstoatzz are there any updates on this issue? We can't upgrade to Prefect 3.x.x because of it.

pavlohumeniukdev, Nov 25 '24 14:11