dask-image icon indicating copy to clipboard operation
dask-image copied to clipboard

The Gaussian filter is not behaving lazily as expected and is allocating RAM for the entire array

Open AlexeyPechnikov opened this issue 1 year ago • 18 comments
trafficstars

The test script below uses 20 GB of RAM for the full array size. Despite the small Gaussian kernel, which requires minimal chunk overlap, dask_image.ndfilters still fails to perform lazy processing. The same issue occurs across multiple operating systems (Linux Debian, Linux Ubuntu, macOS) and Python versions (3.10, 3.11), as well as different versions of dask_image.

Processing time: 167.02 seconds
Filtered result: <xarray.DataArray 'filtered_phase' (y: 50000, x: 50000)> Size: 20GB
dask.array<_trim, shape=(50000, 50000), dtype=float64, chunksize=(2048, 2048), chunktype=numpy.ndarray>
Coordinates:
  * y        (y) float64 400kB 0.5 1.5 2.5 3.5 4.5 ... 5e+04 5e+04 5e+04 5e+04
  * x        (x) float64 400kB 0.5 1.5 2.5 3.5 4.5 ... 5e+04 5e+04 5e+04 5e+04
import numpy as np
import xarray as xr
import dask
from dask_image.ndfilters import gaussian_filter as dask_gaussian_filter
from dask.distributed import Client
import os
import time

shape = (50000, 50000)
chunk_size = (2048, 2048)
netcdf_chunk_size = (2048, 2048)
netcdf_engine='netcdf4'

cutoff = 5.3
sigma_y = 5.40
sigma_x = 20.64

def run_test():
    client = Client()
    # prepare data
    phase = xr.DataArray(
        dask.array.random.random(shape, chunks=chunk_size) + 1j * dask.array.random.random(shape, chunks=chunk_size),
        dims=['y', 'x'],
        coords={
            'y': np.arange(0.5, shape[0] + 0.5),
            'x': np.arange(0.5, shape[1] + 0.5)
        },
        name='phase'
    )
    filename = 'test_gaussian_in.nc'
    if os.path.exists(filename):
        os.remove(filename)
    encoding = {'phase': {'chunksizes': netcdf_engine}}
    encoding = {
        'phase_real': {'chunksizes': netcdf_chunk_size},
        'phase_imag': {'chunksizes': netcdf_chunk_size}
    }
    ds = xr.Dataset({
        'phase_real': phase.real,
        'phase_imag': phase.imag
    })    
    ds.to_netcdf(filename, engine=netcdf_engine, encoding=encoding, compute=True)
    ds = xr.open_dataset(filename, engine=netcdf_engine, chunks={'y': chunk_size[0], 'x': chunk_size[1]})
    phase = ds.phase_real + 1j * ds.phase_imag
    # filter data
    start_time = time.time()
    phase_filtered = dask_gaussian_filter(phase.data.real, (sigma_y, sigma_x), mode='reflect', truncate=2)
    phase_filtered = xr.DataArray(phase_filtered, dims=phase.dims, coords=phase.coords, name='phase_filtered')    
    filename = 'test_gaussian_out.nc'
    if os.path.exists(filename):
        os.remove(filename)
    encoding = {'phase_filtered': {'chunksizes': netcdf_chunk_size}}
    delayed = phase_filtered.to_netcdf(filename,
                                       engine=netcdf_engine,
                                       encoding=encoding,
                                       compute=False)
    dask.compute(delayed)
    end_time = time.time()
    # show output
    print(f'Processing time: {end_time - start_time:.2f} seconds')
    print(f'Filtered result: {phase_filtered}')

# Main block to prevent multiprocessing issues
if __name__ == '__main__':
    run_test()

AlexeyPechnikov avatar Aug 22 '24 05:08 AlexeyPechnikov