
`sel` behaving randomly when applied to a dataset with multiprocessing

guidocioni opened this issue 1 year ago • 12 comments

What happened?

I have a script structured like this:

import xarray as xr

# `file` and `points` are defined elsewhere
def main():
    global ds
    ds = xr.open_dataset(file)
    for point in points:
        compute(point)

def compute(point):
    ds_point = ds.sel(lat=point['latitude'],
                      lon=point['longitude'],
                      method='nearest')
    print(ds_point.var.mean())
    # do something with ds_point and other data...

if __name__ == "__main__":
    main()

This works as expected. However, if I try to parallelize compute by calling it with

process_map(compute, points, max_workers=5, chunksize=1)

The printed results are completely different from the serial example, and they change every time I run the script. It seems that `sel` returns a different part of the dataset when multiple processes are running in parallel.

If I move the open_dataset call inside compute, the parallel case behaves the same as the serial one. Likewise, if I load the dataset into memory at the beginning, i.e. ds = xr.open_dataset(file).load(), I get reproducible results (see the sketches below).
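For reference, the two workarounds look roughly like this (sketches; `file` and `points` are placeholders as in the snippet above):

# Workaround 1: open the dataset inside each worker call
def compute(point):
    ds = xr.open_dataset(file)
    ds_point = ds.sel(lat=point['latitude'],
                      lon=point['longitude'],
                      method='nearest')
    ...

# Workaround 2: eagerly load everything into memory before the workers start
ds = xr.open_dataset(file).load()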

Is this supposed to happen? I really don't understand how.

What did you expect to happen?

The behaviour of sel should be the same in parallel or serial execution.

Minimal Complete Verifiable Example

No response

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [ ] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [ ] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [ ] New issue — a search of GitHub Issues suggests this is not a duplicate.

Relevant log output

No response

Anything else we need to know?

No response

Environment

INSTALLED VERSIONS

commit: None
python: 3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10) [GCC 10.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-229.1.2.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.utf8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 2022.3.0
pandas: 1.2.3
numpy: 1.20.3
scipy: 1.8.1
netCDF4: 1.5.6
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.6.1
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.2.1
cfgrib: None
iris: None
bottleneck: None
dask: 2022.7.1
distributed: 2022.7.1
matplotlib: 3.5.2
cartopy: 0.18.0
seaborn: 0.11.2
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: 0.19.2
sparse: None
setuptools: 59.8.0
pip: 22.2
conda: 4.13.0
pytest: None
IPython: 8.4.0
sphinx: None

guidocioni avatar Aug 09 '22 18:08 guidocioni

That sounds quite unfriendly!

A couple of questions to reduce the size of the example, without providing any answers yet unfortunately:

  • Is process_map from tqdm? Do you get the same behavior from the standard multiprocessing module? (see the sketch below)
  • What if we remove method=nearest?
  • Is the file a single netCDF file?
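
For concreteness, the plain-multiprocessing equivalent of the process_map call would be something like this (a sketch; compute and points as in your snippet):

from multiprocessing import Pool

with Pool(processes=5) as pool:
    results = pool.map(compute, points, chunksize=1)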

max-sixty avatar Aug 09 '22 21:08 max-sixty

That sounds quite unfriendly!

A couple of questions to reduce the size of the example, without providing any answers yet unfortunately:

  • Is process_map from tqdm? Do you get the same behavior from the standard multiprocessing?

Yep, and yep (believe me, I've tried everything in desperation 😄)

  • What if we remove method=nearest?

Which method should I use then? I need the closest point.

  • Is the file a single netCDF file?

Yep

I can try to make a minimal example; however, I think reproducing the issue requires opening a large dataset.

guidocioni avatar Aug 10 '22 05:08 guidocioni

Re nearest, does it replicate with exact lookups?

max-sixty avatar Aug 10 '22 06:08 max-sixty

Re nearest, does it replicate with exact lookups?

I haven't tried yet because it doesn't really match my use case. One idea I had was to prepare the list of points before starting the loop, create an iterator of slices from the xarray dataset, and then pass that to the loop (see the sketch below). But I would end up reading more data than necessary, because I don't process all cases.
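Something like this (a sketch; it eagerly loads each selected point in the parent process before handing it to the workers, and compute_on_slice is hypothetical — it would take a pre-selected point instead of doing the sel itself):

slices = [ds.sel(lat=p['latitude'], lon=p['longitude'], method='nearest').load()
          for p in points]
results = process_map(compute_on_slice, slices, max_workers=5, chunksize=1)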

Another thing I've noticed: if the list of items is smaller than the chunksize, everything is fine, probably because it reverts to the serial case, as only one worker is processing.

guidocioni avatar Aug 10 '22 06:08 guidocioni

Can you try explicitly passing a multiprocessing lock into the open_dataset() constructor? Something like:

import xarray
from multiprocessing import Lock

ds = xarray.open_dataset(file, lock=Lock())

(We automatically select appropriate locks if using Dask, but I'm not sure how we would do that more generally...)
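
(Note this assumes the worker processes are forked from the parent, which is the default on Linux, so they inherit both the global ds and its lock; with the spawn start method the global ds would not exist in the workers at all.)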

shoyer avatar Aug 10 '22 06:08 shoyer

Can you try explicitly passing a multiprocessing lock into the open_dataset() constructor? Something like:

import xarray
from multiprocessing import Lock

ds = xarray.open_dataset(file, lock=Lock())

(We automatically select appropriate locks if using Dask, but I'm not sure how we would do that more generally...)

OK, that's a good shot. Will that work the same way if I still use process_map, which uses concurrent.futures under the hood?

guidocioni avatar Aug 10 '22 06:08 guidocioni

Will that work in the same way if I still use process_map, which uses concurrent.futures under the hood?

Yes it should, as long as you're using multi-processing under the covers.

If you do multi-threading, then you would want to use threading.Lock(). But I believe we already apply a thread lock by default.
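
For example, a thread-based variant of your loop could look like this (a sketch; compute and points as in your original snippet):

from concurrent.futures import ThreadPoolExecutor

# threads share one process and one file handle, so the default
# thread lock in xarray should apply
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(compute, points))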

shoyer avatar Aug 10 '22 07:08 shoyer

Will that work in the same way if I still use process_map, which uses concurrent.futures under the hood?

Yes it should, as long as you're using multi-processing under the covers.

If you do multi-threading, then you would want to use threading.Lock(). But I believe we already apply a thread lock by default.

Mmm, OK, I'll try and let you know.

BTW, is there any advantage or difference in terms of CPU and memory consumption between opening the file only once and letting every process open it? I'm asking because I thought opening it in every process was just plain stupid, but it seems to perform exactly the same, so maybe I'm creating a problem where there is none.

guidocioni avatar Aug 10 '22 07:08 guidocioni

lock=Lock()

That causes an error:

Error 11: Resource temporarily unavailable

Here is the complete traceback:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 198, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 198, in <listcomp>
    return [fn(*args) for args in chunk]
  File "test_sel_bug.py", line 58, in compute_clima
    return station, ds_point.t_2m_med.mean().item(), ds_point.t_2m_min.mean().item(), ds_point.lon.min().item(), ds_point.lat.min().item()
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/common.py", line 58, in wrapped_func
    return self.reduce(func, dim, axis, skipna=skipna, **kwargs)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/dataarray.py", line 2696, in reduce
    var = self.variable.reduce(func, dim, axis, keep_attrs, keepdims, **kwargs)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 1806, in reduce
    data = func(self.data, **kwargs)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 339, in data
    return self.values
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 512, in values
    return _as_array_or_item(self._data)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 252, in _as_array_or_item
    data = np.asarray(data)
  File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 552, in __array__
    self._ensure_cached()
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 549, in _ensure_cached
    self.array = NumpyIndexingAdapter(np.asarray(self.array))
  File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 522, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 423, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/coding/variables.py", line 70, in __array__
    return self.func(self.array)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/coding/variables.py", line 137, in _apply_mask
    data = np.asarray(data, dtype=dtype)
  File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 423, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/backends/netCDF4_.py", line 93, in __getitem__
    return indexing.explicit_indexing_adapter(
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 712, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/backends/netCDF4_.py", line 106, in _getitem
    array = getitem(original_array, key)
  File "src/netCDF4/_netCDF4.pyx", line 4420, in netCDF4._netCDF4.Variable.__getitem__
  File "src/netCDF4/_netCDF4.pyx", line 5363, in netCDF4._netCDF4.Variable._get
  File "src/netCDF4/_netCDF4.pyx", line 1950, in netCDF4._netCDF4._ensure_nc_success
RuntimeError: Resource temporarily unavailable
"""

I think we may be heading in the right direction.

guidocioni avatar Aug 10 '22 08:08 guidocioni

Re nearest, does it replicate with exact lookups?

OK, it seems to fail with exact lookups too o.O This is extremely weird.

I'm using

def compute():
    ds_point = ds.isel(lat=0, lon=0)
    return (ds_point.t_2m_med.mean().item(), ds_point.t_2m_min.mean().item(),
            ds_point.lon.min().item(), ds_point.lat.min().item())

Result for the serial version:

[(10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0)]

As you would expect, all values are the same.

And for the parallel version, with EXACTLY the same code:

[(7.968084812164307, 6.948009967803955, 6.0, 48.0),
 (7.825599193572998, 6.995675563812256, 6.0, 48.0),
 (8.894186019897461, 6.849221706390381, 6.0, 48.0),
 (8.901763916015625, 6.69615364074707, 6.0, 48.0),
 (9.164983749389648, 6.484694480895996, 6.0, 48.0)]

guidocioni avatar Aug 10 '22 08:08 guidocioni

This is the most minimal working example I could come up with. You can try it with any netCDF file you have. I tested it on a small one and it didn't reproduce the error, so it definitely only happens with large datasets, when the arrays are not loaded into memory. Unfortunately, since a large file is needed, I cannot really attach one here.

import xarray as xr
from tqdm.contrib.concurrent import process_map
import pprint

def main():
    global ds
    ds = xr.open_dataset('input.nc')
    it = range(0, 5)

    # serial baseline: every iteration should return identical values
    results = []
    for i in it:
        results.append(compute(i))
    print("------------Serial results-----------------")
    pprint.pprint(results)

    # same computation fanned out over worker processes
    results = process_map(compute, it, max_workers=6, chunksize=1, disable=True)
    print("------------Parallel results-----------------")
    pprint.pprint(results)


def compute(station):
    # always select the same grid point, so the results must not vary
    ds_point = ds.isel(lat=0, lon=0)
    return station, ds_point.t_2m_max.mean().item(), ds_point.t_2m_min.mean().item(), ds_point.lon.min().item(), ds_point.lat.min().item()


if __name__ == "__main__":
    main()

guidocioni avatar Aug 10 '22 09:08 guidocioni

You might look into different multiprocessing modes: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

It may also be that the NetCDF or HDF5 libraries were simply not written in a way that can support multi-processing. This would not surprise me.
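
For example, forcing the spawn start method and opening the file once per worker might sidestep the shared file handle entirely (a sketch, untested; init_worker and the direct use of concurrent.futures are illustrative, not from the example above; compute and the file name are as in your script):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import xarray as xr

def init_worker():
    # each worker process opens its own netCDF handle
    global ds
    ds = xr.open_dataset('input.nc')

def run(compute, items):
    ctx = mp.get_context('spawn')  # workers inherit no open file descriptors
    with ProcessPoolExecutor(max_workers=6, mp_context=ctx,
                             initializer=init_worker) as executor:
        return list(executor.map(compute, items))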

BTW is there any advantage or difference in terms of cpu and memory consumption in opening the file only one or let it open by every process? I'm asking because I thought opening in every process was just plain stupid but it seems to perform exactly the same, so maybe I'm just creating a problem where there is none

I agree, maybe this isn't worth the trouble. I have not seen it done successfully before.

shoyer avatar Aug 10 '22 16:08 shoyer