xskillscore
xskillscore copied to clipboard
resample and boolean indexing with dask-arrays
I want to use the resample_iterations_idx
functionality to bootstrap evaluation metrics of hindcasts. The challenge with huge datasets is the memory allocation when storing all the iteration samples.
I started with the Mean squared error skill score and tried to understand the memory consumption. The following small example script demonstrates the usage of the metric and the memory consumption:
import numpy
import xskillscore as xskill
import xarray as xr
from memory_profiler import profile
@profile
def AMSEss():
alpha=0.10
coordtime="time"
dimbs="iteration"
dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
dsref = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
( numpy.square( bsp1 ) ).mean(dim=coordtime)
amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0,
p2divp1 - 1.0 ) # could be that nan's are not preserved
amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
del bsp1, bsp2, amsessf
BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
BSms = BSms.where( (amsessub)>0, -1.0 )
BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
BSms = BSms.where( amsesslb.notnull().data )
BSms.to_netcdf("bsms_chunked.nc")
if __name__ == '__main__':
AMSEss()
Running the script from the linux console gives me the following:
Line # Mem usage Increment Occurences Line Contents
============================================================
7 189.4 MiB 189.4 MiB 1 @profile
8 def AMSEss():
9 189.4 MiB 0.0 MiB 1 alpha=0.10
10 189.4 MiB 0.0 MiB 1 coordtime="time"
11 189.4 MiB 0.0 MiB 1 dimbs="iteration"
12 201.0 MiB 11.6 MiB 1 dsskill = xr.DataArray(data=15 + 20 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
13 212.5 MiB 11.4 MiB 1 dsref = xr.DataArray(data=15 + 15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
14 223.9 MiB 11.4 MiB 1 dsproof = xr.DataArray(data=15 + 10 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
15
16 5946.6 MiB 5722.7 MiB 1 bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
17 11680.0 MiB 5733.4 MiB 1 bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
18
19 11718.2 MiB 0.0 MiB 2 p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
20 11756.3 MiB 38.1 MiB 1 ( numpy.square( bsp1 ) ).mean(dim=coordtime)
21 11756.4 MiB 0.0 MiB 2 amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0,
22 11794.5 MiB 38.1 MiB 1 p2divp1 - 1.0 ) # could be that nan's are not preserved
23 11756.4 MiB 0.0 MiB 1 amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
24 11756.4 MiB 0.0 MiB 1 amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
25 274.2 MiB -11482.2 MiB 1 del bsp1, bsp2, amsessf
26 274.2 MiB 0.0 MiB 1 BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
27 274.2 MiB 0.0 MiB 1 BSms = BSms.where( (amsessub)>0, -1.0 )
28 274.2 MiB 0.0 MiB 1 BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
29 274.2 MiB 0.0 MiB 1 BSms = BSms.where( amsesslb.notnull().data )
Thus, we need 5.7GB to store each of the iteration samples bsp1
and bsp2
. That is consistent to the size of the arrays. However, often the climate datasets are much larger. So, I started working with dask arrays and changed the three lines:
dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
dsref = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
Now, the dask scheduler starts to collect all operations and performs the computation as the netcdf-file is written. However, the resampling_iterations_idx
seems to refer indices which belong to the unchunked fields but not to the chunked fields:
File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/optimization.py", line 963, in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 151, in get
result = _execute_task(task, cache)
File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/zmaw/m221054/.local/lib/python3.8/site-packages/xskillscore/core/resampling.py", line 193, in select_bootstrap_indices_ufunc
return np.moveaxis(x.squeeze()[idx.squeeze().transpose()], 0, -1)
IndexError: index 144 is out of bounds for axis 0 with size 10
Is there a way, to use the resampling functionality on dask arrays to save memory.? It is not clear to me if this is really parallelizable. As already commented in issue #221 by @dougiesquire (https://github.com/xarray-contrib/xskillscore/pull/221#issuecomment-761374062), there is a problem with boolean indexing in numpy arrays which utilize dask arrays.
Is there a way, to use the resampling functionality on dask arrays to save memory.?
You can try resampling_iterations instead of resampling_iterations. This function doesn’t use vectorised indexing and is more robust on very large datasets in my experience
I am so stupid. Maybe, I found two solutions for this problem. I increased the size of the arrays to 200200150.
Solution A: Prior resampling process, I can remove the chunks from the dask arrays using chunk({'time':-1})
. However, this brings me back to the problem of memory usage.
Solution B: I remembered that the sign-tests xskill.halfwidth_ci_test
also contradicts with dask arrays if the dimension, which undergoes a correlation, carries multiple chunks. Thus, I prevent chunking the dimension which is part of the resampling, i.e.:
dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
dsref = xr.DataArray(data=15 + 0.15 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
And the memory statistics reads:
Line # Mem usage Increment Occurences Line Contents
============================================================
7 189.4 MiB 189.4 MiB 1 @profile
8 def AMSEss():
9 189.4 MiB 0.0 MiB 1 alpha=0.10
10 189.4 MiB 0.0 MiB 1 coordtime="time"
11 189.4 MiB 0.0 MiB 1 dimbs="iteration"
12 235.3 MiB 45.9 MiB 1 dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
13 281.1 MiB 45.8 MiB 1 dsref = xr.DataArray(data=15 + 0.15 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
14 326.9 MiB 45.8 MiB 1 dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
15
16 327.6 MiB 0.7 MiB 1 bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
17 328.2 MiB 0.6 MiB 1 bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
18
19 328.2 MiB 0.0 MiB 2 p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
20 328.2 MiB 0.0 MiB 1 ( numpy.square( bsp1 ) ).mean(dim=coordtime)
21 328.7 MiB 0.4 MiB 2 amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0,
22 328.6 MiB 0.0 MiB 1 p2divp1 - 1.0 ) # could be that nan's are not preserved
23 328.7 MiB 0.0 MiB 1 amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
24 328.7 MiB 0.0 MiB 1 amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
25 328.7 MiB 0.0 MiB 1 del bsp1, bsp2, amsessf
26 328.7 MiB 0.0 MiB 1 BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
27 328.7 MiB 0.0 MiB 1 BSms = BSms.where( (amsessub)>0, -1.0 )
28 329.4 MiB 0.7 MiB 1 BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
29 329.5 MiB 0.1 MiB 1 BSms = BSms.where( amsesslb.notnull().data )
30 329.5 MiB 0.0 MiB 1 print(BSms)
31 416.4 MiB 86.9 MiB 1 BSms.to_netcdf("bsms_chunked")
The robustness of the memory consumption measurement is not clear. However, I decided to track also child processes during the execution of the bootstrapping code: mprof --include-children
and got the following graphical results for usual xarray/numpy arrays:
https://1drv.ms/u/s!AktxgZXfBUm6tgmS7cEjl56RSsFW?e=IciPRG
and for xarray/dask arrays:
https://1drv.ms/u/s!AktxgZXfBUm6tgpu67yUeAFZt0SX?e=11hHEU
There is still a memory peak of 8GB using dask (it is hard to get the reason for it using this tracking, but maybe the resampling of the indices requires to group all the small chunks to larger chunks). However, the dask-arrays a) prevent for allocation of nearly 50GB for bsp1
and bsp2
and b) prevent a memory warning on the login-nodes of our cluster (the short-term memory peak seems to be not a problem).
Nevertheless, I would like to further reduce memory allocation and prevent from applying the resampling process twice, i.e. bsp1
and bsp2
. I would like to have the indices of each of the 500 iteration samples. And this indices I would like to apply to the metric, e.g.
p2divp1 = ( numpy.square( forecast1[rand_ind] - observations[rand_ind] ) ).mean(dim=coordtime) / \
( numpy.square( forecast2[rand_ind] - observations[rand_ind] ) ).mean(dim=coordtime)
where rand_ind
contains indices from 500 iteration samples, where each sample has randomly chosen (with replacement) time steps from the time series at location x and y.
Is there a small code example, which would illustrate how to realize this?
I would start from rebuilding resample_iterations(_idx) manually