dask-mpi

Strange interaction with astropy.coordinates.Angle

Open AlecThomson opened this issue 3 years ago • 10 comments

What happened: This may be an issue in astropy, so my apologies if this is the wrong place. However, it only appears to happen when using dask-mpi.

I'm using dask-mpi to distribute a task that uses an astropy.coordinates.Angle object. When I try to convert it to a string in astropy.units.hourangle format, I get the following error:

  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/coordinates/angles.py", line 316, in to_string
    values = self.hour
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 861, in __getattr__
    raise AttributeError(
AttributeError: Angle instance has no attribute 'hour'

What you expected to happen: With a LocalCluster or a SLURMCluster I do not get this error, using otherwise identical code. Furthermore, astropy.coordinates.Angle explicitly defines an hour property (L161):

    @property
    def hour(self):
        """
        The angle's value in hours (read-only property).
        """
        return self.hourangle

Further, if I do (see MCVE):

print('hour' in dir(coord))

I get True! Something strange seems to happen when I try to access the property itself. I'll note that something similar happened with coordinates.SkyCoord and its hms property.
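That combination (hour listed by dir(), yet access failing with a "no attribute 'hour'" message) is consistent with a general Python behaviour: if a property getter itself raises AttributeError, Python falls back to the class's __getattr__, and the resulting error names the property rather than the attribute that actually failed inside it. The sketch below uses hypothetical stand-in classes (Base, FakeAngle), not astropy code. It is an assumption, not confirmed in this thread, that astropy's Quantity.__getattr__ (which appears in the traceback above) plays the Base role and that self.hourangle is what fails inside the hour property.

```python
class Base:
    """Hypothetical stand-in for a class with a fallback __getattr__."""

    def __getattr__(self, attr):
        # Only called after normal attribute lookup has raised AttributeError
        raise AttributeError(f"{type(self).__name__} instance has no attribute '{attr}'")


class FakeAngle(Base):
    """Hypothetical stand-in for Angle; not astropy's implementation."""

    @property
    def hour(self):
        # 'hourangle' is not defined, so this raises AttributeError inside the getter
        return self.hourangle


coord = FakeAngle()
print('hour' in dir(coord))  # True: the property is defined on the class

try:
    coord.hour
except AttributeError as exc:
    # The getter's AttributeError makes Python retry via __getattr__('hour'),
    # so the message mentions 'hour', not 'hourangle'
    print(exc)  # FakeAngle instance has no attribute 'hour'
```

If this is what is happening, the real question becomes why the conversion behind hourangle fails inside the dask-mpi processes.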

Minimal Complete Verifiable Example: This is as close to a minimal version of my working script as I could get. Very frustratingly, the MCVE does not produce the same error. Hair pulling abounds.

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u

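@delayed
def worker(i):
    # Build an Angle of i degrees and format it in degrees and in hour angle
    coord = Angle(i*u.deg)
    print('coord.to_string()', coord.to_string())
    print('coord.to_string(u.hourangle)', coord.to_string(u.hourangle, sep=':', precision=3))
    return


def main():
    # Set up the dask-mpi scheduler and workers across the MPI ranks,
    # communicating over the ipogif0 network interface
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(90):
        results.append(
            worker(i)
        )
    # Persist the delayed tasks on the cluster, then compute each one
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]


if __name__ == "__main__":
    main()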

Anything else we need to know?:

Environment:

  • Dask version: 2021.05.0
  • Dask-MPI version: 2.21.0
  • Python version: 3.8.10
  • Operating System: SUSE Linux Enterprise Server 12 SP3
  • Install method (conda, pip, source): Conda

AlecThomson, Aug 17 '21 11:08

I would be surprised if something in dask-mpi were causing this directly. My first instinct would be to look at the Python environments, specifically comparing what happens differently between dask-mpi and SLURMCluster.

Perhaps a good first step would be to check the sys.executable on clusters submitted by both methods to ensure it is the same.
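A minimal sketch of that check, run from the client and extended to the scheduler and every worker rather than only the process that executes one task. It assumes distributed's Client.run (which returns a dict keyed by worker address) and Client.run_on_scheduler; the helper names are hypothetical, and a single-process LocalCluster stands in for the real cluster so the sketch runs anywhere.

```python
import sys

from distributed import Client, LocalCluster


def report_executable():
    """Return the interpreter path of whichever process runs this function."""
    return sys.executable


def check_executables(client):
    """Collect sys.executable from the client, the scheduler and every worker."""
    return {
        "client": report_executable(),
        "scheduler": client.run_on_scheduler(report_executable),
        # Client.run returns {worker_address: result}
        "workers": client.run(report_executable),
    }


def all_match(info):
    """True if every process reported the same interpreter path."""
    paths = {info["client"], info["scheduler"], *info["workers"].values()}
    return len(paths) == 1


if __name__ == "__main__":
    # processes=False keeps this demo in one process; with dask-mpi you would
    # instead call dask_mpi.initialize() and then Client(), as in the MCVE.
    with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
        info = check_executables(client)
        print(info)
        print("all processes match:", all_match(info))
```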

jacobtomlinson, Aug 17 '21 13:08

Thanks, @jacobtomlinson. I added the line:

print('sys.exec', sys.executable)

to the worker function.

Using both dask-mpi and a LocalCluster I get:

sys.exec /group/askap/athomson/miniconda3/envs/spice/bin/python3.8

which is the conda env I'd expect to be used.

AlecThomson, Aug 18 '21 02:08

Your example code works perfectly for me, with astropy = 4.3.1, dask_mpi = 2.21.0, dask = 2021.08.0, Python = 3.8.10 and Ubuntu 20.04.2 LTS. I get both the degrees and hour strings out as expected.

I will note that the line numbers for the errors do not correspond to the version of astropy that I have installed.

ste616, Aug 18 '21 05:08

Thanks, @ste616. The same is true for me as well, actually. The MCVE also runs fine for me, which is part of my confusion. There must be some difference between the MCVE and my full working script, but for the life of me I can't see what it is.

AlecThomson, Aug 19 '21 00:08

EDIT: I've also found that the above problem (with u.hourangle) persists even when the function is not delayed (while still using dask-mpi).

As an update, it looks like the issue extends to other parts of astropy.units. A script like:

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np

@delayed
def worker(freq):
    # Convert the Quantity to a plain array of Hz values inside the task
    freq_arr = freq.to(u.Hz).value
    return freq_arr


def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        # Each task receives a Quantity (an array with units attached)
        freq = np.arange(100) * u.Hz
        results.append(
            worker(freq)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is', outputs)


if __name__ == "__main__":
    main()

In my full script, code like this raises astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible. Again, I should note that this MCVE itself doesn't reproduce the error.

I can work around this by doing the unit conversion in main, e.g.:

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np

@delayed
def worker(freq):
    # freq now arrives as a plain NumPy array, so no unit conversion happens here
    freq_arr = freq
    return freq_arr


def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        freq = np.arange(100) * u.Hz
        results.append(
            # Strip the units in main, before the value is handed to the task
            worker(freq.to(u.Hz).value)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is', outputs)


if __name__ == "__main__":
    main()

The full traceback is:


Traceback (most recent call last):
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
File "/group/askap/athomson/repos/spiceracs/spiceracs/processSPICE.py", line 64, in frion_task
    return frion.main(
File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in main
    updates = [f.compute() for f in futures]
File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in <listcomp>
    updates = [f.compute() for f in futures]
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 2674, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1983, in gather
    return self.sync(
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 851, in sync
    return sync(
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1848, in _gather
    raise exception.with_traceback(traceback)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/utils.py", line 34, in apply
    return func(*args, **kwargs)
File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 80, in predict_worker
    freq_array=freq.to(u.Hz).value,
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 698, in to
    value = self._to_value(unit, equivalencies)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 662, in _to_value
    return self.unit.to(unit, self.view(np.ndarray),
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 987, in to
    return self._get_converter(other, equivalencies=equivalencies)(value)
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 918, in _get_converter
    raise exc
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 903, in _get_converter
    return self._apply_equivalencies(
File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 886, in _apply_equivalencies
    raise UnitConversionError(
astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible

EDIT 2:

If I don't delay the function with the coordinate/hourangle issue, the same error also occurs with a LocalCluster; delaying it makes it work with a LocalCluster. With dask-mpi it fails either way.

AlecThomson, Aug 19 '21 04:08

The inconsistency is strange. Are you definitely using 2021.05.0 everywhere?

jacobtomlinson, Aug 19 '21 08:08

I'm pretty sure that's the case. I'm using a locally installed conda environment. As a test I added:

import dask
func = 'main'  # set to 'worker' when placed inside the worker function
print(f"I'm in the {func} function!", 'dask.__version__', dask.__version__)
print(f"I'm in the {func} function!", 'dask.__file__', dask.__file__)

And I get:

I'm in the main function! dask.__version__ 2021.05.0
I'm in the main function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py
I'm in the worker function! dask.__version__ 2021.05.0
I'm in the worker function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py

AlecThomson, Aug 19 '21 23:08

And distributed too? (They should be pinned but it's worth checking).
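Printing versions from main and from inside one task only covers the processes that happen to run them. A hedged sketch that compares dask, distributed and astropy across the client, the scheduler and every worker might look like the following; it relies on Client.run and Client.run_on_scheduler forwarding extra positional arguments to the function, and on each package exposing __version__ and __file__. The helper names are hypothetical, and a single-process LocalCluster stands in for the dask-mpi cluster. distributed's own Client.get_versions(check=True) may also be worth trying, since it compares distributed's core dependencies across processes.

```python
import importlib

from distributed import Client, LocalCluster

# Packages to compare; astropy is included because the failures come from it.
PACKAGES = ("dask", "distributed", "astropy")


def report_versions(packages=PACKAGES):
    """Return {package: (version, file)} as seen by the calling process."""
    info = {}
    for name in packages:
        try:
            mod = importlib.import_module(name)
        except ImportError:
            info[name] = ("not installed", None)
            continue
        info[name] = (getattr(mod, "__version__", None), getattr(mod, "__file__", None))
    return info


def find_mismatches(client, packages=PACKAGES):
    """Return the names of packages whose version or path differs between processes."""
    seen = {
        "client": report_versions(packages),
        "scheduler": client.run_on_scheduler(report_versions, packages),
    }
    # Client.run forwards extra arguments and returns {worker_address: result}
    seen.update(client.run(report_versions, packages))
    # tuple() guards against the pair coming back as a list after transport
    return sorted(
        name for name in packages
        if len({tuple(proc[name]) for proc in seen.values()}) > 1
    )


if __name__ == "__main__":
    with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
        print("mismatched packages:", find_mismatches(client))
```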

jacobtomlinson, Aug 20 '21 09:08

Here's the same test with distributed:

I'm in the main function! distributed.__version__ 2021.05.0
I'm in the main function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py
I'm in the worker function! distributed.__version__ 2021.05.0
I'm in the worker function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py

As a small aside, I noticed an inconsistency in how I was importing the module, i.e.

import distributed

vs

from dask import distributed

I switched to using only the former, but the issue persists.

AlecThomson, Aug 23 '21 02:08

> I noticed I had an inconsistency in my module importing

The latter is preferred, but either will mostly be fine.
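For what it's worth, both import styles should resolve to the same classes, assuming the usual shim in dask/distributed.py that re-exports the public names of the distributed package. A quick check:

```python
import distributed
from dask import distributed as dask_distributed

# dask.distributed re-exports distributed's public names, so both
# import styles should give identical class objects.
print(dask_distributed.Client is distributed.Client)
print(dask_distributed.LocalCluster is distributed.LocalCluster)
```

If both lines print True, the import change is unlikely to be related to the astropy errors.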

> Again, I should note that this MCVE doesn't reproduce this Error which occurs in my full script.

Without a reproducer, I'm afraid this will be hard for us to track down.

jacobtomlinson, Aug 23 '21 10:08