
Task hangs due to multiprocessing.Pool.join()

Open mianos opened this issue 1 year ago • 3 comments

Bug summary

This is a restatement of https://github.com/PrefectHQ/prefect/issues/13584 with more information and the motivation to follow it up. If I can fix this, I'll be recommending that my client go with Prefect (my own firm uses it for automation).

I think this may be caused by Python forking a process that then waits on threads inherited from the parent. Although forking a threaded process is meant to be an anti-pattern, it happens to me a lot in complex workflows. In particular, when I try to build my own task timeout in a subprocess or thread (because timeout_seconds is not working), the child process blocks on exit, waiting on threads that are not present. Some of my flows appear to fork because of the underlying Python calls they make, which is out of my control.
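To make the suspected mechanism concrete, here is a minimal POSIX-only sketch of the general fork-with-threads hazard. It is not a confirmed diagnosis of Prefect's internals, and the helper name thread_count_after_fork is made up for illustration. It shows that a forked child keeps only the thread that called fork, so anything in the child that later waits on a lock or queue serviced by another parent thread has nobody left to wake it. Python 3.12 and later also emit a DeprecationWarning when os.fork() is called in a multi-threaded process.

```python
import os
import threading


def thread_count_after_fork() -> tuple[int, int]:
    """Return (threads alive in the parent, threads alive in a forked child).

    Hypothetical helper for illustration only; requires a POSIX os.fork().
    """
    stop = threading.Event()
    helper = threading.Thread(target=stop.wait, daemon=True)
    helper.start()  # the parent now has at least two threads

    read_fd, write_fd = os.pipe()
    parent_count = threading.active_count()

    pid = os.fork()
    if pid == 0:
        # Child: only the forking thread was copied. Report the count and
        # leave immediately, skipping interpreter cleanup.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os._exit(0)

    # Parent: read the child's report, then reap the child.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)

    stop.set()
    helper.join()
    return parent_count, child_count
```

Calling thread_count_after_fork() from a plain script should report two or more threads in the parent and exactly one in the child.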

The following example is from the original bug. It locks up on waiter.acquire(), as shown in the stack traces below.

from prefect import flow, task
import time

import multiprocessing as mp

def _f(i):
    print(f"a-{i:02d}")
    time.sleep(0.1)

@task(name="small_task", log_prints=True)
def small_task(worker_count: int = 4, count: int = 10):
    responses = []
    with mp.Pool(worker_count) as pool:
        for f in range(count):
            responses.append(pool.apply_async(_f, args=(f,)))
        print(f"start: pool.close()")
        pool.close()
        print(f"end: pool.close()")
        print(f"start: pool.join()")
        pool.join()
        print(f"end: pool.join()")
    return responses

@flow(name="test")
def flow_test(worker_count: int = 4, count: int = 10):
    return small_task(worker_count=worker_count, count=count)

if __name__ == "__main__":
    flow_test(worker_count=4, count=10)

The hang is 100% reproducible. The run output and the stack trace are in the Run and Stack trace sections below.

Version info (prefect version output)

prefect version
Version:             2.20.1
API version:         0.8.4
Python version:      3.11.2
Git commit:          f6bebfca
Built:               Fri, Aug 9, 2024 10:35 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.40.1

Additional context

This is on linux:

Linux old9 6.1.0-23-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.99-1 (2024-07-15) x86_64 GNU/Linux

cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

Run

python pt2.py
06:12:09.556 | INFO    | prefect.engine - Created flow run 'competent-guan' for flow 'test'
06:12:09.606 | INFO    | Flow run 'competent-guan' - Created task run 'small_task-0' for task 'small_task'
06:12:09.606 | INFO    | Flow run 'competent-guan' - Executing 'small_task-0' immediately...
06:12:09.650 | INFO    | Task run 'small_task-0' - start: pool.close()
06:12:09.651 | INFO    | Task run 'small_task-0' - end: pool.close()
06:12:09.652 | INFO    | Task run 'small_task-0' - start: pool.join()
06:12:09.653 | INFO    | Task run 'small_task-0' - a-00
06:12:09.653 | INFO    | Task run 'small_task-0' - a-01
06:12:09.653 | INFO    | Task run 'small_task-0' - a-02
06:12:09.653 | INFO    | Task run 'small_task-0' - a-03
06:12:09.756 | INFO    | Task run 'small_task-0' - a-04
06:12:09.756 | INFO    | Task run 'small_task-0' - a-07
06:12:09.756 | INFO    | Task run 'small_task-0' - a-06
06:12:09.756 | INFO    | Task run 'small_task-0' - a-05
06:12:09.858 | INFO    | Task run 'small_task-0' - a-08
06:12:09.858 | INFO    | Task run 'small_task-0' - a-09

It locks up here forever, so I have to press ^C to stop it. In a deployment running on an agent, it just blocks forever:

Stack trace

^C06:13:17.890 | ERROR   | Task run 'small_task-0' - Crash detected! Execution was cancelled by the runtime environment.
06:13:17.930 | ERROR   | Flow run 'competent-guan' - Crash detected! Execution was aborted by an interrupt signal.
  + Exception Group Traceback (most recent call last):
  |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
  |     cb_suppress = await cb(*exc_details)
  |                   ^^^^^^^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
  |     await self.gen.athrow(typ, value, traceback)
  |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 182, in start
  |     async with AsyncExitStack() as exit_stack:
  |   File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
  |     raise exc_details[1]
  |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
  |     cb_suppress = await cb(*exc_details)
  |                   ^^^^^^^^^^^^^^^^^^^^^^
  |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
  |     raise BaseExceptionGroup(
  | BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
    |     flow_test(worker_count=4, count=10)
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
    |     return enter_flow_run_engine_from_flow_call(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
    |     retval = from_sync.wait_for_call_in_loop_thread(
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    |     return call.result()
    |            ^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    |     return self.future.result(timeout=timeout)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    |     return self.__get_result()
    |            ^^^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    |     raise self._exception
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    |     result = await coro
    |              ^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    |     return await fn(*args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
    |     state = await begin_flow_run(
    |             ^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
    |     async with AsyncExitStack() as stack:
    |   File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
    |     raise exc_details[1]
    |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |                   ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
    |     await self.gen.athrow(typ, value, traceback)
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
    |     with collapse_excgroups():
    |   File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    |     self.gen.throw(typ, value, traceback)
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
    |     raise exc
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
    |     yield self
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
    |     terminal_or_paused_state = await orchestrate_flow_run(
    |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
    |     result = await flow_call.aresult()
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    |     return await asyncio.wrap_future(self.future)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
    |     return small_task(worker_count=worker_count, count=count)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
    |     return enter_task_run_engine(
    |            ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
    |     return from_sync.wait_for_call_in_loop_thread(begin_run)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
    |     waiter.wait()
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
    |     self._handle_waiting_callbacks()
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
    |     callback: Call = self._queue.get()
    |                      ^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/queue.py", line 171, in get
    |     self.not_empty.wait()
    |   File "/usr/lib/python3.11/threading.py", line 320, in wait
    |     waiter.acquire()
    | KeyboardInterrupt
    +------------------------------------

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 745, in collapse_excgroups
  |     yield
  |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2254, in report_flow_run_crashes
  |     yield
  |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
  |     cb_suppress = await cb(*exc_details)
  |                   ^^^^^^^^^^^^^^^^^^^^^^
  |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
  |     raise BaseExceptionGroup(
  | BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Exception Group Traceback (most recent call last):
    |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |                   ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
    |     await self.gen.athrow(typ, value, traceback)
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 182, in start
    |     async with AsyncExitStack() as exit_stack:
    |   File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
    |     raise exc_details[1]
    |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |                   ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
    |     raise BaseExceptionGroup(
    | BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
      |     flow_test(worker_count=4, count=10)
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
      |     return enter_flow_run_engine_from_flow_call(
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
      |     retval = from_sync.wait_for_call_in_loop_thread(
      |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
      |     return call.result()
      |            ^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
      |     return self.future.result(timeout=timeout)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
      |     return self.__get_result()
      |            ^^^^^^^^^^^^^^^^^^^
      |   File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
      |     raise self._exception
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
      |     result = await coro
      |              ^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
      |     return await fn(*args, **kwargs)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
      |     state = await begin_flow_run(
      |             ^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
      |     async with AsyncExitStack() as stack:
      |   File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
      |     raise exc_details[1]
      |   File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
      |     cb_suppress = await cb(*exc_details)
      |                   ^^^^^^^^^^^^^^^^^^^^^^
      |   File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
      |     await self.gen.athrow(typ, value, traceback)
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
      |     with collapse_excgroups():
      |   File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
      |     self.gen.throw(typ, value, traceback)
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
      |     raise exc
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
      |     yield self
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
      |     terminal_or_paused_state = await orchestrate_flow_run(
      |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
      |     result = await flow_call.aresult()
      |              ^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
      |     return await asyncio.wrap_future(self.future)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
      |     result = self.fn(*self.args, **self.kwargs)
      |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
      |     return small_task(worker_count=worker_count, count=count)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
      |     return enter_task_run_engine(
      |            ^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
      |     return from_sync.wait_for_call_in_loop_thread(begin_run)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
      |     waiter.wait()
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
      |     self._handle_waiting_callbacks()
      |   File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
      |     callback: Call = self._queue.get()
      |                      ^^^^^^^^^^^^^^^^^
      |   File "/usr/lib/python3.11/queue.py", line 171, in get
      |     self.not_empty.wait()
      |   File "/usr/lib/python3.11/threading.py", line 320, in wait
      |     waiter.acquire()
      | KeyboardInterrupt
      +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
    flow_test(worker_count=4, count=10)
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
    async with AsyncExitStack() as stack:
  File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
    with collapse_excgroups():
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
    raise exc
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
    yield self
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
    return small_task(worker_count=worker_count, count=count)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
    self._handle_waiting_callbacks()
  File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
                     ^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/lib/python3.11/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

Anything I am missing?

(if someone wants to ssh into my machine, this is just a new lxd container so you are welcome)

mianos avatar Aug 13 '24 06:08 mianos

Thanks for the issue @mianos!

I was able to reproduce this issue on a Linux machine, but it works as expected on a Mac. I am also seeing this issue in the most recent 2.x version as well as the most recent 3.0 RC release. I think there must be a difference in process management causing this issue, but I'll have to investigate further to find a root cause. If you find any new info, let me know!

desertaxle avatar Aug 13 '24 14:08 desertaxle

Is there a way I can cancel all the Prefect-managed threads when my forked process exits, as a workaround? It seems like a hack, but something like "I am not using any threads in this process, so just exit without waiting"?

mianos avatar Aug 19 '24 04:08 mianos

I can confirm the problem exists. What is different on Mac is that the process pool uses the spawn method to start processes, while Linux uses fork. @mianos, maybe you can force spawn for your process pool and see if that helps.
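A minimal sketch of forcing spawn for a single pool, assuming you control the code that creates it. The function name run_in_spawn_pool is hypothetical, and math.sqrt stands in for the real worker; under spawn, the worker function must be importable by the child process, so it has to live at module top level.

```python
import math
import multiprocessing as mp


def run_in_spawn_pool(values, worker_count=4):
    """Map math.sqrt over values using workers started with 'spawn'.

    Hypothetical helper for illustration only.
    """
    # get_context() returns a context object bound to one start method;
    # it does not change the process-wide default.
    ctx = mp.get_context("spawn")
    with ctx.Pool(worker_count) as pool:
        # map() blocks until every result is back, so the pool can be
        # torn down safely when the with-block exits.
        return pool.map(math.sqrt, values)
```

In a script, call run_in_spawn_pool([1, 4, 9, 16]) from under an if __name__ == "__main__": guard, because spawned children re-import the main module. In the reproduction above, the equivalent change would be replacing mp.Pool(worker_count) with mp.get_context("spawn").Pool(worker_count).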

rudolfix avatar Aug 22 '24 15:08 rudolfix

My trouble is that I am using a lot of third-party libraries. I assume one of them (paramiko, docker) is doing a fork. There is no mention of fork or multiprocessing in my flow source code; otherwise I'd replace it with spawn. The above example is not my code, but it shows exactly the same backtrace as mine.
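One way to find out which library is forking is to record a stack trace every time a fork happens. The sketch below is a diagnostic idea, not something taken from Prefect; the names fork_call_sites and install_fork_tracer are made up for illustration. It relies on os.register_at_fork (Python 3.7+, POSIX), whose hooks run for os.fork() and for multiprocessing's fork start method. A typical subprocess launch does not trigger them.

```python
import os
import traceback

# Collected stack traces, one entry per observed fork (hypothetical name).
fork_call_sites: list[str] = []


def _record_fork() -> None:
    # Runs in the parent just before each fork made through os.fork().
    fork_call_sites.append("".join(traceback.format_stack(limit=12)))


def install_fork_tracer() -> None:
    """Register the hook; registered hooks cannot be removed afterwards."""
    os.register_at_fork(before=_record_fork)
```

Calling install_fork_tracer() at the top of the flow script and printing fork_call_sites after a run should show which call path, if any, reaches os.fork().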

mianos avatar Sep 02 '24 05:09 mianos

I believe this was fixed in 3.2.14 - let us know if that's not the case though.

cicdw avatar Mar 24 '25 20:03 cicdw

I'm on 3.3.4 and ran into this issue with a library using both multiprocessing and threads. I had to patch it to force spawn to make it work.

jpsnyder avatar Apr 24 '25 21:04 jpsnyder

> I'm on 3.3.4 and ran into this issue with a library using both multiprocessing and threads. I had to patch it to force spawn to make it work.

@jpsnyder, curious whether you saw degraded run-time performance after forcing "spawn"? I can offer anecdotal evidence that using set_start_method("spawn") increases wall-clock time by 10-100x. (This is with simple tests not involving Prefect itself.)

jGaboardi avatar Jul 22 '25 18:07 jGaboardi