
`open_mfdataset` fails when Dask worker memory limit is less than dataset size

Open · dsroberts opened this issue 7 months ago · 3 comments

What happened?

When running `open_mfdataset()` with `parallel=True` on a dask cluster whose per-worker memory limit is less than the size of the dataset, the dataset will not open. This is related to https://github.com/dask/dask/pull/11166 and https://github.com/pydata/xarray/issues/5764. As far as I can tell, because of this `sizeof()` operator for xarray collections, dask appears to assume that every task touching an xarray object will need the full dataset in memory, including the delayed `getattr` call here in `open_mfdataset()`.
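
As a rough illustration of that size estimate (a toy sketch, not part of the failing example: it assumes dask's registered `sizeof()` for xarray objects reports roughly the full `nbytes` of the data rather than the size of the lazy wrapper; the exact behaviour depends on the dask version):

```python
import numpy as np
import xarray as xr
from dask.sizeof import sizeof  # dask's byte-size estimation dispatcher

# A small in-memory stand-in for the ~18.6 GiB remote dataset in the example.
ds = xr.Dataset(
    {"tas": (("time", "lat", "lon"), np.zeros((24, 300, 400), dtype="float32"))}
)

# Depending on the dask version, sizeof(ds) may come back close to ds.nbytes
# (the full array payload), and that estimate is what the scheduler compares
# against the worker memory_limit for any task that takes ds as an input.
print(ds.nbytes)   # 11_520_000 bytes for this toy dataset
print(sizeof(ds))  # may be of the same order as ds.nbytes
```

If `sizeof(ds)` is close to `ds.nbytes`, the scheduler treats even a cheap task such as the delayed `getattr` as needing the whole dataset on one worker, which matches the error below.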

What did you expect to happen?

The dataset should be opened.

Minimal Complete Verifiable Example

import xarray as xr
from dask.distributed import Client

# A single worker with a 5 GB limit; the remote dataset is ~18.6 GiB in memory.
client = Client(n_workers=1, memory_limit="5GB")
ds = xr.open_mfdataset(["https://thredds.nci.org.au/thredds/dodsC/hq89/CCAM/output/CMIP6/DD/AUS-10i/CSIRO/ACCESS-CM2/historical/r4i1p1f1/CCAM-v2203-SN/v1-r1/1hr/tas/v20231206/tas_AUS-10i_ACCESS-CM2_historical_r4i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1_1hr_20000101-20001231.nc"], parallel=True)

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [ ] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
  • [X] Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

2024-06-28 21:59:55,022 - distributed.scheduler - ERROR - Task 'getattr-242eb9b6-025e-4daa-b6fb-e684d1de2c44' has 18.60 GiB worth of input dependencies, but worker tcp://127.0.0.1:40763 has memory_limit set to 4.66 GiB.

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
Cell In[1], line 5
      2 from dask.distributed import Client
      4 client = Client(n_workers=1,memory_limit="5GB")
----> 5 ds = xr.open_mfdataset(["https://thredds.nci.org.au/thredds/dodsC/hq89/CCAM/output/CMIP6/DD/AUS-10i/CSIRO/ACCESS-CM2/historical/r4i1p1f1/CCAM-v2203-SN/v1-r1/1hr/tas/v20231206/tas_AUS-10i_ACCESS-CM2_historical_r4i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1_1hr_20000101-20001231.nc"],parallel=True)
      6 ds

File /g/data/hh5/public/apps/cms_conda/envs/analysis3-24.04/lib/python3.10/site-packages/xarray/backends/api.py:1062, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
   1057     datasets = [preprocess(ds) for ds in datasets]
   1059 if parallel:
   1060     # calling compute here will return the datasets/file_objs lists,
   1061     # the underlying datasets will still be stored as dask arrays
-> 1062     datasets, closers = dask.compute(datasets, closers)
   1064 # Combine all datasets, closing them in case of a ValueError
   1065 try:

File /g/data/hh5/public/apps/cms_conda/envs/analysis3-24.04/lib/python3.10/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    659     postcomputes.append(x.__dask_postcompute__())
    661 with shorten_traceback():
--> 662     results = schedule(dsk, keys, **kwargs)
    664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /g/data/hh5/public/apps/cms_conda/envs/analysis3-24.04/lib/python3.10/site-packages/distributed/client.py:2234, in Client._gather(self, futures, errors, direct, local_worker)
   2232         exc = CancelledError(key)
   2233     else:
-> 2234         raise exception.with_traceback(traceback)
   2235     raise exc
   2236 if errors == "skip":

MemoryError: Task 'getattr-242eb9b6-025e-4daa-b6fb-e684d1de2c44' has 18.60 GiB worth of input dependencies, but worker tcp://127.0.0.1:40763 has memory_limit set to 4.66 GiB.

Anything else we need to know?

I'm aware that the example is quite contrived, but this behaviour is consistent across large on-disk multi-file datasets. See e.g. https://forum.access-hive.org.au/t/xarray-warnings-while-loading-data-using-cosima-cookbook/2169/9, where the underlying `open_mfdataset()` call is here. A possible workaround sketch is shown below.
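
For completeness, a hypothetical workaround sketch (my own, not verified on the dataset above): with `parallel=False` the `open_dataset`/`getattr` steps run on the client rather than as dask tasks, so the scheduler's input-size check is never applied to them; the chunk size below is an arbitrary illustration.

```python
import xarray as xr
from dask.distributed import Client

client = Client(n_workers=1, memory_limit="5GB")

# Hypothetical workaround, not part of the report above: open the metadata
# serially on the client, and pass explicit chunks so later computations
# stay within the 5 GB worker limit. The data is still returned lazily as
# dask arrays.
url = "https://thredds.nci.org.au/thredds/dodsC/hq89/CCAM/output/CMIP6/DD/AUS-10i/CSIRO/ACCESS-CM2/historical/r4i1p1f1/CCAM-v2203-SN/v1-r1/1hr/tas/v20231206/tas_AUS-10i_ACCESS-CM2_historical_r4i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1_1hr_20000101-20001231.nc"
ds = xr.open_mfdataset([url], parallel=False, chunks={"time": 744})
```

Subsequent reductions should then stay within the worker limit as long as individual chunks fit in memory.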

Environment

INSTALLED VERSIONS

commit: None
python: 3.10.14 | packaged by conda-forge | (main, Mar 20 2024, 12:45:18) [GCC 12.3.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-513.24.1.el8.nci.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: None
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.3
libnetcdf: 4.9.2

xarray: 2024.6.0
pandas: 2.2.2
numpy: 1.26.4
scipy: 1.14.0
netCDF4: 1.6.5
pydap: installed
h5netcdf: 1.3.0
h5py: 3.11.0
zarr: 2.18.2
cftime: 1.6.4
nc_time_axis: 1.4.1
iris: 3.9.0
bottleneck: 1.4.0
dask: 2024.6.2
distributed: 2024.6.2
matplotlib: 3.8.4
cartopy: 0.23.0
seaborn: 0.13.2
numbagg: None
fsspec: 2024.6.0
cupy: 13.2.0
pint: 0.23
sparse: 0.15.4
flox: 0.9.8
numpy_groupies: 0.11.1
setuptools: 70.0.0
pip: 24.0
conda: 24.5.0
pytest: 8.2.2
mypy: None
IPython: 8.25.0
sphinx: 7.3.7

dsroberts · Jun 28 '24 12:06