Multiprocessing in Prefect tasks causes deadlock if workload cannot be pickled
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
Follow-up issue for #10794.
Running a workload inside a Prefect task that uses multiprocessing and depends on objects that cannot be pickled results in a deadlock with the default start method, or in a pickling error with "spawn".
According to the Python docs, starting processes with the default "fork" start method from a multithreaded process is problematic and can lead to a deadlock (shown by test configuration "fork_task").
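To illustrate the advice from the Python docs, here is a minimal sketch that requests "spawn" through multiprocessing.get_context only when other threads are alive, instead of changing the global start method with set_start_method. choose_mp_context is a hypothetical helper, not Prefect API. It assumes that threading.active_count() > 1 is a good enough signal that the process is multithreaded, which is the situation this report describes for code running inside a task.

```python
import multiprocessing
import threading
from multiprocessing.context import BaseContext


def choose_mp_context() -> BaseContext:
    """Hypothetical helper: avoid "fork" when other threads are running.

    Forking copies only the calling thread, so a lock held by another thread
    at fork time stays locked forever in the child, which can deadlock it.
    """
    if threading.active_count() > 1:
        # Start a fresh interpreter instead of copying this process.
        return multiprocessing.get_context("spawn")
    # Single-threaded: keep whatever default the platform or user configured.
    return multiprocessing.get_context()
```

A caller would then use choose_mp_context().Process(target=..., args=...) in place of a bare Process(...). With "spawn", the target and its arguments must be picklable, which is the constraint the report turns to next.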
Setting the start method to "spawn", as suggested in https://github.com/PrefectHQ/prefect/issues/10794#issuecomment-1832620128, requires that the workload can be pickled. Since MyClass from the MRE holds an open file object, it cannot be pickled, and execution fails with the error "cannot pickle '_io.TextIOWrapper' object" (shown by test configuration "spawn_task").
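One common way to make such an object picklable is to leave the file handle out of the pickled state with __getstate__ and re-open the file in __setstate__. This is a sketch, not something Prefect provides. FileHolder is a hypothetical stand-in for MyClass, and the design assumes that re-opening the file by path in the child is acceptable. Parent and child then hold separate handles, so their writes are not coordinated.

```python
import pickle


class FileHolder:
    """Hypothetical stand-in for MyClass: keeps a file open for writing."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.file_obj = open(path, "w")

    def __getstate__(self) -> dict:
        # Copy the instance dict and drop the handle that pickle rejects.
        state = self.__dict__.copy()
        del state["file_obj"]
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore the path, then re-open in append mode so the copy does
        # not truncate what the original object already wrote.
        self.__dict__.update(state)
        self.file_obj = open(self.path, "a")

    def close(self) -> None:
        self.file_obj.close()


def roundtrip(obj: object) -> object:
    # Roughly what happens to a spawn-mode Process target: pickle, then unpickle.
    return pickle.loads(pickle.dumps(obj))
```

With this, roundtrip(FileHolder("test.txt")) succeeds, while pickling the raw file object still raises TypeError. This only works when the resource can be recreated from plain data such as a path.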
Using Prefect's cloudpickle_wrapped_call can solve this problem in some situations (e.g. if the file is only open for reading). However, there are still cases where the workload cannot be serialized even with cloudpickle (shown by test configuration "spawn_task_cloudpickle").
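Independently of cloudpickle, it can help to check up front whether a workload can cross a "spawn" boundary, and to hand the child only plain data such as a file path so it opens its own handle. check_spawn_safe and append_line are hypothetical names for this sketch. It uses the standard pickle module, which approximates but is not identical to the pickler multiprocessing uses internally.

```python
import pickle
from typing import Any, Callable


def check_spawn_safe(target: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Hypothetical helper: fail fast if a workload cannot be pickled.

    With the "spawn" start method the Process object, including its target
    and arguments, is pickled and sent to the child, so an early
    pickle.dumps() surfaces the problem before any process is started.
    """
    try:
        pickle.dumps((target, args, kwargs))
    except (TypeError, AttributeError, pickle.PicklingError) as exc:
        raise ValueError(f"workload cannot be sent to a spawned process: {exc}") from exc


def append_line(path: str, text: str) -> None:
    """Module-level worker: receives only a path and opens the file itself."""
    with open(path, "a") as f:
        f.write(text + "\n")
```

For example, check_spawn_safe(append_line, "test.txt", "Doing things") passes. check_spawn_safe(MyClass().do_things) raises ValueError, because pickling a bound method also pickles the instance and its open file. This sidesteps the error only when the workload can be restructured this way, which is not always possible.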
Reproduction
from multiprocessing import Process, set_start_method

from prefect import flow, task
from prefect.utilities.callables import cloudpickle_wrapped_call

# running workload in flow, default start_method
# --> works !
fork_no_task = {
    "run_in_task": False,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, default start_method
# --> deadlock!
fork_task = {
    "run_in_task": True,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn"
# --> cannot pickle '_io.TextIOWrapper' object because of open file
spawn_task = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn", and cloudpickle wrapper
# --> Cannot pickle files that are not opened for reading: w
# would work if file is opened for reading
spawn_task_cloudpickle = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": True,
}

# change this assignment to run a different test config
TEST_CONFIG = spawn_task_cloudpickle


class MyClass:
    def __init__(self):
        self.file_obj = open("test.txt", "w")

    def do_things(self):
        print("Doing things")

    def do_things_in_multiple_processes(self):
        if TEST_CONFIG["cloudpickle_wrapper"]:
            Process(target=cloudpickle_wrapped_call(self.do_things)).start()
        else:
            Process(target=self.do_things).start()


@task
def my_task():
    MyClass().do_things_in_multiple_processes()


@flow()
def my_flow() -> None:
    if TEST_CONFIG["run_in_task"]:
        my_task()
    else:
        MyClass().do_things_in_multiple_processes()


if __name__ == "__main__":
    set_start_method(TEST_CONFIG["start_method_type"])
    my_flow()
Error
No response
Versions
Version: 2.16.0
API version: 0.8.4
Python version: 3.10.12
Git commit: 17f42e9d
Built: Thu, Feb 22, 2024 3:45 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
  Database: sqlite
  SQLite version: 3.37.2
Additional context
No response