Improve flow runner to enable more task parallelism

Open xuevin opened this issue 5 years ago • 8 comments

Description

flatten has edge cases that sometimes cause tasks to run sequentially. I have identified two cases below that demonstrate the unusual behavior. In the test case, I flatten the output of a mapped seed task and execute three dependent sleep tasks, which are intended to run in parallel. How the DAG is constructed influences whether the job executes in parallel or not.

Expected Behavior

fast_flow demonstrates the expected behavior: upon launch, all three "Sleep" print statements show up immediately. slow_flow adds one step, storing the flatten result in a list and retrieving it by index. This has the same DAG as fast_flow, yet it ends up executing sequentially. slow_flow_2 is identical to fast_flow except that it removes the final task2 map. It is odd that downstream tasks are required for parallel execution.

Reproduction

import time
from prefect import task, Flow, flatten, unmapped
from prefect.engine.executors import LocalDaskExecutor


@task
def seed(i):
    return [i]


@task
def sleep(i, sleep_time):
    print(f'Sleep {sleep_time}')
    time.sleep(sleep_time)

    print(f'Finished {sleep_time}')

    return [i]


@task
def task2(i):
    print(f'Task2 {i}')
    print(f'Finished {i}')
    return [i]


with Flow("slow_flow") as slow_flow:
    start = ['A']
    inputs = flatten(seed.map(start))
    all_paths = []
    for i in (10, 11, 12):
        foo = [flatten(sleep.map(inputs, unmapped(i)))]
        all_paths.append(foo[0])
    sleep_out = flatten(all_paths)
    task2.map(sleep_out)

with Flow("fast_flow") as fast_flow:
    start = ['A']
    inputs = flatten(seed.map(start))

    all_paths = []
    for i in (10, 11, 12):
        all_paths.append(sleep.map(inputs, unmapped(i)))
    sleep_out = flatten(all_paths)
    task2.map(sleep_out)

with Flow("slow_flow_2") as slow_flow_2:
    start = ['A']
    inputs = flatten(seed.map(start))

    all_paths = []
    for i in (10,11,12):
        all_paths.append(sleep.map(inputs, unmapped(i)))


fast_flow.run(executor=LocalDaskExecutor(scheduler='threads'))
#slow_flow.run(executor=LocalDaskExecutor(scheduler='threads'))
#slow_flow_2.run(executor=LocalDaskExecutor(scheduler='threads'))

Environment

Prefect - 0.13.7 python 3.8.2

xuevin avatar Sep 24 '20 03:09 xuevin

Hi @xuevin, thanks for the reproducible example. I've marked this as an enhancement rather than a bug, as I don't think it breaks any of our existing guarantees.

The way Prefect is currently written, we guarantee that tasks within a single map task can run in parallel, but multiple unrelated tasks (or map tasks, as above) may or may not, depending on the internals of the FlowRunner implementation. That said, there's no reason why we couldn't enable parallelism across the three different map tasks above.

For now, if you want to ensure that tasks will run in parallel, you (unfortunately) either need to write them explicitly as a single map task pipeline, or check that the quirks of the existing flow runner let these tasks run in parallel.
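A minimal sketch of what the "single map task" option could look like for the reproduction above: build every (input, sleep time) combination up front so that one map covers all the work. The names build_pairs and sleep_pair are hypothetical helpers, not part of Prefect, and the Prefect wiring in the trailing comment is an assumption that has not been run against 0.13.7.

```python
from itertools import product


def build_pairs(inputs, sleep_times):
    """Return every (input, sleep_time) combination as one flat list."""
    return list(product(inputs, sleep_times))


def sleep_pair(pair):
    """Hypothetical single-argument variant of `sleep` that unpacks a pair."""
    i, sleep_time = pair
    # time.sleep(sleep_time) would go here in the real task body.
    return [i]


pairs = build_pairs(["A"], (10, 11, 12))
print(pairs)

# Assumed Prefect 0.13-style wiring inside a Flow context (untested):
#   pairs = task(build_pairs)(inputs, [10, 11, 12])
#   task(sleep_pair).map(pairs)
```

Because all three sleeps are then children of one mapped task, they fall under the within-a-map guarantee described above rather than depending on how the flow runner orders separate maps.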

jcrist avatar Sep 24 '20 13:09 jcrist

One approach that I have used to achieve parallelism is to replace the built-in flatten with the implementation below. This works well locally, but I think it will incur a lot of overhead when moving toward distributed systems.

@task
def flatten(list_to_reduce):
    return [item for sublist in list_to_reduce for item in sublist]

However, I still see that it requires the additional dependent tasks that follow it in order to execute in parallel, i.e., slow_flow_2 is still slow.
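For reference, here is what that comprehension does to a nested list, shown as plain Python without the @task decorator. The name flatten_once is only for this illustration; the comparison with itertools.chain is an equivalence check, not something the custom task relies on.

```python
from itertools import chain


def flatten_once(list_to_reduce):
    """Same comprehension as the task body above, without the @task decorator."""
    return [item for sublist in list_to_reduce for item in sublist]


nested = [["A"], ["B", "C"], []]
flat = flatten_once(nested)
print(flat)  # ['A', 'B', 'C']

# itertools.chain.from_iterable gives the same one-level flattening.
assert flat == list(chain.from_iterable(nested))

# Only one level is removed: a list nested two levels deep survives.
print(flatten_once([[1, [2, 3]], [4]]))  # [1, [2, 3], 4]
```

Note that the whole nested list has to be available as a single argument before the comprehension can run.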

xuevin avatar Sep 24 '20 14:09 xuevin

Yeah, that will gather all results locally before mapping, which will add a bunch of overhead if the results are large-ish (if they're not it's probably fine).

The proper fix would require some changes to the FlowRunner to better expose parallelism opportunities between tasks, rather than only within a single mapped task.

jcrist avatar Sep 24 '20 14:09 jcrist

I think I am seeing the same issue. This code runs sequentially:

from time import sleep
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task()
def add_ten(x):
    print(f'starting {x}')
    sleep(10)
    print(f'ending {x}')
    return x * 2

with Flow('simple map', executor=LocalDaskExecutor(num_workers=25)) as flow:
    t = add_ten.map([1, 2, 3])
    s = add_ten.map([10, 20, 30])

if __name__ == '__main__':
    from prefect.run_configs import LocalRun
    flow.run_config = LocalRun()
    flow.run()

but this runs in parallel:

from time import sleep

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task()
def add_ten(x):
    print(f'starting {x}')
    sleep(10)
    print(f'ending {x}')
    return x * 2


with Flow('simple map', executor=LocalDaskExecutor(num_workers=25)) as flow:
    t = add_ten(1)
    t1 = add_ten(2)
    t2 = add_ten(3)
    t3 = add_ten(10)
    t4 = add_ten(20)
    t5 = add_ten(30)

if __name__ == '__main__':
    from prefect.run_configs import LocalRun

    flow.run_config = LocalRun()
    flow.run()

They should be equivalent. This is a big issue for us. Are there any workarounds?
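As a rough analogy only, and not Prefect's actual FlowRunner code: with a plain thread pool, two "maps" finish in about half the time when both are submitted before anything is waited on, compared with waiting for each map before submitting the next. The function names and the 0.3 second delay are made up for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def work(x, delay=0.3):
    """Stand-in for add_ten with a much shorter sleep."""
    time.sleep(delay)
    return x * 2


def run_map_by_map(pool, groups):
    """Submit one group, block until it finishes, then move to the next."""
    results = []
    for group in groups:
        futures = [pool.submit(work, x) for x in group]
        wait(futures)
        results.append([f.result() for f in futures])
    return results


def run_all_at_once(pool, groups):
    """Submit every group first, and only then collect the results."""
    futures = [[pool.submit(work, x) for x in group] for group in groups]
    return [[f.result() for f in fs] for fs in futures]


def timed(fn, *args):
    """Return the function's result together with its wall-clock duration."""
    start = time.perf_counter()
    out = fn(*args)
    return out, time.perf_counter() - start


groups = [[1, 2, 3], [10, 20, 30]]
with ThreadPoolExecutor(max_workers=6) as pool:
    seq_out, seq_time = timed(run_map_by_map, pool, groups)
    par_out, par_time = timed(run_all_at_once, pool, groups)

print(f"map by map: {seq_time:.2f}s, all at once: {par_time:.2f}s")
```

Both strategies return the same values; only the point at which the caller blocks differs, which is the kind of runner-level ordering the maintainer's reply refers to.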

tkram01 avatar May 23 '21 21:05 tkram01

@xuevin @jcrist The following error occurs when switching the scheduler to processes:

fast_flow.run(executor=LocalDaskExecutor(scheduler='processes'))

----log----

[2021-10-11 14:06:45+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'slow_flow_2'
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
Traceback (most recent call last):
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
    num_workers, initializer=_multiprocessing_pool_initializer
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
Traceback (most recent call last):
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
    num_workers, initializer=_multiprocessing_pool_initializer
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
Traceback (most recent call last):
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
    num_workers, initializer=_multiprocessing_pool_initializer
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] ERROR - prefect.FlowRunner | Unexpected error: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
Traceback (most recent call last):
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/Users/zhangqiang/testproject/prefecttest/venv/lib/python3.7/site-packages/prefect/executors/dask.py", line 548, in start
    num_workers, initializer=_multiprocessing_pool_initializer
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
[2021-10-11 14:06:48+0800] ERROR - prefect.slow_flow_2 | Unexpected error occured in FlowRunner: RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')
[2021-10-11 14:06:48+0800] INFO - prefect.TaskRunner | Task 'seed': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed[0]': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'seed[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Starting task run...
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep': Finished task run for task with final state: 'Mapped'
[2021-10-11 14:06:49+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 10
Finished 10
[2021-10-11 14:06:59+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:06:59+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 12
Finished 12
[2021-10-11 14:07:11+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
[2021-10-11 14:07:12+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Starting task run...
Sleep 11
Finished 11
[2021-10-11 14:07:23+0800] INFO - prefect.TaskRunner | Task 'sleep[0]': Finished task run for task with final state: 'Success'
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
  len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
  len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
  len(cache))
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 4 leaked semaphores to clean up at shutdown
  len(cache))
[2021-10-11 14:07:23+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

zqabc avatar Oct 11 '21 06:10 zqabc

One more note: when I switch the executor to DaskExecutor, the tasks do run in parallel. Is the problem specific to the LocalDaskExecutor? @jcrist

zqabc avatar Oct 11 '21 06:10 zqabc

Please don't ping our team directly. We triage handling of these issues internally.

As the error message shows, flow.run() calls when using the DaskExecutor must be guarded by a __main__ block. This is required by the way scripts are packaged and sent to workers.

if __name__ == '__main__':
    flow.run(...)

zanieb avatar Oct 11 '21 15:10 zanieb

Okay, got it. I overlooked this.

zqabc avatar Oct 13 '21 06:10 zqabc

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] avatar Nov 20 '22 02:11 github-actions[bot]

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

github-actions[bot] avatar Dec 05 '22 02:12 github-actions[bot]