error when passing python objects in workflow tasks
Hi, I would like to create a task which receives as input a "complex" object and (possibly) a function, as follows:
@pydra.mark.task
def wrapping_task(object, function):
    result = function(object)
    return result
This wrapping task is added to a workflow as follows (I am omitting details to simplify the reading):
wf = pydra.Workflow(
    name="distributed_wf",
    input_spec=list(inputs.keys()),
    ...
)

def f():
    return 0

wf.add(
    wrapping_task(
        name="dummy_task",
        object=python_object,
        function=f,
    )
)

wf.set_output(
    [
        ("output", wf.dummy_task.lzout.out),
    ]
)
However, at runtime I get the following error:
Traceback (most recent call last):
File "/home/utente/utils/pydra/pydra/engine/core.py", line 1004, in _run
result.output = self._collect_outputs()
File "/home/utente/utils/pydra/pydra/engine/core.py", line 1088, in _collect_outputs
val_out = val.get_value(self)
File "/home/utente/utils/pydra/pydra/engine/specs.py", line 790, in get_value
if result.errored:
AttributeError: 'NoneType' object has no attribute 'errored'
I am not sure whether what I am trying to do is actually possible with pydra. Any help with this would be appreciated.
Thank you in advance!
The obvious problem is that f takes no arguments, but I'm curious why you're writing a general wrapper instead of the actual functions you want to insert in the workflow.
Hi,
Thanks for the prompt reply.
I am not quite sure why passing f with no arguments causes this problem. Can you give me a hint?
The same behaviour occurs using functions with arguments and even python objects. Below is an example with a custom class C:
wf = pydra.Workflow(
    name="distributed_wf",
    input_spec=list(inputs.keys()),
    ...
)

class C():
    def __init__(self,):
        self.x = 0

wf.add(
    wrapping_task(
        name="dummy_task",
        object=C(),
    )
)

wf.set_output(
    [
        ("output", wf.dummy_task.lzout.out),
    ]
)
And below is the stacktrace:
Task exception was never retrieved
future: <Task finished coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at /home/utente/utils/pydra/pydra/engine/workers.py:160> exception=AttributeError("Can't pickle local object 'gen_workflow.<locals>.C'")>
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'gen_workflow.<locals>.C'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/tasks.py", line 251, in __step
result = coro.throw(exc)
File "/home/utente/utils/pydra/pydra/engine/workers.py", line 163, in exec_as_coro
res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/futures.py", line 260, in __await__
yield self # This tells Task to wait for completion.
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
future.result()
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/futures.py", line 178, in result
raise self._exception
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'gen_workflow.<locals>.C'
Traceback (most recent call last):
File "wf_distributed.py", line 157, in <module>
sub(runnable=wf)
File "/home/utente/utils/pydra/pydra/engine/submitter.py", line 56, in __call__
self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/site-packages/nest_asyncio.py", line 98, in run_until_complete
return f.result()
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/futures.py", line 178, in result
raise self._exception
File "/home/utente/anaconda3/envs/nnet/lib/python3.7/asyncio/tasks.py", line 249, in __step
result = coro.send(None)
File "/home/utente/utils/pydra/pydra/engine/submitter.py", line 72, in submit_workflow
await workflow._run(self, rerun=rerun)
File "/home/utente/utils/pydra/pydra/engine/core.py", line 1004, in _run
result.output = self._collect_outputs()
File "/home/utente/utils/pydra/pydra/engine/core.py", line 1088, in _collect_outputs
val_out = val.get_value(self)
File "/home/utente/utils/pydra/pydra/engine/specs.py", line 790, in get_value
if result.errored:
AttributeError: 'NoneType' object has no attribute 'errored'
I need this because I would like to execute tasks remotely (this is related to the possibility of having a "distributed pydra"). I am currently able to use Submitters with plugin="dask" to execute tasks remotely. However, as far as I understand, a workflow uses the same submitter for all the tasks and there is no way to specify a different "address" for each remote task (I think this could be made possible in the future by working on the run method).
Thus, my simple idea is to encapsulate pydra tasks into "wrapping_tasks" which simply invoke a Submitter with plugin="dask", specifying the proper cluster. An example would be as follows:
@pydra.mark.task
def task_decorator(task, plugin, address, params):
    with pydra.Submitter(plugin=plugin, address=address) as sub:
        future = sub(task(**params))
    return future.result(return_inputs=True)
I am not quite sure why passing f with no arguments causes this problem. Can you give me a hint?
Removing the Pydra stuff, you had:
def wrapping_task(object, function):
    result = function(object)
    return result

def f():
    return 0

wrapping_task(object(), f)
In normal Python, this results in:
TypeError: f() takes 0 positional arguments but 1 was given
If you want to pass a value to a function, the function has to accept arguments.
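A minimal fix, just to illustrate (the names are hypothetical, since the real function presumably does something useful), is to give f a parameter:

def f(obj):
    # accept the object so that function(object) inside wrapping_task is a valid call
    return 0

wrapping_task(object(), f)  # now f receives the object and returns 0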
The same behaviour occurs using functions with arguments and even python objects. Below is an example with a custom class C: [...]
And below is the stacktrace:
Task exception was never retrieved [...] AttributeError: Can't pickle local object 'gen_workflow.<locals>.C' [...]
As a rule, Pydra can only work with picklable objects. Dynamically generated classes are generally not picklable. If you have data types you want to pass around, defining them at the module level will help. Some may need extra methods defined to ensure that they can be pickled/unpickled. See "What can be pickled and unpickled?" in the Python docs: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled
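For instance, here is a minimal sketch (names made up for illustration) showing the difference between a module-level class and a locally defined one:

import pickle

class ModuleLevel:
    # defined at module level, so pickle can find it by its qualified name
    def __init__(self):
        self.x = 0

def make_local():
    class Local:
        # defined inside a function, so instances cannot be pickled
        def __init__(self):
            self.x = 0
    return Local()

pickle.dumps(ModuleLevel())  # works
pickle.dumps(make_local())   # AttributeError: Can't pickle local object 'make_local.<locals>.Local'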
I need this because I would like to execute tasks remotely (this is related to the possibility of having a "distributed pydra"). I am currently able to use Submitters with plugin="dask" to execute tasks remotely. However, as far as I understand, a workflow uses the same submitter for all the tasks and there is no way to specify a different "address" for each remote task (I think this could be made possible in the future by working on the run method).
Thus, my simple idea is to encapsulate pydra tasks into "wrapping_tasks" which simply invoke a Submitter with plugin="dask", specifying the proper cluster. An example would be as follows:
@pydra.mark.task
def task_decorator(task, plugin, address, params):
    with pydra.Submitter(plugin=plugin, address=address) as sub:
        future = sub(task(**params))
    return future.result(return_inputs=True)
@djarecka will probably be able to give you a better update on what the state of dask support is. I'm not sure if we have any planning docs on varying submitters within workflows. I know the idea of marking different tasks (or maybe just subworkflows) for different submission strategies has been discussed.
Hi, I just realized that everything works when objects and functions are defined globally (previously, I was defining them inside a function). This means that pickle is not able to serialize "local" objects. However, since the functions that I would like to pass are themselves decorated with pydra annotations, they end up as local objects. Thus the error:
AttributeError: Can't pickle local object 'gen_workflow.<locals>.C'
Do you see any possible workaround, or should I just give up?
Not sure if I follow: the function you would like to pass to wrapping_task is also decorated with pydra.mark.task?
Regarding dask - we used to support dask, but after some bigger changes the tests are failing, so the worker has to be reviewed; we hope to bring it back.
Yes, it could also be decorated with pydra.mark.task, but for now I can avoid this so it can be pickled. I tried without the decoration and it works.
So the only problem left is how to pass a custom object (as the "address" argument of the DaskWorker), since it cannot be directly serialized. For example:
cluster = LocalCluster(scheduler_port=0, dashboard_address=0)

wf.add(
    task_decorator(
        name="dummy_task",
        task=dummy_task,
        plugin="dask",
        address=cluster,  # <-- this won't work
    )
)
This will throw the following exception:
AttributeError: cannot pickle '_thread.RLock' object
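One possible workaround to sketch here, assuming the worker only needs to know where the scheduler is, would be to pass the cluster's scheduler address string instead of the LocalCluster object itself: a plain string pickles fine, and dask's Client can connect from an address string.

from dask.distributed import LocalCluster

cluster = LocalCluster(scheduler_port=0, dashboard_address=0)

wf.add(
    task_decorator(
        name="dummy_task",
        task=dummy_task,
        plugin="dask",
        address=cluster.scheduler_address,  # a string like "tcp://127.0.0.1:46573", which is picklable
    )
)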
About the DaskWorker: it works for the tests I have done so far, with minor modifications (maybe I can share what I have done in another thread).
I'll try to repeat your example later, but I'm not sure if I will be able to solve it if it can't be pickled.
But it would be really great if you could create a PR, since you have been working on the dask worker. Perhaps we can work on a solution that could solve your issue.
If you create the DaskWorker separately, any keywords you give it will be set as client_args for the dask client:
https://github.com/nipype/pydra/blob/master/pydra/engine/workers.py#L363
and then client_args are passed on to the dask Client:
https://github.com/nipype/pydra/blob/master/pydra/engine/workers.py#L375
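So, assuming the Submitter forwards extra keyword arguments to the worker (as the user's task_decorator sketch above relies on), a scheduler address string could reach the dask Client along these lines; this is only a sketch, and the keyword names must be ones that dask.distributed.Client accepts:

# cluster and wf are the objects from the examples above
with pydra.Submitter(plugin="dask", address=cluster.scheduler_address) as sub:
    # the DaskWorker would store address in client_args and pass it on as Client(address=...)
    sub(runnable=wf)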