AtomGroup doesn't rebind with each timestep after dask update
I can't currently reproduce this section of the user guide, apparently due to a breaking change in Dask. ~~I can't point to the exact Dask version in which this broke, but it works on 2021.1.0 and is broken from 2022.6.0 onwards.~~ It works up to 2021.4.0; the breaking version is 2021.4.1. Here's the changelog.
Before

[screenshot: a different radius of gyration reported for each frame]

Now

[screenshot: the same radius of gyration reported for every frame]
That is, the radii of gyration are all the same: they all come from timestep 0.
Code
Given this:
```python
@dask.delayed
def analyze_block(blockslice, func, *args, **kwargs):
    result = []
    for ts in u.trajectory[blockslice.start:blockslice.stop]:
        A = func(*args, **kwargs)
        result.append(A)
    return result
```
Where `protein` is an AtomGroup: `protein = u.select_atoms('protein')`
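(For context, `radgyr` is the per-frame analysis function from that user guide section; a minimal sketch of such a function, simpler than the user guide's version:)

```python
import numpy as np

def radgyr(atomgroup, masses, total_mass=None):
    # Mass-weighted radius of gyration of the AtomGroup at the current frame.
    ri_sq = (atomgroup.positions - atomgroup.center_of_mass()) ** 2
    sq_dist = np.sum(ri_sq, axis=1)  # squared distance of each atom from the COM
    return np.sqrt(np.sum(masses * sq_dist) / total_mass)
```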
```python
jobs = []
for bs in blocks:
    jobs.append(analyze_block(bs,
                              radgyr,
                              protein,
                              protein.masses,
                              total_mass=np.sum(protein.masses)))
jobs = dask.delayed(jobs)
```
While the timestep does update, the AtomGroup stays bound to the first timestep.
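My reading of what happens (an assumption on my part, not verified here): dask deserializes the closed-over `u` and the AtomGroup argument separately, so on the worker the AtomGroup ends up attached to its own copy of the Universe, and stepping `u.trajectory` never advances it. A hypothetical debug variant of the function would show this:

```python
@dask.delayed
def analyze_block_debug(blockslice, func, *args, **kwargs):
    # Hypothetical probe: args[0] is the protein AtomGroup. On affected dask
    # versions it arrives attached to a *copy* of the Universe, not to the
    # closed-over u, so iterating u.trajectory never moves it off frame 0.
    ag = args[0]
    print('AtomGroup bound to the same Universe?', ag.universe is u)
    result = []
    for ts in u.trajectory[blockslice.start:blockslice.stop]:
        print(ts.frame, ag.universe.trajectory.ts.frame)  # second value stays 0
        result.append(func(*args, **kwargs))
    return result
```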
A way around this is to update the protein selection inside the loop and call the radgyr() function with this new selection:
```python
@dask.delayed
def analyze_block(blockslice, func, *args, **kwargs):
    result = []
    for ts in u.trajectory[blockslice.start:blockslice.stop]:
        protein = u.select_atoms('protein')  # This works
        A = func(protein, protein.masses, **kwargs)
        result.append(A)
    return result
```
MDA: 2.4.0-dev0
@yuxuanzhuang have you encountered this problem?
I don't have an answer, but we emphatically do not want to execute a selection like `protein = u.select_atoms('protein')` in an inner loop. That's terrible for performance.
I am not able to pinpoint the culprit among dozens of dask changes (it boils down to how dask serializes things; e.g. if I set the serializers to pickle/cloudpickle, it works fine), but a very easy fix is to explicitly pass the Universe into analyze_block so the AtomGroups are guaranteed to be attached to it in the new processes (see the serialization discussion in the AtomGroup documentation):
```python
@dask.delayed
def analyze_block(blockslice, universe, func, *args, **kwargs):
    result = []
    for ts in universe.trajectory[blockslice.start:blockslice.stop]:
        A = func(*args, **kwargs)
        result.append(A)
    return result
```
```python
jobs = []
for bs in blocks:
    jobs.append(analyze_block(bs,
                              u,
                              radgyr,
                              protein,
                              protein.masses,
                              total_mass=np.sum(protein.masses)))
jobs = dask.delayed(jobs)
```
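To illustrate why this works (my illustration of the behavior described in the AtomGroup serialization docs, using the MDAnalysis test datafiles, which require the MDAnalysisTests package):

```python
import pickle
import MDAnalysis as mda
from MDAnalysis.tests.datafiles import PSF, DCD

u = mda.Universe(PSF, DCD)
protein = u.select_atoms('protein')

# An AtomGroup pickled on its own comes back with its own Universe copy ...
ag_alone = pickle.loads(pickle.dumps(protein))
print(ag_alone.universe is u)      # False

# ... but pickled together with its Universe, they stay attached.
u2, ag_together = pickle.loads(pickle.dumps((u, protein)))
print(ag_together.universe is u2)  # True
```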
IMHO it's probably even clearer than before to pass the Universe explicitly.
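For completeness, the pickle/cloudpickle workaround mentioned above would look roughly like this with dask.distributed (a sketch; the serializers/deserializers keywords are documented Client options):

```python
from distributed import Client

# Forcing plain pickle for inter-process communication sidesteps dask's
# own serialization path, which is what triggered the rebinding problem.
client = Client(serializers=['pickle'], deserializers=['pickle'])
results = jobs.compute()  # uses the distributed client registered above
```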
This means we need to change docs, right?
I am going to remove the defect label because it seems to be a downstream dask issue (maybe they don't pull objects in from the context anymore?). It looks as if this (admittedly annoying and code-breaking) problem can be "fixed" by documentation changes.
Is there anything else we ought to be doing?
Including the Universe makes total sense to me. I opened an issue on the UserGuide to update that notebook, so I'd be ok with closing this one.
After updating the user guide and testing on dask 2024.4.2, this no longer seems to be an issue. Could someone please double-check? cc @yuxuanzhuang and @pgbarletta
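A quick way to check (a sketch, reusing the `jobs` list built above; identical values across all frames would reproduce the bug):

```python
import numpy as np

# If the AtomGroup rebinds correctly, different frames give different radii
# of gyration; all-identical values across frames mean the bug is back.
results = np.concatenate(jobs.compute(scheduler='processes'))
print('all frames identical (bug present):', np.allclose(results, results[0]))
```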
Confirming the issue has been resolved in versions from v2023.1.1 onward. The versions affected by the problem range from v2021.4.1 to v2023.1.0.
I suspect the fix might be related to the changes seen here: https://github.com/dask/dask/compare/2023.1.0...2023.1.1#diff-b2b064ba4d14c2c4d3c14bb57d0cda3849f084625015bb61f2ec1f7ffe8195ccR1166 (though I'm not entirely certain). Perhaps we can implement a solution downstream to prevent this issue in the future.