prefect
Concurrent subflow calls cause `RuntimeError: The task runner is already started!`
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
Recursive subflows result in `RuntimeError: The task runner is already started!`
This can be worked around by passing a new task runner instance to each subflow call:
```python
from prefect import flow
from prefect.context import FlowRunContext


@flow
def test_flow(x):
    # Reuse the current run's task runner *type*, but give each subflow call a fresh instance
    task_runner_type = type(FlowRunContext.get().task_runner)
    while x > 0:
        x -= 1
        test_retry_flow.with_options(task_runner=task_runner_type())()
    return


@flow
def test_retry_flow():
    task_runner_type = type(FlowRunContext.get().task_runner)
    return test_flow.with_options(task_runner=task_runner_type())(x=2)


test_retry_flow()
```
Reproduction
```python
from prefect import flow


@flow
def test_flow(x):
    while x > 0:
        x -= 1
        # Nested call to a flow whose task runner is already started: raises RuntimeError
        test_retry_flow()
    return


@flow
def test_retry_flow():
    return test_flow(x=2)


test_retry_flow()
```
Error
RuntimeError: The task runner is already started!
Versions
Version: 2.4.5+244.g7dfa90111
API version: 0.8.2
Python version: 3.10.6
Git commit: 7dfa9011
Built: Tue, Oct 25, 2022 12:35 PM
OS/Arch: darwin/arm64
Profile: integrations-cloud
Server type: cloud
Additional context
No response
This is a duplicate of https://github.com/PrefectHQ/prefect/issues/5853, but this one uses our new template, so I will close the other.
@abrookins I don't believe I have the permissions to add this to the internal backlog.
Added to the backlog!
The current workaround is to define the subflow inside a function, so that each call builds its own flow:
```python
import asyncio

from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    # Defining the flow here creates a new Flow object (and task runner) per call,
    # so the concurrent subflows below do not share task runner state.
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    # Run four subflows concurrently
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
```
I did some root cause analysis, and here are my findings:
The core of the issue is that the concrete `Flow` and `BaseTaskRunner` objects are created when a function is defined instead of when it is invoked.
https://github.com/PrefectHQ/prefect/blob/c7e7364561d38f778118325531b71fda35f92925/src/prefect/flows.py#L144-L147
When multiple invocations of the same `@flow`-decorated function run concurrently in async code, the same `Flow` object is invoked via `__call__` each time. Since `Flow` uses the same `BaseTaskRunner` object across invocations, the state of that `BaseTaskRunner` is shared across all the invocations, when ideally it should be isolated per invocation.
https://github.com/PrefectHQ/prefect/blob/328157f0b2ef05a2c3122225978b67b3afcbe4f9/src/prefect/engine.py#L367-L369
Since the `BaseTaskRunner` state is shared across all invocations of the `Flow` object, a second start triggers an invalid state transition.
https://github.com/PrefectHQ/prefect/blob/328157f0b2ef05a2c3122225978b67b3afcbe4f9/src/prefect/task_runners.py#L158-L159
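The failure mode can be reproduced with a minimal pure-Python model. `ToyTaskRunner` and `toy_flow` are hypothetical stand-ins, not Prefect's classes. They assume only what the analysis describes: the runner is created once when the function is decorated, and starting a runner that is already started raises.

```python
from contextlib import contextmanager


class ToyTaskRunner:
    """Hypothetical stand-in for a task runner that refuses to start twice."""

    def __init__(self):
        self._started = False

    @contextmanager
    def start(self):
        # Mirrors the guard described above: starting an already-started runner fails.
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


def toy_flow(fn):
    # The runner is created once, when the function is decorated (definition time)...
    runner = ToyTaskRunner()

    def wrapper(*args, **kwargs):
        # ...and every call, including nested ones, starts that same runner.
        with runner.start():
            return fn(*args, **kwargs)

    return wrapper


@toy_flow
def countdown(n):
    # Recursive "subflow" call, like the reproduction in this issue
    return countdown(n - 1) if n > 0 else "done"
```

`countdown(0)` returns `"done"`. `countdown(1)` raises `RuntimeError` on the nested call because both calls share the single runner created at decoration time.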
A possible fix is to accept and assign a `TaskRunnerBuilder` during `__init__` and create a new task runner per invocation during `__call__`.
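The per-invocation idea can be sketched as follows: store something that *produces* a runner and call it on every invocation. `ToyFlow`, `ToyTaskRunner`, and the `task_runner_factory` parameter are all hypothetical names chosen for illustration. This is not Prefect's API and not the `TaskRunnerBuilder` design itself, only the same technique under those assumptions.

```python
from contextlib import contextmanager
from typing import Callable


class ToyTaskRunner:
    """Hypothetical runner: `max_workers` is a setting, `_started` is per-run state."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._started = False

    @contextmanager
    def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


class ToyFlow:
    """Hypothetical flow that stores a runner *factory* instead of a runner instance."""

    def __init__(self, fn: Callable, task_runner_factory: Callable[[], ToyTaskRunner] = ToyTaskRunner):
        self.fn = fn
        self.task_runner_factory = task_runner_factory

    def __call__(self, *args, **kwargs):
        # A fresh runner per invocation, so nested or concurrent calls never share state.
        runner = self.task_runner_factory()
        with runner.start():
            return self.fn(*args, **kwargs)


def countdown(n):
    # Recursive call through the flow object, as in the reproduction
    return countdown_flow(n - 1) if n > 0 else "done"


countdown_flow = ToyFlow(countdown)
```

`countdown_flow(3)` completes without error. Custom settings can still be supplied through the factory, for example `ToyFlow(countdown, lambda: ToyTaskRunner(max_workers=8))`.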
In the short term, we could try to patch

```python
flow_run_context.task_runner = await stack.enter_async_context(
    flow.task_runner.start()
)
```

to create a copy of the task runner before starting it?
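The proposed patch can be modelled in isolation with `contextlib.AsyncExitStack`. `ToyTaskRunner` and `SHARED_RUNNER` are hypothetical, and `copy.copy` is an assumption about how "a copy" would be made. In this sketch the shallow copy is enough only because the toy runner's entire state is a single boolean.

```python
import asyncio
import copy
from contextlib import AsyncExitStack, asynccontextmanager


class ToyTaskRunner:
    """Hypothetical runner whose only run state is `_started`."""

    def __init__(self):
        self._started = False

    @asynccontextmanager
    async def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


# Created once, like a runner attached to a flow at decoration time
SHARED_RUNNER = ToyTaskRunner()


async def run_flow(n: int) -> int:
    async with AsyncExitStack() as stack:
        # The patch: start a *copy* of the shared runner instead of the shared runner itself.
        await stack.enter_async_context(copy.copy(SHARED_RUNNER).start())
        await asyncio.sleep(0.01)  # let the concurrent runs overlap
        return n


async def main():
    return await asyncio.gather(*(run_flow(n) for n in range(4)))
```

`asyncio.run(main())` returns `[0, 1, 2, 3]`. Without the `copy.copy(...)`, the second concurrent run would try to start the already-started shared runner and raise.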
Hey @madkinsz, I'm not sure this needs to be patched as a short-term fix, since a workaround already exists (https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1311968282). Instead, we can:
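1. Not create the concrete instance when a type is passed during `__init__`, and instead instantiate it when entering the context:

   ```python
   # In Flow.__init__: keep the class (or instance) as passed
   self.task_runner = task_runner or ConcurrentTaskRunner

   # In the engine: instantiate a class per run, start an instance as before
   flow_run_context.task_runner = await stack.enter_async_context(
       flow.task_runner().start() if isinstance(flow.task_runner, type) else flow.task_runner.start()
   )
   ```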
2. Or explicitly accept a subtype of `BaseTaskRunnerBuilder`/`Factory` and use it to fetch the `task_runner`:

   ```python
   # `task_runner_factory` stands for the proposed builder/factory argument
   if task_runner and task_runner_factory:
       raise ValueError("Flow accepts one of task_runner and task_runner_builder/factory but both are passed")
   ```
Both approaches need to mark passing `task_runner` instances as deprecated, with removal in a major version bump.
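Both proposals can be sketched as two small helpers. `BaseTaskRunner` and `ConcurrentTaskRunner` here are empty hypothetical stand-ins, and `resolve_task_runner`, `check_runner_args`, and `task_runner_factory` are illustrative names, not Prefect functions. The first helper instantiates a class on every call and returns an instance unchanged. That unchanged instance is still the shared-state legacy path, which is why both options deprecate it.

```python
from typing import Optional, Type, Union


class BaseTaskRunner:
    """Hypothetical stand-in for a task runner base class."""


class ConcurrentTaskRunner(BaseTaskRunner):
    """Hypothetical default runner."""


TaskRunnerSpec = Union[BaseTaskRunner, Type[BaseTaskRunner]]


def resolve_task_runner(task_runner: Optional[TaskRunnerSpec] = None) -> BaseTaskRunner:
    """Approach 1: keep what was passed at init and instantiate lazily per run."""
    spec = task_runner or ConcurrentTaskRunner
    # A class gets a fresh instance on every run; an instance is reused as-is (legacy path).
    return spec() if isinstance(spec, type) else spec


def check_runner_args(task_runner=None, task_runner_factory=None) -> None:
    """Approach 2: accept at most one of the two arguments."""
    if task_runner is not None and task_runner_factory is not None:
        raise ValueError(
            "Flow accepts one of task_runner and task_runner_factory, but both were passed"
        )
```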
P.S. I'm not sure how reliable creating a shallow copy of the `task_runner` will be, since nested objects are copied by reference. That could cause races, because state and internal variables would be shared between copies, which in turn would create a host of new problems.
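The shallow-copy concern can be demonstrated with the standard `copy` module alone. `ToyTaskRunner` is hypothetical and holds a nested dict as internal state. `copy.copy` produces a new outer object whose attributes still reference the same dict, while `copy.deepcopy` produces independent ones.

```python
import copy


class ToyTaskRunner:
    """Hypothetical runner holding nested mutable objects as internal state."""

    def __init__(self):
        self.settings = {"max_workers": 4}
        self.futures = {}  # nested, mutable run state


original = ToyTaskRunner()
shallow = copy.copy(original)
deep = copy.deepcopy(original)

# The shallow copy is a new object, but `futures` is the very same dict as the original's,
# so this write is visible through `original` too.
shallow.futures["task-1"] = "pending"
```

After the write, `original.futures` also contains `"task-1"`, while `deep.futures` is still empty. `copy.deepcopy` is not a general fix either, because objects such as `threading.Lock` cannot be deep-copied.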
I don't want to introduce a new concept (i.e. a builder/factory type) to users in order to resolve this issue. We can add a new concept behind the scenes (i.e. a `TaskRunnerBackend`) if we must, but it seems much easier to add a `copy` method to the task runner class that preserves only the settings and no state.
If we use `copy()`, the docs would need to call out explicitly that we make a copy of `task_runner`, so that clients can override `__copy__()`/`__deepcopy__()` if needed.
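A settings-only copy that goes through the standard `copy` protocol can be sketched by overriding `__copy__` so that `copy.copy()` rebuilds the runner from its settings. `ToyTaskRunner` is hypothetical and this is not Prefect's implementation. A subclass with extra settings would need to override `__copy__` as well.

```python
import copy
import threading


class ToyTaskRunner:
    """Hypothetical runner: `max_workers` is a setting, everything else is run state."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._started = False
        self._lock = threading.Lock()  # run state that could not be deep-copied
        self._futures = {}

    def __copy__(self):
        # Rebuild from settings only, so the copy starts with fresh, unshared state.
        return type(self)(max_workers=self.max_workers)


runner = ToyTaskRunner(max_workers=8)
runner._started = True
runner._futures["task-1"] = "pending"

fresh = copy.copy(runner)
```

`fresh` keeps `max_workers=8` but has its own lock, an empty futures dict, and `_started` set to `False`.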
P.S. 1: I'm assuming you mean you'll use the standard `copy.copy()` to achieve this; if not, then it's fine to introduce a new method.
P.S. 2: Why not leave the `type` as it is during init and create an instance during `enter_async_context`?
@rsampaths16 curious to hear your thoughts on #9342
Are you implementing your own task runners? We can just implement it on the base class and child classes can implement it if they want good behavior.
Hey @madkinsz, I've added a few review comments on #9342. The approach seems fine, as it isn't done using `copy`.
> Are you implementing your own task runners? We can just implement it on the base class and child classes can implement it if they want good behavior
I haven't implemented any task runners, since flow parallelism wasn't a blocker yet; the issue was noticed while doing POCs.