prefect
Ability to run subflows concurrently with synchronous tasks in subflows
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
Prefect Version
2.x
Describe the current behavior
This issue extracts an enhancement request buried in the discussion of issue #5853, which was about resolving `RuntimeError: The task runner is already started!` when running concurrent subflows. In the discussion for #5853 this example was presented, which I have updated with the workaround from #7319:
import asyncio

from prefect import task, flow, get_run_logger
from prefect.context import FlowRunContext


@task
async def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    await asyncio.sleep(2)


@flow
async def subflow(x):
    await print_x(x)


@flow
async def parent_flow():
    task_runner_type = type(FlowRunContext.get().task_runner)
    futures = [subflow.with_options(task_runner=task_runner_type())(x) for x in ["x1", "x2", "x3"]]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(parent_flow())
The feature request here is two parts:
- For subflow tasks to not have to be async
- For a cleaner interface (e.g. `flow.map`) to invoke the parallel subflows.
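As a rough illustration of the first point, outside Prefect entirely: blocking synchronous functions can be run concurrently from an async caller by pushing each call onto a worker thread. The sketch below uses only the standard library. `blocking_step` and `run_all` are hypothetical names, and this is not a description of how Prefect schedules subflows.

```python
import asyncio
import time


def blocking_step(x):
    # Stand-in for a synchronous task body; time.sleep blocks the calling thread.
    time.sleep(0.2)
    return f"done {x}"


async def run_all(items):
    # asyncio.to_thread (Python 3.9+) runs each blocking call in a worker thread,
    # so the sleeps can overlap instead of running back to back.
    return await asyncio.gather(*(asyncio.to_thread(blocking_step, x) for x in items))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_all(["x1", "x2", "x3"]))
    print(results, f"{time.perf_counter() - start:.2f}s")
```

The request in this issue is for Prefect to offer the equivalent for subflows, so that users do not have to write the async plumbing themselves.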
Describe the proposed behavior
The desired behavior is to achieve concurrent subflow execution with synchronous subflow tasks by writing something like this:
import time

from prefect import task, flow, get_run_logger


@task
def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    time.sleep(2)


@flow
def subflow(x):
    print_x(x)


@flow
def parent_flow():
    subflow.map(["x1", "x2", "x3"])


if __name__ == "__main__":
    parent_flow()
Example Use
No response
Additional context
Version: 2.6.4
API version: 0.8.2
Python version: 3.9.13
Git commit: 51e92dda
Built: Thu, Oct 20, 2022 3:11 PM
OS/Arch: linux/x86_64
Profile: default
Server type: hosted
I believe before we can support running subflows concurrently, we will need to establish the `Flow.submit` interface (#6689). We could in theory add a `map` operator that behaves the same as a `gather` operation, but task mapping returns a future and it seems like flow mapping should too.
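To make the distinction in this comment concrete, here is a minimal standard-library sketch (not Prefect's API) of the two shapes a mapping operator could take. `gather_like`, `map_like`, and `work` are hypothetical names: the first blocks and returns plain results, the second returns one future per item and leaves the waiting to the caller.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(x):
    # Stand-in for a subflow body.
    time.sleep(0.1)
    return x.upper()


def gather_like(executor, fn, items):
    # Blocks until every call has finished, then returns plain results in order.
    return list(executor.map(fn, items))


def map_like(executor, fn, items):
    # Returns right away with one Future per item; the caller decides when to wait.
    return [executor.submit(fn, x) for x in items]


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        print(gather_like(pool, work, ["x1", "x2"]))
        futures = map_like(pool, work, ["x1", "x2"])
        print([f.result() for f in futures])
```

Returning futures keeps the door open for passing them to downstream work before they resolve, which is the behavior task mapping already has.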
Hi, I am working on migrating from 1.0. The missing ability to run subflows concurrently with upstream/downstream dependencies has been blocking us. I have posted a sample use case here: https://discourse.prefect.io/t/running-subflows-in-parallel-with-downstream-dependency/1903/2
Running into the same problem. Love the `ConcurrentTaskRunner` and would love to be able to map flows that use the `ConcurrentTaskRunner`. Thanks for sharing the asyncio workaround here though, that's helpful.
Link to my RCA: https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1409055711
This example now runs without error. #6689 can be used to track a nicer interface for spawning concurrent subflow runs.
Thank you very much! Will follow the interface issue.
Will follow this interface issue as well, thank you.