
Concurrent subflow calls cause `RuntimeError: The task runner is already started!`

Open. ahuang11 opened this issue 2 years ago.

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Recursive subflow calls result in `RuntimeError: The task runner is already started!`

This can be worked around by giving each subflow call a new task runner instance:

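```python
from prefect import flow
from prefect.context import FlowRunContext


@flow
def test_flow(x):
    # Look up the class of the task runner used by the current flow run.
    task_runner_type = type(FlowRunContext.get().task_runner)
    while x > 0:
        x -= 1
        # Give the subflow its own fresh, unstarted instance of that class.
        # Note: instantiating with no arguments drops any custom runner settings.
        test_retry_flow.with_options(task_runner=task_runner_type())()
    return


@flow
def test_retry_flow():
    task_runner_type = type(FlowRunContext.get().task_runner)
    return test_flow.with_options(task_runner=task_runner_type())(x=2)


test_retry_flow()
```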

Reproduction

```python
from prefect import flow


@flow
def test_flow(x):
    while x > 0:
        x -= 1
        test_retry_flow()
    return


@flow
def test_retry_flow():
    return test_flow(x=2)


test_retry_flow()
```

Error

RuntimeError: The task runner is already started!

Versions

Version:             2.4.5+244.g7dfa90111
API version:         0.8.2
Python version:      3.10.6
Git commit:          7dfa9011
Built:               Tue, Oct 25, 2022 12:35 PM
OS/Arch:             darwin/arm64
Profile:             integrations-cloud
Server type:         cloud

Additional context

No response

ahuang11, Oct 25 '22 20:10

This is a duplicate of https://github.com/PrefectHQ/prefect/issues/5853, although this one uses our new template, so I will close the other.

zanieb, Oct 25 '22 20:10

@abrookins I believe I do not have the permissions to fix this on the internal backlog.

zanieb, Oct 25 '22 20:10

Added to the backlog!

abrookins, Oct 25 '22 20:10

The current workaround is to create a separate flow object, and with it a separate task runner, for each concurrent call:

```python
import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    # Decorating inside the function builds a new Flow object (and therefore a
    # new task runner instance) on every call, so concurrent subflows never
    # share a task runner.
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
```

khuyentran1401, Nov 11 '22 17:11

I did some root-cause analysis; here are my findings:

The core of the issue is that the Flow object, and the BaseTaskRunner instance it holds, are created when the decorated function is defined rather than when it is invoked. https://github.com/PrefectHQ/prefect/blob/c7e7364561d38f778118325531b71fda35f92925/src/prefect/flows.py#L144-L147

When the same @flow-decorated function is invoked several times concurrently, every invocation goes through __call__ on the same Flow object. Because that Flow reuses one BaseTaskRunner instance across invocations, the task runner's state is shared by all of them, when ideally each invocation would be isolated. https://github.com/PrefectHQ/prefect/blob/328157f0b2ef05a2c3122225978b67b3afcbe4f9/src/prefect/engine.py#L367-L369

Since the shared BaseTaskRunner has already been started by one invocation, the next invocation tries to start it again, which is an invalid state transition and raises the error. https://github.com/PrefectHQ/prefect/blob/328157f0b2ef05a2c3122225978b67b3afcbe4f9/src/prefect/task_runners.py#L158-L159
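To make the mechanism concrete, here is a minimal, Prefect-free sketch. `ToyTaskRunner`, `SHARED_RUNNER` and `toy_flow_call` are hypothetical stand-ins, not Prefect APIs: one runner instance is created once (as if at decoration time) and shared, so a second overlapping call trips the same kind of "already started" guard.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyTaskRunner:
    """Hypothetical stand-in for a task runner with start/stop state."""

    def __init__(self) -> None:
        self._started = False

    @asynccontextmanager
    async def start(self):
        # Guard analogous to the one linked above: starting twice is invalid.
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


# Created once, "at definition time", and reused by every call.
SHARED_RUNNER = ToyTaskRunner()


async def toy_flow_call(delay: float) -> str:
    # Every invocation enters the *same* runner instance.
    async with SHARED_RUNNER.start():
        await asyncio.sleep(delay)
        return "ok"


async def main() -> list:
    # Two overlapping invocations; return_exceptions keeps the failure visible.
    return await asyncio.gather(
        toy_flow_call(0.1), toy_flow_call(0.1), return_exceptions=True
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first call succeeds and the second fails with the `RuntimeError`, purely because both calls share one stateful object.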

A possible fix is to accept and store a TaskRunnerBuilder in __init__ and create a new task runner per invocation in __call__.
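A minimal sketch of the builder/factory idea, assuming a plain zero-argument callable plays the role of the proposed TaskRunnerBuilder. `ToyTaskRunner`, `ToyFlow` and the `task_runner_factory` parameter are hypothetical names for illustration, not Prefect's API. The flow stores the factory at definition time and calls it inside `__call__`, so each invocation starts its own runner.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Awaitable, Callable


class ToyTaskRunner:
    """Hypothetical runner that can only be started once at a time."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers  # configuration
        self._started = False           # runtime state

    @asynccontextmanager
    async def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


class ToyFlow:
    """Hypothetical flow wrapper that stores a runner *factory*, not an instance."""

    def __init__(
        self,
        fn: Callable[..., Awaitable],
        task_runner_factory: Callable[[], ToyTaskRunner] = ToyTaskRunner,
    ) -> None:
        self.fn = fn
        self.task_runner_factory = task_runner_factory

    async def __call__(self, *args, **kwargs):
        # Build a fresh runner per invocation so concurrent calls never share state.
        runner = self.task_runner_factory()
        async with runner.start():
            return await self.fn(*args, **kwargs)


async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    return n * 2


doubling_flow = ToyFlow(work, task_runner_factory=lambda: ToyTaskRunner(max_workers=2))


async def main() -> list:
    # Four overlapping invocations of the same flow object.
    return await asyncio.gather(*(doubling_flow(n) for n in range(4)))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Settings travel with the factory (here via a lambda), while runtime state is created per call.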

rsampaths16, Jan 30 '23 17:01

In the short term, we could try to patch

```python
flow_run_context.task_runner = await stack.enter_async_context(
    flow.task_runner.start()
)
```

to create a copy of the task runner before starting it?

zanieb, Apr 16 '23 15:04

Hey @madkinsz, I'm not sure this needs to be patched as a short-term fix, since a workaround already exists: https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1311968282. Instead, we could:

1. Not create the concrete instance when a type is passed to __init__, and instead instantiate it when entering the context:

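```python
# In Flow.__init__: store the class itself rather than an instance.
self.task_runner = task_runner or ConcurrentTaskRunner

# In the engine, when entering the flow run: instantiate per run if given a class.
flow_run_context.task_runner = await stack.enter_async_context(
    flow.task_runner().start()
    if isinstance(flow.task_runner, type)
    else flow.task_runner.start()
)
```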

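2. Or explicitly accept a subclass of BaseTaskRunnerBuilder/Factory and use it to obtain the task runner, rejecting calls that pass both:

```python
# task_runner_factory stands in for "task_runner_builder/factory"; the name is still open.
if task_runner and task_runner_factory:
    raise ValueError(
        "Flow accepts only one of task_runner and task_runner_factory, but both were passed"
    )
```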

Both approaches would need to mark passing task_runner instances as deprecated and remove support for it in a major version bump.
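A minimal sketch of approach 1 under toy assumptions: `ToyTaskRunner` and `ToyFlow` are hypothetical stand-ins, not Prefect classes. Passing the class gives every call its own runner, while passing an instance keeps the old shared behavior, which is why accepting instances would eventually need to be deprecated.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Awaitable, Callable, Type, Union


class ToyTaskRunner:
    """Hypothetical runner that refuses to be started twice concurrently."""

    def __init__(self) -> None:
        self._started = False

    @asynccontextmanager
    async def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


class ToyFlow:
    def __init__(
        self,
        fn: Callable[..., Awaitable],
        task_runner: Union[Type[ToyTaskRunner], ToyTaskRunner, None] = None,
    ) -> None:
        self.fn = fn
        # Default to the class itself; nothing is instantiated at definition time.
        self.task_runner = task_runner or ToyTaskRunner

    def _resolve_runner(self) -> ToyTaskRunner:
        # A class is instantiated per call; a (legacy) instance is reused as-is.
        if isinstance(self.task_runner, type):
            return self.task_runner()
        return self.task_runner

    async def __call__(self, *args, **kwargs):
        async with self._resolve_runner().start():
            return await self.fn(*args, **kwargs)


async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    return n


async def main():
    by_type = ToyFlow(work, ToyTaskRunner)
    by_instance = ToyFlow(work, ToyTaskRunner())
    # Concurrent calls succeed when each call builds its own runner...
    ok = await asyncio.gather(by_type(1), by_type(2))
    # ...but still collide when an instance is shared.
    mixed = await asyncio.gather(by_instance(1), by_instance(2), return_exceptions=True)
    return ok, mixed


if __name__ == "__main__":
    print(asyncio.run(main()))
```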

P.S.

IMO, I'm not sure how reliable creating a shallow copy of the task_runner would be, since nested objects are copied by reference. That could cause races, because state and internal variables would be shared between copies, which in turn would create a host of new problems.
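The concern about shallow copies can be checked with the standard `copy` module. In this sketch `ToyTaskRunner`, its `_futures` dict and its lock are hypothetical stand-ins for a runner's internal state: `copy.copy` creates a new outer object whose nested state is the very same dict, while `copy.deepcopy` would isolate the dict but fails on the lock (which here stands in for things like pools or clients).

```python
import copy
import threading


class ToyTaskRunner:
    """Hypothetical runner: plain settings plus mutable, non-copyable runtime state."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._futures: dict = {}       # runtime state
        self._lock = threading.Lock()  # stands in for pools/clients


original = ToyTaskRunner()
shallow = copy.copy(original)

# Shallow copy: a new outer object, but the nested dict is the *same* object,
# so a write through the copy is visible through the original.
shallow._futures["task-a"] = "pending"
shares_state = original._futures is shallow._futures

# Deep copy: would isolate the dict, but locks cannot be copied.
try:
    copy.deepcopy(original)
    deepcopy_failed = False
except TypeError:
    deepcopy_failed = True

print(shares_state, original._futures, deepcopy_failed)
```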

rsampaths16, Apr 16 '23 16:04

I don't want to introduce a new concept (i.e. a builder/factory type) to users in order to resolve this issue. We can add a new concept behind the scenes (i.e. a TaskRunnerBackend) if we must, but it seems much easier to add a copy method to the task runner class that just preserves the settings and no state.
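A minimal sketch of a copy method that preserves settings and no state. The method name `duplicate` and the `ToyTaskRunner`/`TEMPLATE`/`run_flow` names are hypothetical, chosen for illustration only. The runner configured at decoration time acts only as a template, and each invocation starts a rebuilt instance.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyTaskRunner:
    """Hypothetical runner with settings (max_workers) and runtime state (_started)."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._started = False

    def duplicate(self) -> "ToyTaskRunner":
        # Rebuild from settings only, so runtime state starts fresh.
        # Subclasses with extra settings would override this.
        return type(self)(max_workers=self.max_workers)

    @asynccontextmanager
    async def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


# Configured once at definition time; never started directly.
TEMPLATE = ToyTaskRunner(max_workers=2)


async def run_flow(n: int) -> int:
    # Each invocation starts its own duplicate instead of the shared template.
    async with TEMPLATE.duplicate().start() as runner:
        await asyncio.sleep(0.01)
        return runner.max_workers + n


async def main() -> list:
    return await asyncio.gather(*(run_flow(n) for n in range(3)))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the method re-runs the constructor, users only see the same task runner they configured; no builder type is exposed.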

zanieb, Apr 16 '23 16:04

If we use copy(), the docs would need to call out explicitly that we make a copy of task_runner, so that clients can override __copy__() / __deepcopy__() if needed.
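A sketch of what such an override could look like with the standard `copy.copy()` hook. `ToyTaskRunner` and `AddressRunner` are hypothetical classes, and the scheduler address is a made-up example value. Without its own `__copy__`, the subclass would inherit the base hook, which calls `AddressRunner(max_workers=...)` and silently resets `address` to its default.

```python
import copy


class ToyTaskRunner:
    """Hypothetical base runner whose copy keeps settings but not runtime state."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._futures: dict = {}  # runtime state that must not be shared

    def __copy__(self) -> "ToyTaskRunner":
        # copy.copy(runner) calls this hook instead of the default shallow copy.
        return type(self)(max_workers=self.max_workers)


class AddressRunner(ToyTaskRunner):
    """Hypothetical subclass with an extra setting the base hook doesn't know about."""

    def __init__(self, address: str = "local", **kwargs) -> None:
        super().__init__(**kwargs)
        self.address = address

    def __copy__(self) -> "AddressRunner":
        # Override so the extra setting survives the copy.
        return type(self)(address=self.address, max_workers=self.max_workers)


base = ToyTaskRunner(max_workers=8)
base._futures["task-a"] = "pending"
base_copy = copy.copy(base)

sub = AddressRunner(address="tcp://scheduler:8786", max_workers=2)
sub_copy = copy.copy(sub)

print(base_copy.max_workers, base_copy._futures, sub_copy.address)
```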

P.S. 1: I'm assuming you mean you'll use the standard copy.copy() to achieve this; if not, it's fine to introduce a new method.

P.S. 2: Why not keep the type as-is in __init__ and create an instance in enter_async_context?

rsampaths16, Apr 16 '23 16:04

@rsampaths16 curious to hear your thoughts on #9342

Are you implementing your own task runners? We can just implement it on the base class and child classes can implement it if they want good behavior.

zanieb, Apr 26 '23 22:04

Hey @madkinsz, I've added a few review comments on #9342. The approach seems fine, since it doesn't rely on copy.

> Are you implementing your own task runners? We can just implement it on the base class and child classes can implement it if they want good behavior.

I haven't implemented any task runners, as flow parallelism hasn't been a blocker yet; I noticed the issue while doing POCs.

rsampaths16, Apr 27 '23 12:04