Improve flow runner to enable more task parallelism
Description
Flatten has edge cases which sometimes causes tasks to run sequentially. I have identified 2 cases below which demonstrate the unusual behavior. In the test case, I flatten the output of a mapped seed task and execute 3 dependent sleep tasks which are intended to run in parallel. The implementation of the DAG has an influence on whether the job executes in parallel or not
Expected Behavior
fast_flow demonstrates the expected behavior. Upon launch, all three print statements show up immediately.
slow_flow adds one additional step by storing flatten into an array, and retrieving it by index. This has the same DAG as fast_flow but this ends up executing sequentially.
slow_flow_2 is identical to fast_flow but removes the map task. It is odd that downstream tasks are required for parallel execution.

Reproduction
import time
from prefect import task, Flow, flatten, unmapped
from prefect.engine.executors import LocalDaskExecutor
@task
def seed(i):
return [i]
@task
def sleep(i, sleep_time):
print(f'Sleep {sleep_time}')
time.sleep(sleep_time)
print(f'Finished {sleep_time}')
return [i]
@task
def task2(i):
print(f'Task2 {i}')
print(f'Finished {i}')
return [i]
with Flow("slow_flow") as slow_flow:
start = ['A']
inputs = flatten(seed.map(start))
all_paths = []
for i in (10, 11, 12):
foo = [flatten(sleep.map(inputs, unmapped(i)))]
all_paths.append(foo[0])
sleep_out = flatten(all_paths)
task2.map(sleep_out)
with Flow("fast_flow") as fast_flow:
start = ['A']
inputs = flatten(seed.map(start))
all_paths = []
for i in (10, 11, 12):
all_paths.append(sleep.map(inputs, unmapped(i)))
sleep_out = flatten(all_paths)
task2.map(sleep_out)
with Flow("slow_flow_2") as slow_flow_2:
start = ['A']
inputs = flatten(seed.map(start))
all_paths = []
for i in (10,11,12):
all_paths.append(sleep.map(inputs, unmapped(i)))
fast_flow.run(executor=LocalDaskExecutor(scheduler='threads'))
#slow_flow.run(executor=LocalDaskExecutor(scheduler='threads'))
#slow_flow_2.run(executor=LocalDaskExecutor(scheduler='threads'))
Environment
Prefect - 0.13.7 python 3.8.2
Hi @xuevin, thanks for the reproducible example. I've marked this as an enhancement rather than a bug, as I don't think it breaks any of our existing guarantees.
The way prefect is currently written, we guarantee that tasks in a *single map task can run in parallel, but multiple unrelated tasks (or map tasks as above) may or may not depending on the internals to the FlowRunner implementation. That said, there's no reason why we couldn't enable parallelism across the three different map tasks above.
For now, if you want to ensure that tasks will run in parallel, you (unfortunately) either need to write them explicitly as a single map task pipeline, or check that the quirks of the existing flow runner let these tasks run in parallel.
One approach that I have used to achieve parallelism is to use this implementation of flatten. This works well locally, but I think it will occur a lot of overhead when moving toward distributed systems.
@task
def flatten(list_to_reduce):
return [item for sublist in list_to_reduce for item in sublist]
However, I still see that it requires the additional dependent tasks that follow it to properly execute in parallel. ie. slow_flow_2 is still slow.
Yeah, that will gather all results locally before mapping, which will add a bunch of overhead if the results are large-ish (if they're not it's probably fine).
The proper fix would requires some changes to the FlowRunner to be better expose parallelism opportunities between tasks rather than within a single mapped task.
I think I am seeing the same issue. This code runs sequentially
from time import sleep
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
@task()
def add_ten(x):
print(f'starting {x}')
sleep(10)
print(f'ending {x}')
return x * 2
with Flow('simple map', executor=LocalDaskExecutor(num_workers=25)) as flow:
t = add_ten.map([1, 2, 3])
s = add_ten.map([10, 20, 30])
if __name__ == '__main__':
from prefect.run_configs import LocalRun
flow.run_config = LocalRun()
flow.run()
but this runs in parallel
from time import sleep
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
@task()
def add_ten(x):
print(f'starting {x}')
sleep(10)
print(f'ending {x}')
return x * 2
with Flow('simple map', executor=LocalDaskExecutor(num_workers=25)) as flow:
t = add_ten(1)
t1 = add_ten(2)
t2 = add_ten(3)
t3 = add_ten(10)
t4 = add_ten(20)
t5 = add_ten(30)
if __name__ == '__main__':
from prefect.run_configs import LocalRun
flow.run_config = LocalRun()
flow.run()
They should be equivalent. This is a big issue for us. Are there any workarounds?
@xuevin @jcrist
The following error occurs when switching scheduler to processes
fast_flow.run(executor=LocalDaskExecutor(scheduler='processes'))
----log----
[2021-10-11 14:06:45+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
Traceback (most recent call last):
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
num_workers, initializer=_multiprocessing_pool_initializer
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
Traceback (most recent call last):
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
num_workers, initializer=_multiprocessing_pool_initializer
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
Traceback (most recent call last):
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
num_workers, initializer=_multiprocessing_pool_initializer
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
Traceback (most recent call last):
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
num_workers, initializer=_multiprocessing_pool_initializer
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] INFO - prefect.TaskRunner | Task 'seed': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed[0]': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 10
Finished 10
[2021-10-11 14:06:59+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:06:59+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 12
Finished 12
[2021-10-11 14:07:11+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:07:12+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 11
Finished 11
[2021-10-11 14:07:23+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
len(cache))
[2021-10-11 14:07:23+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
One reminder is that when you switch the executor to DaskExecutor, you can run in parallel.
Is it just the reason for the actuator LocalDaskExecutor? @jcrist
Please don't ping our team directly. We triage handling of these issues internally.
As the warning displays, flow.run() calls when using the DaskExecutor must be guarded by a __main__ block. This is a requirement for how they package scripts to send to workers.
if __name__ == '__main__':
flow.run(...)
Okay, got it. I ignored this.
This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.
This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.