Prefect Task is not serialized by distributed.protocol.serialize
Bug summary
It seems like the Prefect Task cannot be serialized with distributed.protocol.serialize. I slightly modified this test and created the following script.
```python
from dask_jobqueue import PBSCluster
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def task_a():
    return "a"


@task
def task_b():
    return "b"


@task
def task_c(b: str):
    return b + "c"


@flow(version="test")
def test_flow():
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit(b)
    return a, b, c


def main():
    with PBSCluster(
        walltime="00:02:00",
        processes=1,
        cores=2,
        memory="2GiB",
        local_directory="/tmp",
        job_extra_directives=["-V"],
    ) as cluster:
        task_runner = DaskTaskRunner(cluster=cluster)
        a, b, c = test_flow.with_options(task_runner=task_runner)()
        print(a, b, c)


if __name__ == "__main__":
    main()
```
Running this script on my PBS login node, with Prefect server installed, raises the following exception:
```
Encountered exception during execution: TypeError('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f39bf1a42d0>\n 0. 139885965356672\n>')
Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function task_a at 0x7f39bd445ee0>: it's not the same object as __main__.task_a

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function task_a at 0x7f39bd445ee0>: it's not the same object as __main__.task_a

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "...venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'generator' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 763, in run_context
    yield self
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 1370, in run_flow_sync
    engine.call_flow_fn()
  File "...venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 783, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/.../simple-test.py", line 24, in test_flow
    a = task_a.submit()
        ^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect/tasks.py", line 1230, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 431, in submit
    future = self.client.submit(
             ^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/client.py", line 2174, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/client.py", line 3377, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...venv/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f39bf1a42d0>\n 0. 139885965356672\n>')
```
The PBS dependency makes this too complicated for me to debug further.
Version info
```
Version:             3.3.2
API version:         0.8.4
Python version:      3.11.11
Git commit:          e49e3185
Built:               Thu, Apr 03, 2025 09:25 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.3.4
```
Additional context
```
distributed==2025.3.0
dask==2025.3.0
dask-jobqueue==0.9.0
prefect-dask==0.3.4
```
After carefully reading the code, I realized that this is not an issue with PBSCluster, so I updated the issue title. In the same environment, the following code works (the task is serialized by cloudpickle, as expected):
```python
from prefect import task
from distributed.protocol.serialize import pickle_dumps


@task
def task_a():
    return "a"


out = pickle_dumps(task_a)
```
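The first two errors in the traceback (`it's not the same object as __main__.task_a`) come from the standard `pickle` module, which stores a function by reference (module plus qualified name) and refuses when that name now resolves to a different object. A minimal stdlib-only sketch of that behavior, assuming `@task` acts like any decorator that replaces the module-level name with a wrapper object; `Wrapper`, `wrap`, `plain` and `try_pickle` are hypothetical names, not Prefect APIs:

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for a decorator that returns an object (like @task)."""

    def __init__(self, fn):
        self.fn = fn  # keep a reference to the original, undecorated function


def wrap(fn):
    return Wrapper(fn)


@wrap
def task_a():
    return "a"


def plain():
    # Undecorated: the module-level name `plain` still points at this function.
    return "p"


def try_pickle(obj):
    """Return None if stdlib pickle succeeds, otherwise the PicklingError it raised."""
    try:
        pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return exc
    return None


# The module-level name `task_a` is now the Wrapper, so pickle's by-reference lookup
# of task_a.fn finds a different object and refuses. Pickling the Wrapper itself fails
# for the same reason, because its state contains that function.
print(try_pickle(task_a.fn))
print(try_pickle(task_a))
print(try_pickle(plain))  # None: by-reference lookup finds the same object
```

cloudpickle does not have this restriction for functions defined in `__main__`: it serializes them by value, which is consistent with `pickle_dumps(task_a)` succeeding through its cloudpickle fallback.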
This made me think the problem was in DaskTaskRunner itself (e.g. it might implicitly modify object attributes), so I tested the following code in the same environment:
```python
def main():
    a, b, c = test_flow.with_options(task_runner=DaskTaskRunner)()
    print(a, b, c)
```
Contrary to my expectations, this code worked. If I understand correctly, the only difference between the two experiments is the underlying cluster class (i.e. PBSCluster vs. LocalCluster). As far as I can tell from the codebase, the cluster class doesn't inject any custom logic into serialization.
Has anyone tested DaskTaskRunner + PBSCluster?
Hi @nkanazawa1989! This is ultimately a cloudpickle issue, see:
- https://github.com/dask/distributed/issues/7954
- https://github.com/cloudpipe/cloudpickle/issues/509
tl;dr: you can't pickle functions decorated through contextlib. In the meantime there's a workaround, which is to not use the task/flow decorators and instead instantiate Task and Flow directly. For example, this works (though it pains me how ugly it is):
```python
from dask_jobqueue import PBSCluster
from prefect import Task, Flow
from prefect_dask import DaskTaskRunner


def _task_a():
    return "a"


def _task_b():
    return "b"


def _task_c(b: str):
    return b + "c"


task_a = Task(_task_a)
task_b = Task(_task_b)
task_c = Task(_task_c)


def _test_flow():
    a = task_a()
    b = task_b()
    c = task_c(b)
    return a, b, c


test_flow = Flow(_test_flow, version="test")


def main():
    with PBSCluster(
        walltime="00:02:00",
        processes=1,
        cores=2,
        memory="2GiB",
        local_directory="/tmp",
        job_extra_directives=["-V"],
    ) as cluster:
        task_runner = DaskTaskRunner(cluster=cluster)
        a, b, c = test_flow.with_options(task_runner=task_runner)()
        print(a, b, c)


if __name__ == "__main__":
    main()
```
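The last pickling error in the traceback, `TypeError: cannot pickle 'generator' object`, can be reproduced without Prefect or Dask. A minimal sketch, assuming (in line with the tl;dr and the linked issues, but not verified against Prefect's internals) that the culprit is a `contextlib.contextmanager` object reachable from the pickled task: such an object keeps a live generator in its state, and generator objects cannot be pickled. `managed` and `pickle_error` are hypothetical names:

```python
import contextlib
import pickle


@contextlib.contextmanager
def managed():
    # A trivial generator-based context manager
    yield "resource"


def pickle_error(obj):
    """Return the exception raised by pickle.dumps(obj), or None on success."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError) as exc:
        return exc
    return None


gen = (i for i in range(3))  # any live generator object
cm = managed()  # CPython stores the underlying generator on this object (.gen)

print(repr(pickle_error(gen)))  # a TypeError mentioning 'generator'
print(repr(pickle_error(cm)))   # also fails: its state includes that generator
```

Anything that holds such an object in its attributes inherits the failure, which is why instantiating Task and Flow without the decorators can sidestep it.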
@aaazzam my understanding is that with your workaround you don't submit() your tasks, so they run locally rather than on the PBSCluster. I tested on an HTCondorCluster, and no worker is spawned.
You get the same result when you remove the .submit() calls from the OP's code.
OK, it turns out this is not how a DaskTaskRunner with a temporary cluster should be initialized.
Instead of passing a cluster instance, you should pass cluster_class/cluster_kwargs. See #10136.
So in your case,
```python
task_runner = DaskTaskRunner(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "walltime": "00:02:00",
        "processes": 1,
        ...
    }
)
```
It is now working for me with HTCondorCluster.
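For the original PBS script, the same fix might look like the sketch below. It assumes the keyword arguments from the original `PBSCluster(...)` call carry over unchanged into `cluster_kwargs`; `PBS_CLUSTER_KWARGS` and `make_task_runner` are hypothetical names, and this has not been run against a PBS system.

```python
# Hypothetical: the PBSCluster arguments from the original script, moved into a
# plain dict so DaskTaskRunner can create the cluster itself.
PBS_CLUSTER_KWARGS = {
    "walltime": "00:02:00",
    "processes": 1,
    "cores": 2,
    "memory": "2GiB",
    "local_directory": "/tmp",
    "job_extra_directives": ["-V"],
}


def make_task_runner():
    """Build a DaskTaskRunner that creates a temporary PBSCluster from class + kwargs."""
    # Imported lazily so this module can be imported without the HPC stack installed.
    from dask_jobqueue import PBSCluster
    from prefect_dask import DaskTaskRunner

    return DaskTaskRunner(
        cluster_class=PBSCluster,  # pass the class, not an already-started instance
        cluster_kwargs=dict(PBS_CLUSTER_KWARGS),  # copy so the defaults stay untouched
    )


# Usage with the decorated flow from the original script:
#   a, b, c = test_flow.with_options(task_runner=make_task_runner())()
```

Passing the class and its kwargs, rather than a live cluster, leaves the runner in charge of constructing the cluster, which is the approach recommended in the comment above.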