dask-mpi
Strange interaction with astropy.coordinates.Angle
What happened:
This may be an issue in astropy, so my apologies if this is the wrong place to report it. However, it only seems to happen when using `dask-mpi`.
I'm using `dask-mpi` to distribute a task that uses an `astropy.coordinates.Angle` object. When I try to convert it to a string in `astropy.units.hourangle` format, I get the following error:
```
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/coordinates/angles.py", line 316, in to_string
    values = self.hour
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 861, in __getattr__
    raise AttributeError(
AttributeError: Angle instance has no attribute 'hour'
```
What you expected to happen:
Using a `LocalCluster` and a `SLURMCluster`, I do not get this error with otherwise identical code. Further, `astropy.coordinates.Angle` explicitly has an `hour` property (L161):
```python
@property
def hour(self):
    """
    The angle's value in hours (read-only property).
    """
    return self.hourangle
```
Further, if I do (see MCVE):
```python
print('hour' in dir(coord))
```
I get `True`! Something strange seems to be happening when I try to access the property itself. I'll note that something similar happened with `coordinates.SkyCoord` and its `hms` property.
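The combination above (`'hour' in dir(coord)` is `True`, yet the access fails inside `Quantity.__getattr__`) has a generic Python explanation that may be worth ruling in or out: if anything inside a property getter raises `AttributeError`, Python falls back to the class's `__getattr__`, and that method's generic message replaces the real one. The pure-Python sketch below uses hypothetical toy classes (`QuantityLike`, `AngleLike`), not astropy's code, to show the effect; it does not establish that this is what happens in this issue.

```python
class QuantityLike:
    """Hypothetical stand-in for a class that defines a __getattr__ fallback."""

    def __getattr__(self, attr):
        # Python only calls this after normal lookup has raised AttributeError,
        # including an AttributeError raised *inside* a property getter.
        raise AttributeError(f"{type(self).__name__} instance has no attribute '{attr}'")


class AngleLike(QuantityLike):
    """Hypothetical toy class, not astropy's Angle."""

    @property
    def hour(self):
        return self.hourangle

    @property
    def hourangle(self):
        # Simulate an unrelated failure deep inside the getter.
        raise AttributeError("real cause buried here")


angle = AngleLike()
print("hour" in dir(angle))  # True: dir() lists class attributes, properties included
try:
    angle.hour
except AttributeError as exc:
    print(exc)  # AngleLike instance has no attribute 'hour'
```

If this pattern is involved, calling the getter directly (for example `type(coord).hour.fget(coord)`) bypasses the fallback for that one attribute, so the message moves one level closer to the underlying failure.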
Minimal Complete Verifiable Example:
This is as close to a minimal version of my working script as I could get. Frustratingly, the MCVE does not reproduce the error. Hair pulling abounds.
```python
from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u


@delayed
def worker(i):
    coord = Angle(i*u.deg)
    print('coord.to_string()', coord.to_string())
    print('coord.to_string(u.hourangle)', coord.to_string(u.hourangle, sep=':', precision=3))
    return


def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(90):
        results.append(
            worker(i)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]


if __name__ == "__main__":
    main()
```
Anything else we need to know?:
Environment:
- Dask version: 2021.05.0
- Dask-MPI version: 2.21.0
- Python version: 3.8.10
- Operating System: SUSE Linux Enterprise Server 12 SP3
- Install method (conda, pip, source): Conda
I would be surprised if something in `dask-mpi` were causing this directly. My first instinct would be to look at the Python environments, specifically at what happens differently between using `dask-mpi` and `SLURMCluster`.
Perhaps a good first step would be to check `sys.executable` on clusters submitted by both methods to ensure it is the same.
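Building on the `sys.executable` suggestion, here is a minimal sketch of a helper that collects the interpreter path plus the version and file location of a few packages from the client and from every worker, then reports anything that differs. It relies on `Client.run`, which runs a function on all workers and returns a dict keyed by worker address; the helper names (`environment_report`, `find_mismatches`, `check_cluster`) and the default module list are hypothetical choices for illustration.

```python
import importlib
import sys


def environment_report(modules=("dask", "distributed", "astropy")):
    """Describe the interpreter and selected packages in the current process."""
    report = {"sys.executable": sys.executable}
    for name in modules:
        try:
            mod = importlib.import_module(name)
        except ImportError:
            report[name] = None  # not importable in this process
            continue
        # (version, file) pairs catch both version skew and shadowed installs.
        report[name] = (getattr(mod, "__version__", None), getattr(mod, "__file__", None))
    return report


def find_mismatches(reports):
    """Return {key: {label: value}} for every key whose value differs.

    `reports` maps a process label (e.g. a worker address) to a dict
    produced by environment_report(); a missing key counts as None.
    """
    keys = set()
    for report in reports.values():
        keys.update(report)
    mismatches = {}
    for key in sorted(keys):
        values = {label: report.get(key) for label, report in reports.items()}
        if len(set(values.values())) > 1:
            mismatches[key] = values
    return mismatches


def check_cluster(client):
    """Compare the client process with every worker of a distributed.Client."""
    reports = dict(client.run(environment_report))  # {worker_address: report}
    reports["client"] = environment_report()
    return find_mismatches(reports)
```

In the MCVE, calling `print(check_cluster(client))` right after `client = Client()` should print `{}` if every process agrees; any key that does appear shows which process differs and how.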
Thanks, @jacobtomlinson. I added the line
```python
print('sys.exec', sys.executable)
```
to the worker function. Using both `dask-mpi` and a `LocalCluster`, I get:
```
sys.exec /group/askap/athomson/miniconda3/envs/spice/bin/python3.8
```
which is the conda environment I'd expect to be used.
Your example code works perfectly for me, with astropy = 4.3.1, dask_mpi = 2.21.0, dask = 2021.08.0, Python = 3.8.10 and Ubuntu 20.04.2 LTS. I get both the degrees and hour strings out as expected.
I will note that the line numbers for the errors do not correspond to the version of astropy that I have installed.
Thanks, @ste616. The same is true for me as well, actually. The MCVE also runs fine for me, which is part of my confusion. There might be some conflict between it and my full working script, but for the life of me I can't see what it is.
EDIT: I've also found that the above problem (with `u.hourangle`) persists even when the function is not delayed (but still using `dask-mpi`).
As an update, it looks like the issue extends to other parts of `astropy.units`. A script like:
```python
from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np


@delayed
def worker(freq):
    freq_arr = freq.to(u.Hz).value
    return freq_arr


def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        freq = np.arange(100) * u.Hz
        results.append(
            worker(freq)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is', outputs)


if __name__ == "__main__":
    main()
```
raises `astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible`. Again, I should note that this MCVE doesn't reproduce the error, which occurs in my full script.
I can work around this by doing the unit conversion in `main`, e.g.:
```python
from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np


@delayed
def worker(freq):
    freq_arr = freq
    return freq_arr


def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        freq = np.arange(100) * u.Hz
        results.append(
            worker(freq.to(u.Hz).value)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is', outputs)


if __name__ == "__main__":
    main()
```
The full traceback is:
```
Traceback (most recent call last):
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/group/askap/athomson/repos/spiceracs/spiceracs/processSPICE.py", line 64, in frion_task
    return frion.main(
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in main
    updates = [f.compute() for f in futures]
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in <listcomp>
    updates = [f.compute() for f in futures]
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 2674, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1983, in gather
    return self.sync(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 851, in sync
    return sync(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1848, in _gather
    raise exception.with_traceback(traceback)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/utils.py", line 34, in apply
    return func(*args, **kwargs)
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 80, in predict_worker
    freq_array=freq.to(u.Hz).value,
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 698, in to
    value = self._to_value(unit, equivalencies)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 662, in _to_value
    return self.unit.to(unit, self.view(np.ndarray),
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 987, in to
    return self._get_converter(other, equivalencies=equivalencies)(value)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 918, in _get_converter
    raise exc
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 903, in _get_converter
    return self._apply_equivalencies(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 886, in _apply_equivalencies
    raise UnitConversionError(
astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible
```
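The message names `'Hz'` on both sides, so one reading is that the worker holds two unit objects that print identically but are not treated as the same unit. The toy model below (hypothetical classes and functions, not astropy's code) shows how identity-based conversion produces exactly this message once a second copy of a unit registry exists, for example if a module were loaded twice in one process. Whether anything like that happens in the full script is an untested assumption.

```python
class ToyUnit:
    """Hypothetical toy unit; not astropy's implementation."""

    def __init__(self, name, physical_type):
        self.name = name
        self.physical_type = physical_type

    def __repr__(self):
        return repr(self.name)


class ToyUnitConversionError(ValueError):
    pass


def convert(value, from_unit, to_unit):
    # Assumption of the toy model: two units are convertible only if they are
    # the very same object, as with a registry of singleton unit instances.
    if from_unit is to_unit:
        return value
    raise ToyUnitConversionError(
        f"{from_unit!r} ({from_unit.physical_type}) and "
        f"{to_unit!r} ({to_unit.physical_type}) are not convertible"
    )


def make_registry():
    """Build a fresh registry; two calls stand in for two copies of a module."""
    return {"Hz": ToyUnit("Hz", "frequency")}


first, second = make_registry(), make_registry()
print(convert(5.0, first["Hz"], first["Hz"]))  # 5.0
try:
    convert(5.0, first["Hz"], second["Hz"])
except ToyUnitConversionError as exc:
    print(exc)  # 'Hz' (frequency) and 'Hz' (frequency) are not convertible
```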
EDIT 2:
If I don't delay the function with the coordinate/hourangle issue, the same error occurs when I use `LocalCluster`. Delaying it allows it to work with `LocalCluster`. It fails in either case with `dask-mpi`.
The inconsistency is strange. Are you definitely using `2021.05.0` everywhere?
I'm pretty sure that's the case. I'm using a locally installed conda environment. As a test I added:
```python
import dask

func = "worker"  # set to "main" when the same lines are added to main()
print(f"I'm in the {func} function!", 'dask.__version__', dask.__version__)
print(f"I'm in the {func} function!", 'dask.__file__', dask.__file__)
```
And I get:
```
I'm in the main function! dask.__version__ 2021.05.0
I'm in the main function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py
I'm in the worker function! dask.__version__ 2021.05.0
I'm in the worker function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py
```
And `distributed` too? (They should be pinned, but it's worth checking.)
Here's a test with `distributed`:
```
I'm in the main function! distributed.__version__ 2021.05.0
I'm in the main function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py
I'm in the worker function! distributed.__version__ 2021.05.0
I'm in the worker function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py
```
As a small aside, I noticed I had an inconsistency in my module importing, i.e. `import distributed` vs `from dask import distributed`. I corrected this to use only the former, but the issue persists.
> I noticed I had an inconsistency in my module importing

The latter is preferred, but either will mostly be fine.
> Again, I should note that this MCVE doesn't reproduce this Error which occurs in my full script.

Without a reproducer, I'm afraid this will be hard for us to track down.