Prefect 3.0 not running subflows in parallel
Hello community! I'm using prefect==3.0.2, and there seems to be an issue again with running subflows concurrently.
This code triggers 5 subflows that are run sequentially:
import asyncio

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def main_flow():
    for _ in range(5):
        await subflow_1()


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Even when using a work pool, workers, and an ad-hoc deployment, I still cannot get my subflows to run in parallel:
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.task_runners import ThreadPoolTaskRunner


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def master(names: list[str]):
    _r = map(main, names)
    r = await asyncio.gather(*_r)
    print('master > ', r)
    return r


@task(
    log_prints=True
)
async def doit(name):
    print('task <', name)
    await asyncio.sleep(5)
    return f'hello {name} !!'


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def main(name):
    print('main <', name)
    _r = doit.submit(name)
    res = _r.result()
    print('main >', res)
    return res


if __name__ == "__main__":
    '''
    CREATE WORKPOOL + WORKERS + DEPLOY
    '''
    # prefect work-pool create --overwrite --type process wp1 --set-as-default
    # prefect worker start --pool wp1 --limit 5 --type process ( << run several of these)
    # prefect deploy /work/path/test_1.py:master --pool wp1 --work-queue default --concurrency-limit 10 --name dep1
    async def run_deployment(dep_id, names):
        async with get_client() as client:
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=dep_id,
                parameters=dict(names=names),
                job_variables={'names': names}
            )
            print('flow run', flow_run)

    dp_id = 'c8cb8a2c-0427-4474-a9fc-583bcc78bf98'
    asyncio.run(run_deployment(dp_id, ['charlie', 'bob', 'alice']))
Am I doing anything wrong?
Originally posted by @captnced in https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-2358668161
Update: if I use a wrapper task to call the subflow, the subflows run in parallel.
import asyncio
from prefect import deploy, flow, task
from datetime import datetime, timedelta, timezone
from prefect.client.schemas.objects import FlowRun
from prefect.client.orchestration import get_client
from prefect.states import Scheduled
from prefect.task_runners import ThreadPoolTaskRunner


@task(
    log_prints=True,
)
async def doit_wrapper_task(name):
    return await main(name)  # TASK > SUB TASK


@task(
    log_prints=True
)
async def doit(name):
    print('task <', name)
    await asyncio.sleep(5)
    return f'hello {name} !!'


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=20),
    log_prints=True,
)
async def main(name):
    print('main <', name)
    _r = doit.submit(name)
    res = _r.result()
    print('main >', res)
    return res


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=20),
    log_prints=True,
)
async def master(names: list[str]):
    _r = [doit_wrapper_task.submit(name) for name in names]
    r = [__r.result() for __r in _r]
    print('master > ', r)
    return r
Thanks for the issue @captnced!
It looks like you need an asyncio.gather call in your first example:
import asyncio
from prefect import flow


@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    await asyncio.gather(*[subflow_1() for _ in range(5)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
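To make the difference concrete, here is a minimal sketch using plain coroutines only, with no Prefect involved. The `child`, `run_sequentially`, and `run_with_gather` names are hypothetical stand-ins for illustration; `child` simply sleeps in place of a real subflow.

```python
import asyncio
import time


async def child(i: int) -> int:
    # Hypothetical stand-in for a subflow; this is NOT a Prefect flow.
    await asyncio.sleep(0.1)
    return i


async def run_sequentially(n: int) -> float:
    # Awaiting inside the loop: each call finishes before the next one begins.
    start = time.perf_counter()
    for i in range(n):
        await child(i)
    return time.perf_counter() - start


async def run_with_gather(n: int) -> float:
    # gather schedules all coroutines on the event loop at once.
    start = time.perf_counter()
    await asyncio.gather(*[child(i) for i in range(n)])
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequentially(5)):.2f}s")
    print(f"gather:     {asyncio.run(run_with_gather(5)):.2f}s")
```

With purely cooperative coroutines like these, the loop version takes roughly the sum of the sleeps, while the `gather` version takes roughly the length of one sleep.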
However, even with asyncio.gather, the starting of those flow runs appears staggered, suggesting that something is blocking the event loop in the flow run engine. I suspect the example in your comment works because each child flow run happens in a separate thread, which works around this issue.
We can investigate what's blocking the event loop in the flow run engine and post updates here. If you discover anything new, please leave a comment!
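The staggered start described above can be reproduced without Prefect. The sketch below is only an analogy: the blocking `time.sleep` is an assumed stand-in for whatever synchronous work blocks the event loop in the engine, and `child`, `child_in_thread`, and `start_spread` are hypothetical names. It also shows why giving each child its own thread (as the wrapper-task workaround appears to do) avoids the stagger.

```python
import asyncio
import time


async def child(starts: list) -> None:
    starts.append(time.perf_counter())  # record when this child begins
    time.sleep(0.05)           # blocking call: assumed stand-in for synchronous setup
    await asyncio.sleep(0.1)   # cooperative part that yields to the event loop


def child_in_thread(starts: list) -> None:
    # Each thread runs the child on its own event loop.
    asyncio.run(child(starts))


async def start_spread(use_threads: bool) -> float:
    """Return the gap between the earliest and latest child start time."""
    starts: list = []
    if use_threads:
        jobs = [asyncio.to_thread(child_in_thread, starts) for _ in range(5)]
    else:
        jobs = [child(starts) for _ in range(5)]
    await asyncio.gather(*jobs)
    return max(starts) - min(starts)


if __name__ == "__main__":
    print(f"same loop:    {asyncio.run(start_spread(False)):.3f}s between first and last start")
    print(f"own threads:  {asyncio.run(start_spread(True)):.3f}s between first and last start")
```

On a single event loop, each child's blocking section must finish before the next child can start, so the start times spread out. With one thread per child, the blocking sections overlap and the starts land close together.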
@desertaxle By the way, may I ask if Prefect, especially version 3.0, supports parallel execution of subflows? I haven’t found any documentation or examples mentioning this feature. If you could provide any clarification, I would greatly appreciate it.
@tyong920 you should be able to run child flows concurrently/in parallel as you would any other Python function. This issue shows some snags when using asyncio for concurrency due to some blocking behavior in the engine. Still, you should be able to use threads or processes to achieve concurrency or parallelism for your child flows.
Thank you very much for your response.
I’d like to clarify my understanding. As you mentioned, if I treat a subflow like a regular Python function—for example, using concurrent.futures.ThreadPoolExecutor() as follows:
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.submit(sub_flow, ...)
I believe this should work. However, is this the recommended approach for running subflows in parallel in Prefect v3? It seems that in this case, the task_runner=ThreadPoolTaskRunner() in the main flow isn’t being fully utilized.
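A slightly fuller sketch of the thread-pool approach from the question, assuming the subflow can be called like any other Python function. Here `sub_flow` is a hypothetical plain function standing in for a real `@flow`-decorated one, and `run_all` is an illustrative helper, not a Prefect API.

```python
import concurrent.futures
import time


def sub_flow(name: str) -> str:
    # Hypothetical stand-in for a subflow; a real one would carry the @flow decorator.
    time.sleep(0.2)
    return f"hello {name} !!"


def run_all(names: list[str]) -> list[str]:
    # Submit every call first, then collect results in submission order.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(sub_flow, name) for name in names]
        return [future.result() for future in futures]


if __name__ == "__main__":
    start = time.perf_counter()
    print(run_all(["charlie", "bob", "alice"]))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Collecting the futures before calling `result()` matters: calling `result()` immediately after each `submit` would wait for that call to finish and make the runs sequential again.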
@bwooldridge-alphalayer do you have a code example that you could share where you see child flows sharing the same thread when used with the ThreadPoolTaskRunner? I'm very surprised by that and would like to find out why that's happening.
We'll tackle the root cause of this issue when working on https://github.com/PrefectHQ/prefect/issues/15008. We plan to do so after we complete our current settings milestone.
Hi @desertaxle, thanks for the quick reply. I was able to get my subflows working in parallel using the task wrapper approach. I'll delete my previous message to avoid future confusion.
@desertaxle We're seeing the same issue where the flow setup itself is synchronous even when running an async flow. In Prefect 2, I believe the analogous flow setup used an async Prefect client so all network calls to the Prefect server could yield to other tasks on the event loop. For flows that launch many subflows concurrently, this ends up being a significant performance penalty.
While there are workarounds like executing subflows on their own threads, there are drawbacks to these workarounds that make them unappealing for our use cases. Will #15008 address making the flow engine itself asynchronous when running async flows? Thanks!
Hi @brett-patterson-ent - the issue you linked is more about public interfaces being sync_compatible, and fixing that, than it is about an AsyncFlowEngine. If you look at task_engine.py, that should give a sense of what we would need to do to flow_engine.py, which is likely something we will do (but I'm not sure an issue exists for that yet).