Notebook crashes after calling .to_dask_dataframe

lewfish opened this issue 2 years ago • 7 comments

What happened?

We are trying to convert a 17 GB Zarr dataset to Parquet using xarray by calling ds.to_dask_dataframe() and then ddf.to_parquet(). When calling to_dask_dataframe() the notebook crashes with "Kernel Restarting: The kernel for debug/minimal.ipynb appears to have died. It will restart automatically." We also see this with a synthetic dataset of the same size, which we create in the example below.
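
For reference, the workflow described above is roughly the following (the paths here are placeholders, not taken from the actual setup):

import xarray as xr

ds = xr.open_zarr("path/to/data.zarr")  # ~17 GB Zarr store, opened lazily as dask arrays
ddf = ds.to_dask_dataframe()            # the step that kills the kernel
ddf.to_parquet("path/to/output/")       # intended final step (dask.dataframe.to_parquet)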

What did you expect to happen?

We expected a Dask dataframe object to be created lazily, without crashing the notebook. We expected the operation to be lazy based on the source code and on the following line in the docs: "For datasets containing dask arrays where the data should be lazily loaded, see the Dataset.to_dask_dataframe() method."

Minimal Complete Verifiable Example

import dask.array as da
import xarray as xr
import numpy as np

chunks = 5000
dim1_sz = 100_000
dim2_sz = 100_000

# Does not crash when using the following constants instead:
# dim1_sz = 10_000
# dim2_sz = 10_000

ds = xr.Dataset({
    'x': xr.DataArray(
        data   = da.random.random((dim1_sz, dim2_sz), chunks=chunks),
        dims   = ['dim1', 'dim2'],
        coords = {'dim1': np.arange(0, dim1_sz), 'dim2': np.arange(0, dim2_sz)})})

df = ds.to_dask_dataframe()
df

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [X] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [X] New issue — a search of GitHub Issues suggests this is not a duplicate.

Relevant log output

No response

Anything else we need to know?

This operation crashes when the size of the array is above some (presumably machine-specific) threshold and works below it, so you may need to adjust the array size to reproduce the behavior.
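
For context, a quick size check on the synthetic example (ds.nbytes is a standard xarray attribute; the exact crash threshold will still depend on the machine):

# 100_000 * 100_000 float64 values is about 80 GB if fully materialized,
# versus about 0.8 GB for the smaller 10_000 x 10_000 variant.
print(ds.nbytes / 1e9)  # ~80 (GB): the data variable plus two tiny coordinate arrays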

Environment

INSTALLED VERSIONS

commit: None
python: 3.9.12 | packaged by conda-forge | (main, Mar 24 2022, 23:25:59) [GCC 10.3.0]
python-bits: 64
OS: Linux
OS-release: 5.4.196-108.356.amzn2.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: C.UTF-8
LANG: C.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 2022.3.0
pandas: 1.4.2
numpy: 1.22.3
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.12.0
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2022.05.0
distributed: 2022.5.0
matplotlib: 3.5.2
cartopy: None
seaborn: None
numbagg: None
fsspec: 2021.11.0
cupy: None
pint: None
sparse: None
setuptools: 62.3.1
pip: 22.1
conda: None
pytest: None
IPython: 8.3.0
sphinx: None

lewfish avatar Jul 19 '22 18:07 lewfish

I ran this script in an ipython session on an r5a.4xlarge instance, and it ran successfully! While the script was running I had another window with htop open and was surprised to observe that at one point memory usage was quite high (about 75 GB). After the script completed, with a usable ipython REPL prompt, memory usage was down to 4 GB (per htop), and I was able to access the dataframe df and run some computations on it (df.head and df.groupby).

It appears that I'm mistaken in my understanding of xarray. I thought that xarray creates the Dataset/DataArray lazily, but the briefly high memory usage indicates that that is not the case. This brings up yet another question: why is memory usage down to 4 GB (per htop) when the script concludes? And how can I access all these values in my eventual calculations? For instance, running df.loc[df.dim1 < 1000, ['dim1','x']].groupby('x').mean().compute() runs with a peak memory usage (as reported by htop) of about 25 GB!

I have been unable to find answers to my questions in the documentation. Can you please point me to docs (user or developer) that can help me clear up my misunderstandings?

Thanks in advance!

vlulla avatar Jul 28 '22 15:07 vlulla

You can check whether it triggers a compute using the following:

from xarray.tests import raise_if_dask_computes

with raise_if_dask_computes():
    ds.to_dask_dataframe()

Does that raise an error?

dcherian avatar Jul 28 '22 16:07 dcherian

No, I don't think this raised an error. This is what I see in my ipython session (from running ipython -i tst.py):

In [1]: from xarray.tests import raise_if_dask_computes
   ...:
   ...: with raise_if_dask_computes():
   ...:     ds.to_dask_dataframe()
   ...:
/home/ubuntu/mambaforge/lib/python3.10/site-packages/IPython/core/interactiveshell.py:3338: PerformanceWarning: Reshaping is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array.reshape(shape)

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array.reshape(shape)
Explictly passing ``limit`` to ``reshape`` will also silence this warning
    >>> array.reshape(shape, limit='128 MiB')
  if await self.run_code(code, result, async_=asy):

(The same PerformanceWarning was printed two more times.)

In [2]:

vlulla avatar Jul 28 '22 16:07 vlulla

Well then it isn't computing.

Depending on the shape of your DataArray, there's potentially a reshape involved, which can be expensive in parallel: https://docs.dask.org/en/stable/array-chunks.html#reshaping. It does manifest as large memory usage.
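
For illustration, a minimal sketch (on a bare dask array, not xarray's exact internals) of the flatten that to_dask_dataframe implies for a 2-D variable; with square-ish chunks like (5000, 5000) the reshape generally cannot stay blockwise, which is where the PerformanceWarning above comes from:

import dask.array as da

# Same shape and chunking as the original example.
arr = da.random.random((100_000, 100_000), chunks=5000)

# Each variable has to become a 1-D column, so the 2-D array is flattened;
# this is the step dask warns about for this chunking.
flat = arr.reshape(-1)
print(flat.chunks[0][:5])  # inspect the resulting 1-D chunking (still lazy)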

dcherian avatar Jul 28 '22 16:07 dcherian

Hmm... I read that! If it (what is the "it" here, dask or xarray?) isn't computing, then what are the values for x that I get from df.head()/df.tail()?

I tried

ds = xr.Dataset({
    'x': xr.DataArray(
        data   = da.random.random((dim1_sz, dim2_sz), chunks='auto'),
        dims   = ['dim1', 'dim2'],
        coords = {'dim1': np.arange(0, dim1_sz), 'dim2': np.arange(0, dim2_sz)})})

which chose a chunk size of (4000, 4000), and ds.to_dask_dataframe() still used about 75 GB of RAM during some part of the computation. So it appears that converting a large Dataset/DataArray into a dask dataframe will require a scheduler (assuming that the "it" above is dask) with a sizeable amount of RAM. Is that correct?
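
As an aside, the lazy result can be inspected without computing anything; a sketch using standard dask.dataframe attributes (building the graph itself may still be the expensive part, as observed above):

ddf = ds.to_dask_dataframe()
print(ddf.npartitions)    # number of partitions produced by the conversion
print(ddf.dtypes)         # column dtypes, known without any compute
print(ddf.partitions[0])  # a single partition, still lazy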

vlulla avatar Jul 28 '22 17:07 vlulla

Using chunks of shape (4000, 4000) isn't very different from what we were using originally (10_000, 10_000), so I'm not surprised the results are the same. After reading https://docs.dask.org/en/stable/array-chunks.html#reshaping I thought we could avoid the problem it describes by making the chunks span the entire width of the array using chunks=(100, -1). That didn't help either.

lewfish avatar Aug 03 '22 15:08 lewfish

I thought we could avoid the problem they discuss by making the chunks take up the entire width of the array using chunks=(100, -1)

This is surprising. The reshape is the expensive step but with this chunking it should be operating blockwise.
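
For what it's worth, a quick way to check whether the flatten stays blockwise for a given chunking (again a sketch on a bare dask array, not xarray's exact code path):

import dask.array as da

# Row-wise chunks spanning the full width, as tried above.
arr = da.random.random((100_000, 100_000), chunks=(100, -1))
flat = arr.reshape(-1)
# If the reshape is blockwise, each 1-D chunk should map to exactly one
# (100, 100_000) block, i.e. chunks of 10_000_000 elements (~80 MB each).
print(flat.chunks[0][:3])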

dcherian avatar Aug 10 '22 23:08 dcherian

We mainly want to do the same thing as @lewfish: we have a TIFF file and we want to save it as Parquet (in order to push it to BigQuery). However, I ran into the same issue with the following code:

path = 'big_file.tif'  # around 8 GB

# Open the dataset
dst = xr.open_dataset(path, engine = 'rasterio', chunks={"band": -1, "x": 'auto', "y": 'auto'})

# Transform to a dask dataframe
dd = dst.to_dask_dataframe()

(I tried a lot of different possible chunk settings ...) And it crashes; I reach 90 GB of RAM (per htop). Looking forward to some advice.

BasileGoussard avatar Dec 09 '22 17:12 BasileGoussard