Task hangs due to multiprocessing.Pool.join()
Bug summary
This restates https://github.com/PrefectHQ/prefect/issues/13584 with more information and the motivation to follow it up. If I can get this fixed, I'll recommend that my client go with Prefect (my own firm already uses it for automation).
I think this may be caused by Python forking, after which the forked process waits on threads inherited from the parent. Although forking a multi-threaded process is considered an anti-pattern, it happens to me a lot in complex workflows. In particular, when I try to build my own task timeout (because timeout_seconds isn't working for me) using a subprocess or thread, the child process blocks on exit, waiting on threads that are not present in it. Some of my flows also seem to fork through the underlying Python libraries they call, which is out of my control.
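The general fork-and-threads hazard described above can be shown with the standard library alone. This is a minimal sketch, not a confirmed root cause for this issue: it assumes Linux (it uses `os.fork()`), and `demo_lock_after_fork` is a hypothetical helper. A background thread holds a `threading.Lock` while the main thread forks; the child inherits the lock in its locked state but not the thread that would release it, so an `acquire()` in the child can only time out (or, without a timeout, block forever).

```python
import os
import threading


def demo_lock_after_fork(timeout: float = 1.0) -> bool:
    """Hypothetical helper: return True if a forked child can acquire a lock
    that another parent thread was holding at the moment of fork()."""
    lock = threading.Lock()
    holding = threading.Event()
    release = threading.Event()

    def holder() -> None:
        with lock:
            holding.set()
            release.wait()  # keep the lock held until the parent has forked

    t = threading.Thread(target=holder)
    t.start()
    holding.wait()

    r, w = os.pipe()
    pid = os.fork()  # Python 3.12+ may emit a DeprecationWarning here
    if pid == 0:
        # Child: only the forking thread was copied, so nothing will ever
        # release `lock`. Without a timeout this acquire() would never return.
        os.close(r)
        acquired = lock.acquire(timeout=timeout)
        os.write(w, b"1" if acquired else b"0")
        os._exit(0)  # exit immediately, skipping interpreter cleanup

    # Parent: collect the child's answer, then let the holder thread finish.
    os.close(w)
    answer = os.read(r, 1)
    os.close(r)
    os.waitpid(pid, 0)
    release.set()
    t.join()
    return answer == b"1"


if __name__ == "__main__":
    print("child acquired lock:", demo_lock_after_fork())
```

On Linux this is expected to report that the child could not acquire the lock.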
The following example is from the original issue. It locks up in waiter.acquire(), as shown in the stack traces below.
from prefect import flow, task
import time
import multiprocessing as mp

def _f(i):
    print(f"a-{i:02d}")
    time.sleep(0.1)

@task(name="small_task", log_prints=True)
def small_task(worker_count: int = 4, count: int = 10):
    responses = []
    with mp.Pool(worker_count) as pool:
        for f in range(count):
            responses.append(pool.apply_async(_f, args=(f,)))
        print(f"start: pool.close()")
        pool.close()
        print(f"end: pool.close()")
        print(f"start: pool.join()")
        pool.join()
        print(f"end: pool.join()")
    return responses


@flow(name="test")
def flow_test(worker_count: int = 4, count: int = 10):
    return small_task(worker_count=worker_count, count=count)

if __name__ == "__main__":
    flow_test(worker_count=4, count=10)
This example hangs 100% of the time.
The run output and stack trace are shown further below.
Version info (prefect version output)
prefect version
Version: 2.20.1
API version: 0.8.4
Python version: 3.11.2
Git commit: f6bebfca
Built: Fri, Aug 9, 2024 10:35 AM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
  Database: sqlite
  SQLite version: 3.40.1
Additional context
This is on Linux:
Linux old9 6.1.0-23-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.99-1 (2024-07-15) x86_64 GNU/Linux
cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
Run
python pt2.py
06:12:09.556 | INFO | prefect.engine - Created flow run 'competent-guan' for flow 'test'
06:12:09.606 | INFO | Flow run 'competent-guan' - Created task run 'small_task-0' for task 'small_task'
06:12:09.606 | INFO | Flow run 'competent-guan' - Executing 'small_task-0' immediately...
06:12:09.650 | INFO | Task run 'small_task-0' - start: pool.close()
06:12:09.651 | INFO | Task run 'small_task-0' - end: pool.close()
06:12:09.652 | INFO | Task run 'small_task-0' - start: pool.join()
06:12:09.653 | INFO | Task run 'small_task-0' - a-00
06:12:09.653 | INFO | Task run 'small_task-0' - a-01
06:12:09.653 | INFO | Task run 'small_task-0' - a-02
06:12:09.653 | INFO | Task run 'small_task-0' - a-03
06:12:09.756 | INFO | Task run 'small_task-0' - a-04
06:12:09.756 | INFO | Task run 'small_task-0' - a-07
06:12:09.756 | INFO | Task run 'small_task-0' - a-06
06:12:09.756 | INFO | Task run 'small_task-0' - a-05
06:12:09.858 | INFO | Task run 'small_task-0' - a-08
06:12:09.858 | INFO | Task run 'small_task-0' - a-09
It locks up here indefinitely, so I have to press ^C to stop it. In a deployment running on an agent, it simply blocks forever:
Stack trace
^C06:13:17.890 | ERROR | Task run 'small_task-0' - Crash detected! Execution was cancelled by the runtime environment.
06:13:17.930 | ERROR | Flow run 'competent-guan' - Crash detected! Execution was aborted by an interrupt signal.
+ Exception Group Traceback (most recent call last):
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
| await self.gen.athrow(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 182, in start
| async with AsyncExitStack() as exit_stack:
| File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
| raise exc_details[1]
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
| raise BaseExceptionGroup(
| BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
| flow_test(worker_count=4, count=10)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
| return enter_flow_run_engine_from_flow_call(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
| retval = from_sync.wait_for_call_in_loop_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
| return call.result()
| ^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
| return self.future.result(timeout=timeout)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
| return self.__get_result()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
| raise self._exception
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
| result = await coro
| ^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
| return await fn(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
| state = await begin_flow_run(
| ^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
| async with AsyncExitStack() as stack:
| File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
| raise exc_details[1]
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
| await self.gen.athrow(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
| with collapse_excgroups():
| File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
| self.gen.throw(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
| raise exc
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
| yield self
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
| terminal_or_paused_state = await orchestrate_flow_run(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
| result = await flow_call.aresult()
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
| return await asyncio.wrap_future(self.future)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
| result = self.fn(*self.args, **self.kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
| return small_task(worker_count=worker_count, count=count)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
| return enter_task_run_engine(
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
| return from_sync.wait_for_call_in_loop_thread(begin_run)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
| waiter.wait()
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
| self._handle_waiting_callbacks()
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
| callback: Call = self._queue.get()
| ^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/queue.py", line 171, in get
| self.not_empty.wait()
| File "/usr/lib/python3.11/threading.py", line 320, in wait
| waiter.acquire()
| KeyboardInterrupt
+------------------------------------
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 745, in collapse_excgroups
| yield
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2254, in report_flow_run_crashes
| yield
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
| raise BaseExceptionGroup(
| BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
| await self.gen.athrow(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 182, in start
| async with AsyncExitStack() as exit_stack:
| File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
| raise exc_details[1]
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 680, in __aexit__
| raise BaseExceptionGroup(
| BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
| flow_test(worker_count=4, count=10)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
| return enter_flow_run_engine_from_flow_call(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
| retval = from_sync.wait_for_call_in_loop_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
| return call.result()
| ^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
| return self.future.result(timeout=timeout)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
| return self.__get_result()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
| raise self._exception
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
| result = await coro
| ^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
| return await fn(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
| state = await begin_flow_run(
| ^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
| async with AsyncExitStack() as stack:
| File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
| raise exc_details[1]
| File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
| cb_suppress = await cb(*exc_details)
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
| await self.gen.athrow(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
| with collapse_excgroups():
| File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
| self.gen.throw(typ, value, traceback)
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
| raise exc
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
| yield self
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
| terminal_or_paused_state = await orchestrate_flow_run(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
| result = await flow_call.aresult()
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
| return await asyncio.wrap_future(self.future)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
| result = self.fn(*self.args, **self.kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
| return small_task(worker_count=worker_count, count=count)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
| return enter_task_run_engine(
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
| return from_sync.wait_for_call_in_loop_thread(begin_run)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
| waiter.wait()
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
| self._handle_waiting_callbacks()
| File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
| callback: Call = self._queue.get()
| ^^^^^^^^^^^^^^^^^
| File "/usr/lib/python3.11/queue.py", line 171, in get
| self.not_empty.wait()
| File "/usr/lib/python3.11/threading.py", line 320, in wait
| waiter.acquire()
| KeyboardInterrupt
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/storage/wa/old9/dagger-automation/pt2.py", line 29, in <module>
flow_test(worker_count=4, count=10)
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/flows.py", line 1237, in __call__
return enter_flow_run_engine_from_flow_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 297, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 389, in create_then_begin_flow_run
state = await begin_flow_run(
^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 517, in begin_flow_run
async with AsyncExitStack() as stack:
File "/usr/lib/python3.11/contextlib.py", line 733, in __aexit__
raise exc_details[1]
File "/usr/lib/python3.11/contextlib.py", line 716, in __aexit__
cb_suppress = await cb(*exc_details)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 2253, in report_flow_run_crashes
with collapse_excgroups():
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 750, in collapse_excgroups
raise exc
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/task_runners.py", line 187, in start
yield self
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 558, in begin_flow_run
terminal_or_paused_state = await orchestrate_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 884, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/dagger-automation/pt2.py", line 26, in flow_test
return small_task(worker_count=worker_count, count=count)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/tasks.py", line 703, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/engine.py", line 1459, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
waiter.wait()
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
self._handle_waiting_callbacks()
File "/storage/wa/old9/ptest/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
callback: Call = self._queue.get()
^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/queue.py", line 171, in get
self.not_empty.wait()
File "/usr/lib/python3.11/threading.py", line 320, in wait
waiter.acquire()
KeyboardInterrupt
Anything I am missing?
(If anyone wants to SSH into my machine, it's just a new LXD container, so you're welcome to.)
Thanks for the issue @mianos!
I was able to reproduce this issue on a Linux machine, but it works as expected on a Mac. I am also seeing this issue in the most recent 2.x version as well as the most recent 3.0 RC release. I think there must be a difference in process management causing this issue, but I'll have to investigate further to find a root cause. If you find any new info, let me know!
As a workaround, is there a way to cancel all the Prefect-managed threads when my forked process exits? It seems like a hack, but something like "I'm not using any threads in this process, so just exit without waiting"?
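One way to get "just exit without waiting" for a hand-rolled timeout is to fork the work yourself and end the child with `os._exit()`, which terminates immediately without running atexit handlers, flushing stdio buffers, or joining threads. This is a sketch under stated assumptions, not a Prefect API: `run_with_timeout` is a hypothetical helper, it is POSIX-only, the result must be picklable, and the timeout only covers the wait for the first byte of output. Because the parent sends `SIGKILL` on timeout, it also covers a child that hangs on an inherited lock before it ever reaches `os._exit()`.

```python
import os
import pickle
import select
import signal
import time


def run_with_timeout(fn, timeout: float, *args):
    """Hypothetical helper: run fn(*args) in a forked child process.

    Returns (True, result) if the child answers within `timeout` seconds,
    or (False, None) after killing a child that did not.
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:  # child
        os.close(r)
        code = 1
        try:
            view = memoryview(pickle.dumps(fn(*args)))
            while view:  # os.write may write fewer bytes than requested
                view = view[os.write(w, view):]
            code = 0
        finally:
            # Skip atexit handlers and thread joins: exit without waiting.
            os._exit(code)

    # parent
    os.close(w)
    try:
        ready, _, _ = select.select([r], [], [], timeout)
        if not ready:  # no output and no EOF: assume the child is stuck
            os.kill(pid, signal.SIGKILL)
            os.waitpid(pid, 0)
            return False, None
        chunks = []
        while chunk := os.read(r, 65536):
            chunks.append(chunk)
        _, status = os.waitpid(pid, 0)
    finally:
        os.close(r)
    if os.waitstatus_to_exitcode(status) != 0:
        raise RuntimeError("child process failed")
    return True, pickle.loads(b"".join(chunks))


if __name__ == "__main__":
    print(run_with_timeout(lambda x: x * 2, 5.0, 21))
    print(run_with_timeout(time.sleep, 0.5, 5))
```

Killing the child outright is the design choice here: a process that forked while other threads held locks cannot be relied on to shut down cleanly, so the parent does not try.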
I can confirm the problem exists. What is different on Mac is that the process pool uses the spawn start method, while Linux uses fork. @mianos, maybe you can force spawn for your process pool and see if that helps.
My problem is that I use a lot of third-party libraries, and I assume one of them (paramiko, docker) is doing a fork. My flow source code doesn't mention fork or multiprocessing anywhere; otherwise I'd replace it with spawn. The example above is not my code, but it produces exactly the same backtrace as mine.
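If a third-party library is forking behind your back, `os.register_at_fork()` (Python 3.7+, POSIX only) can show where. This is a minimal diagnostic sketch; `FORK_STACKS` and `record_fork` are hypothetical names. The `before` hook runs in the forking thread just before every `os.fork()`, including `multiprocessing`'s fork start method, and records the call stack. According to the Python docs, a typical `subprocess` launch does not trigger these hooks because the child never re-enters the interpreter, so forks made that way will not show up. Hooks cannot be unregistered, so install this only while debugging.

```python
import os
import sys
import traceback

FORK_STACKS: list[str] = []  # hypothetical: one formatted stack per fork


def record_fork() -> None:
    # Runs in the parent, in the thread that is about to call fork().
    stack = "".join(traceback.format_stack()[:-1])  # drop this hook's frame
    FORK_STACKS.append(stack)
    print(f"fork() called from:\n{stack}", file=sys.stderr)


os.register_at_fork(before=record_fork)

if __name__ == "__main__":
    pid = os.fork()
    if pid == 0:
        os._exit(0)
    os.waitpid(pid, 0)
    print(f"recorded {len(FORK_STACKS)} fork(s)")
```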
I believe this was fixed in 3.2.14; let us know if that's not the case, though.
I'm on 3.3.4 and ran into this issue with a library using both multiprocessing and threads. I had to patch it to force spawn to make it work.
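When a library goes through `multiprocessing`'s default context rather than calling `os.fork()` directly, switching the process-wide default start method before that library creates any processes may avoid patching it. A sketch under that assumption; `use_spawn_by_default` is a hypothetical name. It has no effect on code that forks directly, and it carries the usual spawn requirements: importable, picklable targets and a `__main__` guard.

```python
import multiprocessing as mp


def use_spawn_by_default() -> None:
    """Hypothetical helper: make 'spawn' the default start method.

    Call it once, early, before any library creates a process or pool.
    """
    current = mp.get_start_method(allow_none=True)
    if current != "spawn":
        # force=True overrides a default that something else already fixed.
        mp.set_start_method("spawn", force=True)


if __name__ == "__main__":
    use_spawn_by_default()
    print(mp.get_start_method())
```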
@jpsnyder, did you record any degradation in running time from forcing "spawn"? I can provide anecdotal evidence that using set_start_method("spawn") slows down wall-clock time by 10-100x (in simple tests not involving Prefect itself).