
Can't execute multiprocessing tasks using wrapt.decorator functions - not pickle serializable

Open marwan116 opened this issue 4 years ago • 6 comments

I am posting this to share the issue I face when trying to use multiple processes with a function wrapped with a wrapt.decorator.

I am using the following library versions: wrapt==1.12.1, joblib==0.14.1, python==3.7.6

Please see the minimal example below to reproduce the error:

from joblib import Parallel, delayed
import wrapt


@wrapt.decorator
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res


@dummy_decorator
def add(x, y):
    return x + y

First I try the code using multithreading, and it works fine:

with Parallel(n_jobs=2, prefer="threads") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

I get the following output:

before wrapped call
before wrapped call
after wrapped call
after wrapped call
4
6

Then when I try to use processes instead of threads:

with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

I get an error, the key line being NotImplementedError: object proxy must define __reduce_ex__(). Below is the full traceback:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 247, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 240, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 890, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
NotImplementedError: object proxy must define __reduce_ex__()
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ifm_utils/logging/dummy.py", line 24, in <module>
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 1017, in __call__
    self.retrieve()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 909, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 562, in wrap_future_result
    return future.result(timeout=timeout)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.

marwan116 avatar Mar 25 '20 01:03 marwan116

I thought a workaround, if this is too hard to fix, would be to disable the decorator. The following works, as long as the value passed to enabled is not a callable:

@wrapt.decorator(enabled=False)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

Then the code below won't throw any error:

with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

However, to set enabled dynamically, I thought a callable _enabled would be the way to go, and sadly this doesn't work:

def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

I see on other issue threads (mainly https://github.com/GrahamDumpleton/wrapt/issues/102) that an ObjectProxy can perhaps be made picklable/serializable by explicitly defining the __reduce__ and __reduce_ex__ methods. Would that be the workaround needed here, i.e. to implement the decorator as a wrapper instead?
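For what it's worth, the idea from #102 can be sketched in plain Python. The PicklableWrapper class and _rewrap helper below are hypothetical, stdlib-only stand-ins for a wrapt-style wrapper, not wrapt's actual API: instead of serializing its own state, the wrapper pickles only the import path of the wrapped function and rebuilds itself from scratch on unpickling.

```python
import importlib
import math
import pickle


def _rewrap(module_name, qualname):
    """Locate the wrapped function by its import path and wrap it again."""
    obj = importlib.import_module(module_name)
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return PicklableWrapper(obj)


class PicklableWrapper:
    """Hypothetical stand-in for a wrapt-style function wrapper."""

    def __init__(self, wrapped):
        self.__wrapped__ = wrapped

    def __call__(self, *args, **kwargs):
        print("before wrapped call")
        res = self.__wrapped__(*args, **kwargs)
        print("after wrapped call")
        return res

    def __reduce__(self):
        # Pickle only the import path of the wrapped function; the
        # wrapper object itself is reconstructed by _rewrap.
        return (_rewrap, (self.__wrapped__.__module__,
                          self.__wrapped__.__qualname__))


wrapper = PicklableWrapper(math.sqrt)
clone = pickle.loads(pickle.dumps(wrapper))
print(clone(9.0))  # the round-tripped copy still calls math.sqrt
```

The obvious limitation is that the wrapped function must be importable by name in the worker process, which is the same constraint plain pickle puts on ordinary functions.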

marwan116 avatar Mar 25 '20 01:03 marwan116

For a dynamic function to specify whether the decorator is enabled, it should be:

def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

Not sure if you just cut and pasted the wrong thing.

But yes, it may not help with pickling, as the function wrapper is still present when disabled via a callable: the callable is only evaluated at the time of each call. For the decorator not to be applied at all, you can only supply a literal value.

def _enabled():
    return False

am_i_enabled = _enabled()

@wrapt.decorator(enabled=am_i_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

So the call has to be evaluated at the time the code is imported.
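The timing difference can be illustrated with a stdlib-only sketch. my_decorator below is a hypothetical imitation of wrapt-style enabled handling, not wrapt's actual implementation: a literal False, seen at decoration time, means no wrapper is ever applied (so the plain, picklable function survives), while a callable keeps the wrapper in place and only consults the callable at call time.

```python
def my_decorator(enabled=True):
    """Hypothetical imitation of wrapt-style 'enabled' handling."""
    def build(wrapper_func):
        def apply(wrapped):
            if not callable(enabled) and not enabled:
                # Literal False, evaluated at decoration (import) time:
                # the original function is returned untouched.
                return wrapped

            def bound(*args, **kwargs):
                if callable(enabled) and not enabled():
                    # Callable: consulted on every call, but the wrapper
                    # closure still exists and would still need pickling.
                    return wrapped(*args, **kwargs)
                return wrapper_func(wrapped, None, args, kwargs)

            return bound
        return apply
    return build


def passthrough(wrapped, instance, args, kwargs):
    return wrapped(*args, **kwargs)


def add(x, y):
    return x + y


disabled_literal = my_decorator(enabled=False)(passthrough)(add)
disabled_callable = my_decorator(enabled=lambda: False)(passthrough)(add)

print(disabled_literal is add)   # literal False: the bare function survives
print(disabled_callable is add)  # callable: a wrapper object is still present
```

This is why only the literal form helps with the pickling error: it is the presence of the wrapper object, not whether it runs, that trips up pickle.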

Anyway, I will think about the pickle issue. For this narrow case of a function wrapper decorator (as opposed to the general case of an object proxy), there may be a way to get it to work. It will need some investigation though. This analysis was never done for dill either, since that was a third-party package and so not necessarily commonly used.

GrahamDumpleton avatar Mar 25 '20 02:03 GrahamDumpleton

Sorry, yes, I had a typo there - just edited/corrected it.

For this narrow case of a function wrapper decorator (as opposed to general case of object proxy), there may be a way to get it to work

This would be extremely helpful - thank you for this - please let me know if I can help in the process in any way.

marwan116 avatar Mar 25 '20 02:03 marwan116

Hi @marwan116

Have you given this any more thought?

ludaavics avatar May 02 '22 19:05 ludaavics