
Ability to run subflows concurrently with synchronous tasks in subflows

Open • jeffcarrico opened this issue 2 years ago

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar request and didn't find it.
  • [X] I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

This issue extracts an enhancement request buried in the discussion of issue #5853, which was about resolving `RuntimeError: The task runner is already started!` when running concurrent subflows. The example below was presented in the #5853 discussion; I have updated it with the workaround from #7319.

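```python
import asyncio
from prefect import task, flow, get_run_logger
from prefect.context import FlowRunContext

@task
async def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    await asyncio.sleep(2)

@flow
async def subflow(x):
    await print_x(x)

@flow
async def parent_flow():
    # Workaround from #7319: give each subflow run a fresh instance of the
    # parent's task runner type instead of reusing the parent's runner instance.
    task_runner_type = type(FlowRunContext.get().task_runner)
    # Calling an async flow returns a coroutine, so nothing runs yet.
    futures = [subflow.with_options(task_runner=task_runner_type())(x) for x in ["x1", "x2", "x3"]]
    # asyncio.gather runs the three subflow coroutines concurrently.
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(parent_flow())
```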
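Roughly, the workaround appears to avoid the error by giving each concurrent subflow its own task runner instance rather than one that is already started. The stdlib-only toy model below illustrates that failure mode. `ToyTaskRunner`, `toy_subflow`, `run_shared`, and `run_fresh` are hypothetical names; this is a sketch of the idea under that assumption, not Prefect's actual task runner implementation.

```python
import asyncio


class ToyTaskRunner:
    """Hypothetical stand-in for a task runner that cannot be started twice at once."""

    def __init__(self) -> None:
        self._started = False

    async def __aenter__(self) -> "ToyTaskRunner":
        if self._started:
            # Mirrors the error message reported in #5853.
            raise RuntimeError("The task runner is already started!")
        self._started = True
        return self

    async def __aexit__(self, *exc) -> None:
        self._started = False


async def toy_subflow(runner: ToyTaskRunner, x: str) -> str:
    # Each toy "subflow run" keeps its runner started for the duration of the run.
    async with runner:
        await asyncio.sleep(0.01)
        return x


async def run_shared() -> None:
    # One runner instance shared by every concurrent run: the second start fails.
    shared = ToyTaskRunner()
    await asyncio.gather(*(toy_subflow(shared, x) for x in ["x1", "x2", "x3"]))


async def run_fresh() -> list:
    # The workaround's idea: construct a new runner of the same type per run.
    runner_type = ToyTaskRunner
    return await asyncio.gather(
        *(toy_subflow(runner_type(), x) for x in ["x1", "x2", "x3"])
    )


if __name__ == "__main__":
    print(asyncio.run(run_fresh()))  # ['x1', 'x2', 'x3']
    try:
        asyncio.run(run_shared())
    except RuntimeError as err:
        print(f"shared runner: {err}")
```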

The feature request has two parts:

  1. Tasks inside subflows should not have to be async.
  2. A cleaner interface (e.g. `flow.map`) for invoking subflows in parallel.

Describe the proposed behavior

The desired behavior is to run subflows concurrently while keeping their tasks synchronous, using something like the following:

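```python
import time
from prefect import task, flow, get_run_logger

@task
def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    time.sleep(2)  # synchronous, blocking work

@flow
def subflow(x):
    print_x(x)

@flow
def parent_flow():
    # Requested interface: Flow.map is the feature being asked for here,
    # not a method available at the time of this issue.
    subflow.map(["x1", "x2", "x3"])

if __name__ == "__main__":
    parent_flow()
```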
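One way to picture the requested behavior without Prefect: run each synchronous subflow in its own worker thread and hand back futures, much as task mapping does for tasks. This is a stdlib-only sketch; `map_subflow` and `sync_task` are hypothetical names, not Prefect APIs, and the eventual Prefect interface (tracked in #6689) may work quite differently. The `threading.Barrier` is only there to prove the three runs overlap in time.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterable, List

# Three parties: all three subflow runs must be in flight at once to pass.
barrier = threading.Barrier(3)


def sync_task(x: str) -> str:
    # Stand-in for print_x: blocking work with time.sleep, shortened to 0.1 s.
    time.sleep(0.1)
    return x


def subflow(x: str) -> str:
    # Waits up to 2 s for the other runs; raises threading.BrokenBarrierError
    # if the runs are actually executing one after another.
    barrier.wait(timeout=2)
    return sync_task(x)


def map_subflow(
    fn: Callable[[str], str], items: Iterable[str], pool: ThreadPoolExecutor
) -> List[Future]:
    """Hypothetical helper: submit one run per item and return futures."""
    return [pool.submit(fn, item) for item in items]


def parent_flow() -> List[str]:
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = map_subflow(subflow, ["x1", "x2", "x3"], pool)
        # Resolve the futures in submission order.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(parent_flow())  # ['x1', 'x2', 'x3']
```

Threads fit here because the subflow bodies are synchronous and block on I/O or sleep; an asyncio-based design would require the async tasks that part 1 of this request tries to avoid.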

Example Use

No response

Additional context

```
Version:             2.6.4
API version:         0.8.2
Python version:      3.9.13
Git commit:          51e92dda
Built:               Thu, Oct 20, 2022 3:11 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted
```

jeffcarrico • Oct 26 '22

I believe that before we can support running subflows concurrently, we will need to establish the `Flow.submit` interface (#6689). In theory we could add a `map` operator that behaves like a gather operation, but task mapping returns futures, and it seems like flow mapping should too.
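To illustrate the distinction between a gather-style map and a future-returning map, here is a stdlib-only analogy using `concurrent.futures` rather than Prefect. A gather-style map hands back finished results; a future-returning map hands back handles that a downstream step can wait on. `map_gather`, `map_submit`, `square`, and `total` are hypothetical names for illustration, not Prefect APIs.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List


def map_gather(pool: ThreadPoolExecutor, fn: Callable, items: Iterable) -> List:
    """Gather-style map: blocks until every run finishes, returns plain results."""
    return list(pool.map(fn, items))


def map_submit(pool: ThreadPoolExecutor, fn: Callable, items: Iterable) -> List[Future]:
    """Future-returning map: returns immediately; the caller decides when to wait."""
    return [pool.submit(fn, item) for item in items]


def square(n: int) -> int:
    return n * n


def total(futures: List[Future]) -> int:
    # A downstream step that depends on every upstream run.
    wait(futures)
    return sum(f.result() for f in futures)


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(map_gather(pool, square, [1, 2, 3]))  # [1, 4, 9]
        futures = map_submit(pool, square, [1, 2, 3])
        # Other work could be scheduled here before the downstream step waits.
        print(total(futures))  # 14
```

Returning futures keeps the caller in control of when to block, which is what allows upstream/downstream dependencies between parallel runs.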

zanieb • Oct 26 '22

Hi, I am working on migrating from Prefect 1.0. The missing ability to run subflows concurrently with upstream/downstream dependencies has been blocking us. I have posted a sample use case here: https://discourse.prefect.io/t/running-subflows-in-parallel-with-downstream-dependency/1903/2

PPPSDavid • Nov 11 '22

Running into the same problem. Love the ConcurrentTaskRunner and would love to be able to map flows that use the ConcurrentTaskRunner. Thanks for sharing the asyncio workaround here though, that's helpful.

the-matt-morris • Dec 28 '22

Link to my root cause analysis (RCA): https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1409055711

rsampaths16 • Jan 30 '23

This example now runs without error. #6689 can be used to track a nicer interface for spawning concurrent subflow runs.

zanieb • May 22 '23

Thank you very much! Will follow the interface issue.

jeffcarrico • Jun 17 '23

Will follow this interface issue as well, thank you.

mahiki • Jul 07 '24