dask-image
dask-image copied to clipboard
The Gaussian filter is not behaving lazily as expected and is allocating RAM for the entire array
trafficstars
The test script below uses 20 GB of RAM for the full array size. Despite the small Gaussian kernel, which requires minimal chunk overlap, dask_image.ndfilters still fails to perform lazy processing. The same issue occurs across multiple operating systems (Linux Debian, Linux Ubuntu, macOS) and Python versions (3.10, 3.11), as well as different versions of dask_image.
Processing time: 167.02 seconds
Filtered result: <xarray.DataArray 'filtered_phase' (y: 50000, x: 50000)> Size: 20GB
dask.array<_trim, shape=(50000, 50000), dtype=float64, chunksize=(2048, 2048), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 400kB 0.5 1.5 2.5 3.5 4.5 ... 5e+04 5e+04 5e+04 5e+04
* x (x) float64 400kB 0.5 1.5 2.5 3.5 4.5 ... 5e+04 5e+04 5e+04 5e+04
import numpy as np
import xarray as xr
import dask
from dask_image.ndfilters import gaussian_filter as dask_gaussian_filter
from dask.distributed import Client
import os
import time
shape = (50000, 50000)
chunk_size = (2048, 2048)
netcdf_chunk_size = (2048, 2048)
netcdf_engine='netcdf4'
cutoff = 5.3
sigma_y = 5.40
sigma_x = 20.64
def run_test():
client = Client()
# prepare data
phase = xr.DataArray(
dask.array.random.random(shape, chunks=chunk_size) + 1j * dask.array.random.random(shape, chunks=chunk_size),
dims=['y', 'x'],
coords={
'y': np.arange(0.5, shape[0] + 0.5),
'x': np.arange(0.5, shape[1] + 0.5)
},
name='phase'
)
filename = 'test_gaussian_in.nc'
if os.path.exists(filename):
os.remove(filename)
encoding = {'phase': {'chunksizes': netcdf_engine}}
encoding = {
'phase_real': {'chunksizes': netcdf_chunk_size},
'phase_imag': {'chunksizes': netcdf_chunk_size}
}
ds = xr.Dataset({
'phase_real': phase.real,
'phase_imag': phase.imag
})
ds.to_netcdf(filename, engine=netcdf_engine, encoding=encoding, compute=True)
ds = xr.open_dataset(filename, engine=netcdf_engine, chunks={'y': chunk_size[0], 'x': chunk_size[1]})
phase = ds.phase_real + 1j * ds.phase_imag
# filter data
start_time = time.time()
phase_filtered = dask_gaussian_filter(phase.data.real, (sigma_y, sigma_x), mode='reflect', truncate=2)
phase_filtered = xr.DataArray(phase_filtered, dims=phase.dims, coords=phase.coords, name='phase_filtered')
filename = 'test_gaussian_out.nc'
if os.path.exists(filename):
os.remove(filename)
encoding = {'phase_filtered': {'chunksizes': netcdf_chunk_size}}
delayed = phase_filtered.to_netcdf(filename,
engine=netcdf_engine,
encoding=encoding,
compute=False)
dask.compute(delayed)
end_time = time.time()
# show output
print(f'Processing time: {end_time - start_time:.2f} seconds')
print(f'Filtered result: {phase_filtered}')
# Main block to prevent multiprocessing issues
if __name__ == '__main__':
run_test()