Some iris netcdf saves are fetching all the data.
From a support issue: loading a large dataset from a GRIB file and saving it to netcdf. The example data was about 16Gb, and the process was running out of memory during the save.
It seems that at least some netcdf saves do not stream the data chunk-by-chunk, but instead fetch all of it into memory.
Simple example to reproduce
import dask.array as da
import iris
from iris.cube import Cube
import tracemalloc
# Reasonably sized usage example, big enough to show problem
nt, ny, nx = 50, 700, 1400
# The data must be a stack of lazy arrays.
# A single random array with these chunks does *not* show the problem.
chunk_arrays = [
    da.random.uniform(0., 1, size=(ny, nx))
    for _ in range(nt)
]
data = da.stack(chunk_arrays)
cube = Cube(data)
print(f"Cube to save : {cube.summary(shorten=True)}")
data_size_mb = data.size * data.dtype.itemsize * 1.0 / 1024 ** 2
print(f"Cube data shape = {data.shape}, size = {data_size_mb:.1f} Mb")
tracemalloc.start()
iris.save(cube, 'tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()
memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during save : {memory_mb:.1f}Mb")
from which, a sample output:
Cube to save : unknown / (unknown) (-- : 50; -- : 700; -- : 1400)
Cube data shape = (50, 700, 1400), size = 373.8 Mb
Peak memory used during save : 377.9Mb
For comparison, it is OK in xarray (!! sorry 😬 !!), using the same data array:
import xarray as xr
xrda = xr.DataArray(data)
tracemalloc.stop()
tracemalloc.start()
xrda.to_netcdf('tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()
memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during Xarray save : {memory_mb:.1f}Mb")
from which..
Peak memory used during Xarray save : 30.2Mb
Expected result
The total memory required is expected to be only around N (or maybe N+1) * chunk size, where N is the number of Dask workers -- here N = 4 (threads or "cpus"). So in this case, at approx 8Mb per chunk, we expect ~32-40 Mb, as seen in the Xarray case.
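As a back-of-envelope check (assuming float64 data and the default threaded scheduler with 4 workers, as above):

```python
ny, nx = 700, 1400
mb_per_chunk = ny * nx * 8 / 1024 ** 2   # one float64 (ny, nx) chunk ~ 7.5 Mb
n_workers = 4                            # default threaded scheduler on this machine
print(f"~{mb_per_chunk:.1f} Mb per chunk, ~{n_workers * mb_per_chunk:.0f} Mb for {n_workers} in-flight chunks")
# -> ~7.5 Mb per chunk, ~30 Mb for 4 in-flight chunks
```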
Further notes
I have already investigated this in some depth, and it looks like the problem is the delayed-write mechanism introduced with #5191.
In that mechanism, the da.store operation is made lazy, and it is then combined in a delayed function with the results of the fill-value checking.
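Schematically, the pattern is roughly like the sketch below. (This is my own toy reconstruction of the idea, not the actual saver code; the names and sizes are made up, and a plain numpy array stands in for the netCDF variable.)

```python
import dask
import dask.array as da
import numpy as np

# Toy stand-ins (not the real saver objects).
lazy_data = da.random.uniform(0.0, 1.0, size=(10, 100, 200), chunks=(1, 100, 200))
target = np.empty(lazy_data.shape, dtype=lazy_data.dtype)
fill_value = 1e20

# The store is built lazily ...
delayed_store = da.store(lazy_data, target, compute=False, lock=False)

# ... a fill-value check is built from the *same* lazy data ...
contains_fill = (lazy_data == fill_value).any()

# ... and the two are combined into a single delayed result,
# which is what eventually gets computed.
@dask.delayed
def complete_save(store_result, has_fill):
    if has_fill:
        print("Warning: data contains the fill value")
    return has_fill

complete_save(delayed_store, contains_fill).compute()
```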
So I made some experiments to replicate this problem, using synthetic data (as in the above example) and a variety of storage mechanisms, like this:
# (Excerpt from a larger test script: "lazydata_all", "store_target" and
#  "store_op" are set up earlier, and tracemalloc is already started.)
if store_op == "all-direct":
    delayed = da.store(lazydata_all, store_target)

elif store_op == "lazy-save":
    delayed = da.store(lazydata_all, store_target, compute=False, lock=False)
    delayed.compute()

elif store_op == "lazy-via-delayed":
    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)
    result = mysummary(delayed_save, 0)
    result.compute()

elif store_op == "lazy-combined":
    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)
    alt_value = lazydata_all[0, 0, 0]
    result = mysummary(delayed_save, alt_value)
    result.compute()

elif store_op == "lazy-scanandstore":
    def my_storing_arrayop(data, store, check_min, check_max):
        store = data  # stands in for writing this block to the target
        result = np.count_nonzero((check_min <= data) & (data < check_max))
        return np.array([result])

    lazy_results = da.map_blocks(
        my_storing_arrayop,
        lazydata_all,
        store_target,
        0.0, 0.1,          # min+max check range ==> count values in [0.0, 0.1)
        chunks=(1,),       # each block produces a single count
        dtype=int,
        drop_axis=[1, 2],  # the per-block results are scalar-like
    ).sum()
    result = lazy_results.compute()

    datasize = lazydata_all.size
    percent = 100.0 * result / datasize
    print(
        f"Lazy scan-and-store counted values < 0.1 : {result}"
        f"\n ~{percent:0.2f}% of {datasize}"
    )

_, peak_mem = tracemalloc.get_traced_memory()
memory_mb = peak_mem * 1.0 / 1024 ** 2
print("\nDone.")
print(f"Consumed memory ~ {memory_mb:6.2f} Mb.")
Results ..
With data similar to the original case, the various "store_op" options above showed these memory usages:
| Saving mode, "store_op=" | Consumed memory / Mb |
|---|---|
| 'all-direct' | 33.71 |
| 'lazy-save' | 33.70 |
| 'lazy-via-delayed' | 33.70 |
| 'lazy-combined' | 432.50 |
| 'lazy-scanandstore' | 55.63 |
.. which means:
- Wrapping the lazy store operation in a delayed is not in itself a problem (option 'lazy-via-delayed'), even with the additional "data" argument to the delayed function (in this case not a lazy object).
- BUT passing an extra delayed argument which is a calculation on the same data does cause the problem (the 'lazy-combined' case) -- it seems that in this case dask cannot "see" that both operations can be performed one chunk at a time. This 'lazy-combined' version is specifically designed to mimic what the netcdf saver is doing here (in that context, the purpose of the 'additional' calculation is the fill-value check).
- HOWEVER, the 'lazy-scanandstore' option, I think, shows a possible way out: the map_blocks operation scans each section of data and stores it, so the chunks are treated separately, as they should be. I think this is a model for how to fix the problem (see the sketch below).
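To make that concrete, here is a rough sketch of a combined scan-and-store via map_blocks. This is my own illustration, not iris code: the function names are made up, a plain numpy array stands in for the netCDF variable, block_info is used to find where each block lands in the target, and the target is passed as a keyword so dask hands it through unchanged.

```python
import numpy as np
import dask.array as da

def scan_and_store_block(block, target=None, check_min=0.0, check_max=0.1,
                         block_info=None):
    # block_info[0]['array-location'] gives this block's (start, stop) range on
    # each axis of the source array, so we can write it into the right slice.
    location = block_info[0]["array-location"]
    slices = tuple(slice(start, stop) for start, stop in location)
    target[slices] = block                                     # store this chunk ...
    count = np.count_nonzero((check_min <= block) & (block < check_max))
    return np.array([count])                                   # ... and return its check result

nt, ny, nx = 50, 700, 1400
lazy_data = da.stack([da.random.uniform(0., 1., size=(ny, nx)) for _ in range(nt)])
target = np.empty(lazy_data.shape, dtype=lazy_data.dtype)  # stand-in for the on-disk variable

counts = da.map_blocks(
    scan_and_store_block,
    lazy_data,
    target=target,
    chunks=(1,),
    dtype=int,
    drop_axis=[1, 2],
)
print(f"values in [0, 0.1): {counts.sum().compute()} of {lazy_data.size}")
```

In the real saver the target would be the netCDF variable (so the writes would still need the usual lock), but the point is that the store and the check for each chunk happen inside the same task, so dask only ever holds the chunks currently in flight.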
@bouweandela @fnattino have you seen any examples of this?
Have you seen https://github.com/dask/dask/issues/8380 @pp-mo? I think it may be relevant here, but I'm not entirely sure yet.
I suspect this issue is caused by some variation of the issue described here. Adding
if arraylib is da:
    from dask.graph_manipulation import clone
    data = clone(data, assume_layers=True)
before this code https://github.com/SciTools/iris/blob/c6151e819189ca213b370ddf4ff38d8511c7e012/lib/iris/fileformats/netcdf/saver.py#L314 seems to avoid the high memory use by decoupling the store graph from the fill value check graph (at the cost of generating the source data twice).
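For illustration, here is a toy example of the decoupling effect (not iris code; the array and the two consumers below are made up):

```python
import dask
import dask.array as da
from dask.graph_manipulation import clone

# One lazy source with two consumers, mimicking the store + fill-value check.
source = da.random.uniform(0.0, 1.0, size=(10, 100, 200), chunks=(1, 100, 200))

store_like = source.sum()                 # stands in for the store side
check_source = clone(source, assume_layers=True)
check_like = (check_source > 0.99).any()  # stands in for the fill-value check,
                                          # built on its own copy of the source graph

# The two graphs no longer share tasks, so the scheduler does not need to keep
# chunks alive for both consumers -- at the cost of generating the source twice.
dask.compute(store_like, check_like)
```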
> I suspect this issue is caused by some variation of the issue described here. ... at the cost of generating the source data twice
Well, interesting. But TBH I don't really understand what this is actually doing.
FWIW I think we are very keen not to fetch the data twice if it can possibly be avoided -- and if we accepted that, we could simply do the fill-value checking and the storage in separate steps.
Do you think this approach could possibly be modified to deliver the desired one-pass data store-and-check, @bouweandela ?
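(For reference, the "separate steps" alternative would look something like the sketch below -- each pass streams fine on its own, but the source data gets read or computed twice. The names here are made up and a numpy array stands in for the file variable.)

```python
import numpy as np
import dask.array as da

lazy_data = da.random.uniform(0.0, 1.0, size=(10, 100, 200), chunks=(1, 100, 200))
target = np.empty(lazy_data.shape, dtype=lazy_data.dtype)  # stand-in for the file variable
fill_value = 1e20

# Pass 1: fill-value check, streamed chunk-by-chunk, data then discarded.
has_fill = (lazy_data == fill_value).any().compute()

# Pass 2: the actual store, which fetches / computes the data all over again.
da.store(lazy_data, target)
```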
Relevant issue raised on the Dask repo: https://github.com/dask/dask/issues/10991
@pp-mo happy to close this given #5833?