
Some iris netcdf saves are fetching all the data.

pp-mo opened this issue:

This comes from a support issue: loading a large dataset from a GRIB file and saving it to netCDF. The example data was about 16 Gb, and the save was running out of memory.

It seems that at least some netCDF saves do not stream the data chunk-by-chunk as they should, and instead fetch all the data into memory.

Simple example to reproduce

import dask.array as da
import iris
from iris.cube import Cube
import tracemalloc

# Reasonably sized usage example, big enough to show problem
nt, ny, nx = 50, 700, 1400
# The data must be a stack of lazy arrays.
# A single random array with these chunks does *not* show the problem.
chunk_arrays = [
    da.random.uniform(0., 1, size=(ny, nx))
    for _ in range(nt)
]
data = da.stack(chunk_arrays)
cube = Cube(data)
print(f"Cube to save : {cube.summary(shorten=True)}")
data_size_mb = data.size * data.dtype.itemsize * 1.0 / 1024 ** 2
print(f"Cube data shape = {data.shape}, size = {data_size_mb:.1f} Mb")

tracemalloc.start()
iris.save(cube, 'tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()

memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during save : {memory_mb:.1f}Mb")

from which, a sample output :

Cube to save : unknown / (unknown)                 (-- : 50; -- : 700; -- : 1400)
Cube data shape = (50, 700, 1400), size = 373.8 Mb
Peak memory used during save : 377.9Mb

For comparison, it is OK in xarray (!! sorry 😬 !!) :

( using same data array )

import xarray as xr
xrda = xr.DataArray(data)

tracemalloc.stop()
tracemalloc.start()
xrda.to_netcdf('tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()

memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during Xarray save : {memory_mb:.1f}Mb")

from which..

Peak memory used during Xarray save : 30.2Mb

Expected result

The total memory required is expected to be only around N (or maybe N+1) times the chunk size, where N is the number of Dask workers -- here 4 (threads or "cpus"). So in this case, at approx 8 Mb per chunk, we expect roughly 32-40 Mb, as seen in the xarray case.
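As a rough check of that arithmetic (assuming 4 workers and the per-timestep chunking of the example above; purely illustrative):

# Back-of-envelope estimate of the expected peak memory for the example above.
# Assumes 4 dask workers, each holding roughly one float64 chunk at a time.
ny, nx = 700, 1400
chunk_mb = ny * nx * 8 / 1024 ** 2          # one (ny, nx) float64 chunk ~ 7.5 Mb
n_workers = 4
print(f"chunk ~ {chunk_mb:.1f} Mb, "
      f"expected peak ~ {n_workers * chunk_mb:.0f}-{(n_workers + 1) * chunk_mb:.0f} Mb")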

pp-mo, Feb 16 '24 17:02

Further notes

I already investigated this in some depth, and it looks like the problem is the delayed-write mechanism introduced with #5191. In that mechanism, the da.store operation is made lazy, and it is then combined in a delayed function with the results of fill-value checking.
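Roughly, the shape of that pattern is as follows (a hand-written sketch only, not the actual saver code; all names here are illustrative):

import dask
import dask.array as da
import numpy as np

# Sketch of the #5191-style pattern (illustrative only): a lazy store is wrapped
# in a dask.delayed together with a fill-value check built on the *same* lazy data.
nt, ny, nx = 50, 700, 1400
lazy_data = da.stack(
    [da.random.uniform(0.0, 1.0, size=(ny, nx)) for _ in range(nt)]
)
store_target = np.empty(lazy_data.shape)       # stands in for the netCDF variable
fill_value = 9.96921e36                        # typical float fill value

delayed_store = da.store(lazy_data, store_target, compute=False, lock=False)
fill_check = (lazy_data == fill_value).any()   # a second graph over the same data

@dask.delayed
def combine(store_result, fill_present):
    return fill_present

# Computing the combined delayed is where (per the experiments below) the
# chunk-by-chunk streaming is lost.
combine(delayed_store, fill_check).compute()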

So, I made some experiments to replicate the problem using synthetic data (as in the example above) and a variety of storage mechanisms, like this:

if store_op == "all-direct":
    delayed = da.store(lazydata_all, store_target)

elif store_op == "lazy-save":
    delayed = da.store(lazydata_all, store_target, compute=False, lock=False)
    delayed.compute()

elif store_op == "lazy-via-delayed":

    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)

    result = mysummary(delayed_save, 0)

    result.compute()

elif store_op == "lazy-combined":
    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)

    alt_value = lazydata_all[0, 0, 0]
    result = mysummary(delayed_save, alt_value)

    result.compute()

elif store_op == "lazy-scanandstore":

    def my_storing_arrayop(data, store, check_min, check_max):
        store = data
        result = np.count_nonzero((check_min <= data) & (data < check_max))
        return np.array([result])

    lazy_results = da.map_blocks(
        my_storing_arrayop,
        lazydata_all,
        store_target,
        0.0, 0.1,  # min+max check range ==> count values in [0.0, 0.1)
        chunks=(1,),  # indicate that results are scalars
        dtype=bool,
        drop_axis=[1, 2]  # indicates that results are scalars
    ).sum()
    result = lazy_results.compute()

    datasize = lazydata_all.size
    percent = 100.0 * result / datasize
    print(
        f"Lazy scan-and-store counted values < 0.1 :  {result}"
        f"\n  ~{percent:0.2f}% of {datasize}'"
    )

_, peak_mem = tracemalloc.get_traced_memory()
memory_mb = peak_mem * 1.0 / 1024 ** 2
print("\nDone.")
print(f"Consumed memory ~ {memory_mb:6.2f} Mb.")

Results ..

With similar data to the original case, the various above "store_op" options showed these memory usages :

Saving mode ("store_op=")    Consumed memory / Mb
'all-direct'                  33.71
'lazy-save'                   33.70
'lazy-via-delayed'            33.70
'lazy-combined'              432.50
'lazy-scanandstore'           55.63

.. which means

Wrapping the "lazy store operation" in a delayed is not in itself a problem (option 'lazy-via-delayed'), even with the additional "data" argument to the delayed function (in this case not a lazy object).

BUT passing an extra delayed argument which is a calculation on the same data does cause the problem (the 'lazy-combined' case) -- it seems that in this case dask cannot "see" that both operations can be performed on one chunk at a time. This 'lazy-combined' version is specifically designed to mimic what the netcdf saver is doing here (in that context, the purpose of the 'additional' calculation is the fill-value check).

HOWEVER, the 'lazy-scanandstore' option, I think, shows a possible way out. The map_blocks operation scans each section of data and stores it, so the chunks are treated separately, as they should be. I think this is a model for how to fix the problem -- a fuller sketch follows below.
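For what it's worth, here is a slightly fuller standalone sketch of that idea (my own illustration, not Iris code): each map_blocks call writes its block into the target at the right location and returns that block's fill-value check, so storing and checking share a single pass over each chunk. It relies on the default threaded scheduler so all blocks see the same target object; a real netCDF target would need appropriate locking.

import dask.array as da
import numpy as np

def store_and_check_block(data, target, fill_value, block_info=None):
    # block_info[0]["array-location"] gives this block's position in the full array.
    location = tuple(slice(lo, hi) for lo, hi in block_info[0]["array-location"])
    target[location] = data                    # write this chunk into the target
    found_fill = np.any(data == fill_value)    # per-chunk fill-value check
    return np.array([found_fill])

nt, ny, nx = 50, 700, 1400
lazy_data = da.stack([da.random.uniform(0.0, 1.0, size=(ny, nx)) for _ in range(nt)])
target = np.empty(lazy_data.shape)             # stands in for the netCDF variable

per_chunk_checks = da.map_blocks(
    store_and_check_block,
    lazy_data,
    target=target,                             # passed whole (a constant), not chunked
    fill_value=9.96921e36,
    chunks=(1,),                               # one result element per block
    drop_axis=[1, 2],
    dtype=bool,
)
fill_value_found = bool(per_chunk_checks.compute().any())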

pp-mo, Feb 16 '24 18:02

@bouweandela @fnattino have you seen any examples of this?

trexfeathers, Feb 29 '24 13:02

Have you seen https://github.com/dask/dask/issues/8380 @pp-mo? I think it may be relevant here, but I'm not entirely sure yet.

bouweandela, Mar 05 '24 21:03

I suspect this issue is caused by some variation of the issue described here. Adding

    if arraylib is da:
        from dask.graph_manipulation import clone
        data = clone(data, assume_layers=True)

before this code https://github.com/SciTools/iris/blob/c6151e819189ca213b370ddf4ff38d8511c7e012/lib/iris/fileformats/netcdf/saver.py#L314 seems to avoid the high memory use by decoupling the store graph from the fill value check graph (at the cost of generating the source data twice).
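For reference, a minimal standalone illustration of what clone is doing there (the array and names below are made up, not the saver's):

import dask.array as da
from dask.graph_manipulation import clone

# clone() returns an equivalent array whose graph uses new task keys, so a
# computation built on the clone no longer shares its chunk-producing tasks
# with one built on the original. Each branch can then stream on its own,
# at the cost of generating every source chunk twice.
data = da.random.uniform(0.0, 1.0, size=(50, 700, 1400), chunks=(1, 700, 1400))

fill_check = (data == 9.96921e36).any()      # check branch: built on the original
decoupled = clone(data, assume_layers=True)  # store branch would be built on this copy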

bouweandela, Mar 06 '24 20:03

I suspect this issue is caused by some variation of the issue described here. ... at the cost of generating the source data twice

Well, interesting. But TBH I don't really understand what this is really doing.

FWIW I think we are very keen not to fetch data twice, if it can at all be avoided -- and if we accepted that, we could simply do fill-value checking and storage in separate steps.
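For comparison, that "separate steps" fallback would look something like this (illustrative names only; each pass streams chunk-by-chunk, but the source data is generated/read twice):

import dask.array as da
import numpy as np

lazy_data = da.stack(
    [da.random.uniform(0.0, 1.0, size=(700, 1400)) for _ in range(50)]
)
target = np.empty(lazy_data.shape)           # stands in for the netCDF variable

fill_found = bool((lazy_data == 9.96921e36).any().compute())   # pass 1: fill-value check
da.store(lazy_data, target, lock=False)                        # pass 2: store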

Do you think this approach could possibly be modified to deliver the desired one-pass data store-and-check, @bouweandela ?

pp-mo, Mar 08 '24 10:03

Relevant issue raised on Dask repo https://github.com/dask/dask/issues/10991

pp-mo, Mar 11 '24 10:03

@pp-mo happy to close this given #5833?

trexfeathers, Apr 03 '24 15:04