pmda
pmda copied to clipboard
fails with ChainReader trajectory
Expected behaviour
pmda can deal with any universe/trajectory that can be built with MDAnalysis
Actual behaviour
Fails with TypeError "TypeError: can't pickle generator objects" (see Stack trace below) when the universe.trajectory is a ChainReader
Code to reproduce the behaviour
import MDAnalysis as mda
from MDAnalysis.tests.datafiles import TPR, XTC
import pmda.rms
u = mda.Universe(TPR, 3*[XTC])
print(u.trajectory)
# <ChainReader ['adk_oplsaa.xtc', 'adk_oplsaa.xtc', 'adk_oplsaa.xtc'] with 30 frames of 47681 atoms>
protein = u.select_atoms("protein and not name H*")
R = pmda.rms.RMSD(protein, protein).run(n_jobs=4)
Current version of MDAnalysis:
(run python -c "import MDAnalysis as mda; print(mda.__version__)"
)
(run python -c "import pmda; print(pmda.__version__)"
)
(run python -c "import dask; print(dask.__version__)"
)
- MDAnalysis: 0.18.1-dev
- pmda: 0.1.1
- dask: 0.17.5
Stack trace
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-4-aced2a5b0b3e> in <module>()
1 protein = u.select_atoms("protein and not name H*")
----> 2 R = pmda.rms.RMSD(protein, protein).run(n_jobs=4)
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/pmda/parallel.py in run(self, start, stop, step, scheduler, n_jobs, n_blocks)
257 blocks.append(task)
258 blocks = delayed(blocks)
--> 259 res = blocks.compute(**scheduler_kwargs)
260 self._results = np.asarray([el[0] for el in res])
261 self._conclude()
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
152 dask.base.compute
153 """
--> 154 (result,) = compute(self, traverse=False, **kwargs)
155 return result
156
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
405 keys = [x.__dask_keys__() for x in collections]
406 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407 results = get(dsk, keys, **kwargs)
408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
409
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
175 get_id=_process_get_id, dumps=dumps, loads=loads,
176 pack_exception=pack_exception,
--> 177 raise_exception=reraise, **kwargs)
178 finally:
179 if cleanup:
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
506 # Seed initial tasks into the thread pool
507 while state['ready'] and len(state['running']) < num_workers:
--> 508 fire_task()
509
510 # Main loop, wait on tasks to finish, insert new ones
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/local.py in fire_task()
500 # Submit
501 apply_async(execute_task,
--> 502 args=(key, dumps((dsk[key], data)),
503 dumps, loads, get_id, pack_exception),
504 callback=queue.put)
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/multiprocessing.py in _dumps(x)
28
29 def _dumps(x):
---> 30 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
31
32
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
893 try:
894 cp = CloudPickler(file, protocol=protocol)
--> 895 cp.dump(obj)
896 return file.getvalue()
897 finally:
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
266 self.inject_addons()
267 try:
--> 268 return Pickler.dump(self, obj)
269 except RuntimeError as e:
270 if 'recursion' in e.args[0]:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
665 else:
666 if PY3:
--> 667 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
668 else:
669 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle generator objects
The problem is that ParallelAnalysisBase._dask_helper()
needs to be pickled by dask. But it is a bound method of ParallelAnalysisBase
so that method (or rather, the class that was derived from it, e.g. RMSD
) also needs to be pickled. If this class contains anything that cannot be pickled, the whole pickle fails. In particular, there is ParallelAnalysisBase._trajectory
, which contains the Reader from the universe (not the universe itself, which is currently not picklable). Almost all readers are picklable, except the ChainReader, because it contains a generator:
'_ChainReader__chained_trajectories_iter': <generator object ChainReader._chained_iterator at 0x7f1c19716468>
Ideally, pickling _dask_helper()
would not need to pickle the enclosing class. However, a short term fix should be to make the ChainReader picklable.
ChainReader should work once PR https://github.com/MDAnalysis/mdanalysis/pull/2723 is merged.