wrapt
Can't execute multiprocessing tasks using wrapt.decorator functions - not pickle serializable
I am posting this to share an issue I face when trying to use multiple processes with a function wrapped with a wrapt.decorator.
I am using the following libraries:
wrapt==1.12.1
joblib==0.14.1
python==3.7.6
Please see the dummy example below to reproduce the error:
from joblib import Parallel, delayed
import wrapt

@wrapt.decorator
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

@dummy_decorator
def add(x, y):
    return x + y
First I try the code using multithreading, and it works fine:
with Parallel(n_jobs=2, prefer="threads") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))

for f in fs:
    print(f)
I get the following output:
before wrapped call
before wrapped call
after wrapped call
after wrapped call
4
6
Then when I try to use processes instead of threads:
with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))

for f in fs:
    print(f)
I get an error, the key part being: NotImplementedError: object proxy must define __reduce_ex__()
Below is the full traceback:
joblib.externals.loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 247, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 240, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 890, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
NotImplementedError: object proxy must define __reduce_ex__()
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ifm_utils/logging/dummy.py", line 24, in <module>
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 1017, in __call__
    self.retrieve()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 909, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 562, in wrap_future_result
    return future.result(timeout=timeout)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.
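To make the failure easier to reproduce without joblib, here is a minimal stand-alone sketch of the mechanism behind that error. BlockedProxy is a hypothetical illustration, not wrapt's actual implementation: it shows how a proxy class that stubs out __reduce_ex__ with NotImplementedError (as wrapt's ObjectProxy does) defeats pickling even though the proxy otherwise works.

```python
import pickle

# Hypothetical stand-in for wrapt's ObjectProxy: forwards attribute
# access to a wrapped object, but deliberately blocks the default
# pickle protocol, forcing subclasses to define their own __reduce_ex__().
class BlockedProxy:
    def __init__(self, wrapped):
        self.__dict__["_wrapped"] = wrapped

    def __getattr__(self, name):
        # Only called when normal lookup fails, so no recursion here.
        return getattr(self._wrapped, name)

    def __reduce_ex__(self, protocol):
        raise NotImplementedError("object proxy must define __reduce_ex__()")

def add(x, y):
    return x + y

proxy = BlockedProxy(add)
print(proxy._wrapped(1, 2))  # the proxy still forwards calls

try:
    pickle.dumps(proxy)
except NotImplementedError as exc:
    print(exc)  # the same message that surfaces in the joblib traceback
```

Pickling the wrapt-decorated add trips over exactly this: the FunctionWrapper is a proxy object, so the process-based joblib backend cannot serialize the task.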
I thought a workaround, if this is too hard to fix, would be to disable the decorator. The following works if the value passed to enabled is not a callable:
@wrapt.decorator(enabled=False)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res
Then the code below won't throw any error:
with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))

for f in fs:
    print(f)
However, to set enabled dynamically, I thought using a callable _enabled would be the way to go, and sadly this doesn't work:
def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res
I see on other issue threads (mainly https://github.com/GrahamDumpleton/wrapt/issues/102) that an ObjectProxy may be picklable/serializable if the __reduce__ and __reduce_ex__ methods are explicitly defined - would that be the workaround needed here, i.e. implementing the decorator as a wrapper instead?
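As a point of comparison, a minimal sketch of that idea using only the standard library: a plain functools-based decorator produces an ordinary function, which pickle serializes by module and qualified name, so it survives the round-trip as long as it is defined at module top level. This is an illustration of the general approach, not an equivalent of wrapt.

```python
import functools
import pickle

# A plain-function equivalent of the wrapt decorator above. The
# decorated name is pickled by reference (module + qualified name),
# so pickle.dumps() succeeds where the proxy-based wrapper fails.
def dummy_decorator(wrapped):
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        print("before wrapped call")
        res = wrapped(*args, **kwargs)
        print("after wrapped call")
        return res
    return wrapper

@dummy_decorator
def add(x, y):
    return x + y

# Pickle round-trip succeeds: the function is serialized by reference.
restored = pickle.loads(pickle.dumps(add))
print(restored(3, 4))
```

Note the trade-off: unlike wrapt, this sketch does not preserve full signature introspection and has no instance argument for distinguishing plain functions from bound methods.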
For a dynamic function to specify whether the decorator is enabled, it should be:
def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res
Not sure if you just cut and paste the wrong thing.
But yes, it may not help here, because when disabled via a callable the function wrapper is still present - the callable is only evaluated at the time of each call. For the decorator to not be applied at all, you can only supply a literal value:
def _enabled():
    return False

am_i_enabled = _enabled()

@wrapt.decorator(enabled=am_i_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res
So the call has to be evaluated at the time the code is imported.
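The same import-time-toggle pattern can be sketched without wrapt at all, which sidesteps the pickling problem entirely when the decorator is off. The environment variable name DUMMY_DECORATOR_ENABLED is hypothetical, chosen only for illustration; the point is that the switch is read once, when the module is imported, not at call time.

```python
import functools
import os

# Read the switch once, at import time - mirroring the point above that
# the enabled value must already be decided when the decorator is applied.
# DUMMY_DECORATOR_ENABLED is a hypothetical variable name for illustration.
DECORATOR_ENABLED = os.environ.get("DUMMY_DECORATOR_ENABLED", "0") == "1"

def dummy_decorator(wrapped):
    # When disabled at import time, return the target untouched, so no
    # wrapper object exists to trip up pickle later.
    if not DECORATOR_ENABLED:
        return wrapped

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        print("before wrapped call")
        res = wrapped(*args, **kwargs)
        print("after wrapped call")
        return res
    return wrapper

@dummy_decorator
def add(x, y):
    return x + y

print(add(1, 2))
```

With the switch off, add is literally the original function object, so process-based backends can pickle it by reference.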
Anyway, I will think about the pickle issue. For this narrow case of a function wrapper decorator (as opposed to the general case of an object proxy), there may be a way to get it to work. It will need some investigation though. The analysis was never done for dill, since that is a third party package and so not necessarily commonly used.
Sorry yes I had a typo there - just edited/corrected it ...
For this narrow case of a function wrapper decorator (as opposed to general case of object proxy), there may be a way to get it to work
This would be extremely helpful - thank you for this - please let me know if I can help in the process in any way.
Hi @marwan116, have you given this any more thought?