
Multiprocessing in Prefect tasks causes deadlock if workload cannot be pickled

Open: j-tr opened this issue • 4 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Follow-up issue for #10794.

Running a workload inside a Prefect task that uses multiprocessing and depends on objects that cannot be pickled results in a deadlock with the default "fork" start method, and in a pickling error with "spawn".

According to the Python docs, starting processes with the "fork" start method (the default on Linux in Python 3.10) from a multithreaded process is problematic and can lead to a deadlock (demonstrated by test configuration "fork_task").
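
The hazard the Python docs describe can be reproduced without Prefect. The following is a minimal sketch, assuming a POSIX system where the "fork" start method is available. The module-level lock and the helper thread are hypothetical stand-ins for whatever background threads happen to hold locks in a real multithreaded process; they are not Prefect internals.

```python
import multiprocessing as mp
import threading
import time

# Hypothetical stand-in for any lock that a background thread may hold at
# the moment of fork (logging, HTTP clients, event loops, ...).
lock = threading.Lock()


def hold_lock_briefly():
    """Runs in a helper thread and holds `lock` for two seconds."""
    with lock:
        time.sleep(2)


def child():
    # The forked child inherits `lock` in its held state, but not the thread
    # that holds it, so nothing in the child can ever release it. The timeout
    # turns the would-be deadlock into an exit code instead of a hang.
    acquired = lock.acquire(timeout=1)
    raise SystemExit(0 if acquired else 1)


def fork_while_locked():
    """Fork while another thread holds `lock`; return the child's exit code."""
    holder = threading.Thread(target=hold_lock_briefly)
    holder.start()
    time.sleep(0.2)  # give the helper thread time to acquire the lock
    proc = mp.get_context("fork").Process(target=child)
    proc.start()
    proc.join()
    holder.join()
    return proc.exitcode


if __name__ == "__main__":
    print("child exit code:", fork_while_locked())  # 1: the child never got the lock
```

Without the timeout, lock.acquire() in the child would block forever, which has the same shape as the hang in the "fork_task" configuration. The sketch does not show which lock Prefect's own threads actually hold at fork time.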

Setting the start method to "spawn", as suggested in https://github.com/PrefectHQ/prefect/issues/10794#issuecomment-1832620128, requires the workload to be picklable. Because MyClass in the MRE holds an open file object, it cannot be pickled, and execution fails with the error "cannot pickle '_io.TextIOWrapper' object" (demonstrated by test configuration "spawn_task").
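
The pickling requirement can be checked directly with the standard pickle module: "spawn" pickles the Process object, including its target, and a bound method such as self.do_things carries its instance along. The sketch below assumes a variant of MyClass that takes the file path as a parameter. PicklableMyClass and pickling_error are hypothetical names, and the __getstate__/__setstate__ approach (leave the handle out of the pickled state, reopen it on unpickling) is one general workaround, not something the issue or Prefect prescribes.

```python
import os
import pickle
import tempfile


class MyClass:
    """Mirrors the MRE: the instance holds a file opened for writing."""

    def __init__(self, path):
        self.path = path
        self.file_obj = open(path, "w")

    def do_things(self):
        print("Doing things")


class PicklableMyClass(MyClass):
    """Hypothetical variant that leaves the file handle out of its pickled state."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["file_obj"]  # open file objects cannot be pickled
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Reopen in append mode so the unpickled copy does not truncate the file.
        self.file_obj = open(self.path, "a")


def pickling_error(target):
    """Return the error message raised when pickling `target`, or None."""
    try:
        pickle.dumps(target)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        plain = MyClass(os.path.join(tmp, "plain.txt"))
        fixed = PicklableMyClass(os.path.join(tmp, "fixed.txt"))
        # A bound method drags its instance (and the open file) into the pickle.
        print(pickling_error(plain.do_things))  # cannot pickle '_io.TextIOWrapper' object
        print(pickling_error(fixed.do_things))  # None
        plain.file_obj.close()
        fixed.file_obj.close()
```

Reopening the file gives the child its own handle; writes from the parent and the child are not coordinated, so this only fits workloads where that is acceptable.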

Wrapping the target with Prefect's cloudpickle_wrapped_call solves this in some situations (e.g. when the file is opened only for reading). However, some workloads are not even cloudpickle-serializable (demonstrated by test configuration "spawn_task_cloudpickle", which fails with "Cannot pickle files that are not opened for reading: w").
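
When even cloudpickle cannot serialize the workload, one general workaround is to avoid sending the resource across the process boundary at all: pass only picklable arguments (here, the file path) and construct MyClass inside the child. This is a minimal sketch under that assumption, not a Prefect API; child_entry, start_worker, run_demo and the path parameter are hypothetical names.

```python
import multiprocessing as mp
import os
import tempfile


class MyClass:
    """Same shape as the MRE's MyClass, with the file path made a parameter."""

    def __init__(self, path):
        self.file_obj = open(path, "w")  # write-mode handle: not picklable

    def do_things(self):
        self.file_obj.write("Doing things\n")
        self.file_obj.close()


def child_entry(path):
    # Runs in the child process: the file is opened after the process
    # boundary, so only the picklable `path` string is serialized.
    MyClass(path).do_things()


def start_worker(path, start_method="spawn"):
    # A per-call context leaves the program-wide default start method
    # untouched, unlike multiprocessing.set_start_method.
    ctx = mp.get_context(start_method)
    proc = ctx.Process(target=child_entry, args=(path,))
    proc.start()
    return proc


def run_demo(start_method="spawn"):
    """Start one worker in a temporary directory and return what it wrote."""
    with tempfile.TemporaryDirectory() as tmp:
        out = os.path.join(tmp, "test.txt")
        proc = start_worker(out, start_method)
        proc.join()
        with open(out) as f:
            return f.read()
```

With "spawn", the child re-imports the main module, so run_demo() should be reached through an if __name__ == "__main__": guard (as in the reproduction), and child_entry must stay a top-level function so it can be pickled by reference.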

Reproduction

from multiprocessing import Process, set_start_method
from prefect import flow, task
from prefect.utilities.callables import cloudpickle_wrapped_call


# running workload in flow, default start_method
# --> works!
fork_no_task = {
    "run_in_task": False,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, default start_method
# --> deadlock!
fork_task = {
    "run_in_task": True,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn"
# --> cannot pickle '_io.TextIOWrapper' object because of open file
spawn_task = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn", and cloudpickle wrapper
# --> Cannot pickle files that are not opened for reading: w
# would work if file is opened for reading
spawn_task_cloudpickle = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": True,
}

# change this assignment to run a different test config
TEST_CONFIG = spawn_task_cloudpickle

class MyClass:
    def __init__(self):
        self.file_obj = open("test.txt", "w")

    def do_things(self):
        print("Doing things")

    def do_things_in_multiple_processes(self):
        if TEST_CONFIG["cloudpickle_wrapper"]:
            Process(target=cloudpickle_wrapped_call(self.do_things)).start()
        else:
            Process(target=self.do_things).start()

@task
def my_task():
    MyClass().do_things_in_multiple_processes()


@flow()
def my_flow() -> None:
    if TEST_CONFIG["run_in_task"]:
        my_task()
    else:
        MyClass().do_things_in_multiple_processes()


if __name__ == "__main__":
    set_start_method(TEST_CONFIG["start_method_type"])
    my_flow()

Error

No response

Versions

Version:             2.16.0
API version:         0.8.4
Python version:      3.10.12
Git commit:          17f42e9d
Built:               Thu, Feb 22, 2024 3:45 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

Additional context

No response

j-tr, Feb 26 '24 13:02