
Prefect Task is not serialized by distributed.protocol.serialize

Open · nkanazawa1989 opened this issue 8 months ago · 5 comments

Bug summary

It seems that a Prefect Task cannot be serialized with distributed.protocol.serialize. I slightly modified this test to create the following script.

from dask_jobqueue import PBSCluster
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def task_a():
    return "a"


@task
def task_b():
    return "b"


@task
def task_c(b: str):
    return b + "c"


@flow(version="test")
def test_flow():
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit(b)
    return a, b, c


def main():
    with PBSCluster(
        walltime="00:02:00",
        processes=1,
        cores=2,
        memory="2GiB",
        local_directory="/tmp",
        job_extra_directives=["-V"],
    ) as cluster:
        task_runner = DaskTaskRunner(cluster=cluster)
        
        a, b, c = test_flow.with_options(task_runner=task_runner)()

    print(a, b, c)


if __name__ == "__main__":
    main()

Running this script on my PBS login node, with a Prefect server installation, raises the following exception.

Encountered exception during execution: TypeError('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f39bf1a42d0>\n 0. 139885965356672\n>')
Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function task_a at 0x7f39bd445ee0>: it's not the same object as __main__.task_a

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function task_a at 0x7f39bd445ee0>: it's not the same object as __main__.task_a

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "...venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'generator' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 763, in run_context
    yield self
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 1370, in run_flow_sync
    engine.call_flow_fn()
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 783, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/.../simple-test.py", line 24, in test_flow
    a = task_a.submit()
        ^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect/tasks.py", line 1230, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 431, in submit
    future = self.client.submit(
             ^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/client.py", line 2174, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/client.py", line 3377, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f39bf1a42d0>\n 0. 139885965356672\n>')

The PBS dependency stack is too complicated for me to debug further.

Version info

Version:             3.3.2
API version:         0.8.4
Python version:      3.11.11
Git commit:          e49e3185
Built:               Thu, Apr 03, 2025 09:25 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.3.4

Additional context

distributed==2025.3.0
dask==2025.3.0
dask-jobqueue==0.9.0
prefect-dask==0.3.4

nkanazawa1989 · Apr 07 '25

After carefully reading the code, I realized that this is not an issue with PBSCluster, so I updated the issue title. In the same environment, the following code works (the task is serialized by cloudpickle as expected).

from prefect import task
from distributed.protocol.serialize import pickle_dumps

@task
def task_a():
    return "a"

out = pickle_dumps(task_a)
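
As a cross-check (a sketch; it assumes the isolated case mirrors what the scheduler serializes), distributed's pickle path tries stdlib pickle first and falls back to cloudpickle, which matches the fallback order visible in the traceback above:

import pickle

import cloudpickle
from prefect import task


@task
def task_a():
    return "a"


try:
    # stdlib pickle typically fails here ("not the same object as
    # __main__.task_a"), matching the first error in the traceback
    payload = pickle.dumps(task_a)
except Exception as exc:
    print(f"pickle failed ({exc}); falling back to cloudpickle")
    payload = cloudpickle.dumps(task_a)  # succeeds in isolation, as reported

print(type(cloudpickle.loads(payload)))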

nkanazawa1989 · Apr 07 '25

This made me think it was an issue with DaskTaskRunner itself (e.g. it may implicitly modify object attributes), so I tested the following code in the same environment.

# test_flow and DaskTaskRunner are the same objects as in the original script
def main():
    a, b, c = test_flow.with_options(task_runner=DaskTaskRunner)()

    print(a, b, c)

Contrary to my expectations, this code worked. If I understand correctly, the only difference between the two experiments lies in the underlying cluster class (i.e. PBSCluster vs. LocalCluster). As far as I can tell from the codebase, the cluster class does not inject any custom logic into serialization.

Has anyone tested DaskTaskRunner + PBSCluster?

nkanazawa1989 · Apr 07 '25

Hi @nkanazawa1989! This is ultimately a cloudpickle issue, see:

  • https://github.com/dask/distributed/issues/7954
  • https://github.com/cloudpipe/cloudpickle/issues/509

tl;dr: you can't pickle functions decorated through contextlib (a minimal illustration follows the workaround below). There's a workaround in the meantime, which is to not use the task/flow decorators and instead instantiate Task and Flow directly, e.g. this works (though it pains me how ugly it is):

from dask_jobqueue import PBSCluster
from prefect import Task, Flow
from prefect_dask import DaskTaskRunner


def _task_a():
    return "a"

def _task_b():
    return "b"

def _task_c(b: str):
    return b + "c"

task_a = Task(_task_a)
task_b = Task(_task_b)
task_c = Task(_task_c)


def _test_flow():
    a = task_a()
    b = task_b()
    c = task_c(b)
    return a, b, c

test_flow = Flow(_test_flow, version="test")


def main():
    with PBSCluster(
        walltime="00:02:00",
        processes=1,
        cores=2,
        memory="2GiB",
        local_directory="/tmp",
        job_extra_directives=["-V"],
    ) as cluster:
        task_runner = DaskTaskRunner(cluster=cluster)
        
        a, b, c = test_flow.with_options(task_runner=task_runner)()

    print(a, b, c)


if __name__ == "__main__":
    main()
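
For reference, here is a minimal illustration of the limitation described in the linked issues (a sketch of the failure mode, not the exact Prefect code path): an object created through contextlib.contextmanager holds a live generator, and generators cannot be pickled, which is the same TypeError seen in the traceback above.

import contextlib

import cloudpickle


@contextlib.contextmanager
def some_context():
    yield


cm = some_context()  # _GeneratorContextManager wrapping a live generator

try:
    cloudpickle.dumps(cm)
except TypeError as exc:
    print(exc)  # cannot pickle 'generator' object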

aaazzam · Apr 18 '25

@aaazzam my understanding is that with your workaround you don't submit() your tasks, so they run locally, not on the PBSCluster... I have tested on an HTCondorCluster, and no worker is spawned. You get the same result when removing .submit() from the OP's code.
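
One way to verify where tasks actually run (a sketch; the where_am_i task name is illustrative, and it assumes a task executed on a Dask worker can see that worker via distributed.get_worker()):

from distributed import get_worker
from prefect import task


@task
def where_am_i():
    try:
        # get_worker() raises ValueError when called outside a Dask worker
        return get_worker().address
    except ValueError:
        return "local process (no Dask worker)"

Submitting where_am_i through the task runner and printing its result shows whether the work landed on a cluster worker or ran in the flow's own process.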

fsteinmetz · Apr 26 '25

OK, actually this is not how a temporary DaskTaskRunner should be initialized. Instead of passing a cluster instance, you should pass cluster_class/cluster_kwargs. See #10136. So in your case,

task_runner = DaskTaskRunner(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "walltime": "00:02:00",
        "processes": 1,
        ...
    }
)

It is now working for me with HTCondorCluster.
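
For completeness, a filled-in version of that configuration using the kwargs from the original script (the specific values are the OP's, not requirements):

from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "walltime": "00:02:00",
        "processes": 1,
        "cores": 2,
        "memory": "2GiB",
        "local_directory": "/tmp",
        "job_extra_directives": ["-V"],
    },
)

# then run the flow exactly as in the original report:
# a, b, c = test_flow.with_options(task_runner=task_runner)()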

fsteinmetz · May 23 '25