xESMF
Regridder with multi-threaded dask workers
I'm trying to interpolate a section out of a curvilinear grid. I'm running an acoustic transmission loss model and need to extract sections from the model output at differing bearings from a point. I've written my code as an xarray accessor.
@dask.delayed
def _interp(self, ds, bearing, longitude, latitude, maxrange, rangestep, method):
    # Query points along a single bearing from (longitude, latitude)
    qlon, qlat, ranges, b = _make_line(longitude, latitude, bearing, maxrange, rangestep)
    ngrid = xr.Dataset(coords={'lat': qlat, 'lon': qlon})
    # Unique weight-file name per task so parallel tasks do not clash on disk
    regridder = xe.Regridder(ds, ngrid, method,
                             filename=str(uuid.uuid4()).replace('-', '_') + '.nc')
    sp = regridder(ds)
    # Keep only the diagonal of the regridded lat/lon rectangle, i.e. the points on the line
    sp = sp.isel(lat=xr.DataArray(range(0, len(ranges)), dims="range", coords={"range": ranges}),
                 lon=xr.DataArray(range(0, len(ranges)), dims="range"))
    sp['soundspeed'] = _calcsoundspeed_pt(sp.salt, sp.temp, sp.zc)
    sp.zc.values = abs(sp.zc)
    soundsl = sp.sortby(sp.zc)
    soundsl = soundsl.rename({'k': 'depth'})
    soundsl = soundsl.set_index({'depth': 'zc'})
    soundsl.coords['bearing'] = bearing
    soundsl.attrs['rangestep'] = rangestep
    soundsl.attrs['maxrange'] = maxrange
    soundsl.attrs['paramhead'] = self._obj.attrs['paramhead']
    soundsl.attrs['paramfile'] = self._obj.attrs['paramfile']
    soundsl.attrs['method'] = method
    soundsl = soundsl.rename({'lon': 'longitude', 'lat': 'latitude'})
    regridder.clean_weight_file()
    return soundsl

def soundslice(self, longitude, latitude, bearing, maxrange, rangestep, method='bilinear'):
    ds = self._obj.rename({'longitude': 'lon', 'latitude': 'lat'})
    # One delayed task per bearing, computed together on the dask cluster
    slices = [self._interp(ds, bstep, longitude, latitude, maxrange, rangestep, method)
              for bstep in bearing]
    return xr.concat(dask.compute(*slices), dim='bearing')
With a dask cluster using one thread per worker, all is good. With multiple threads per worker, the code executes correctly the first time, but on the second or third run I get:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-8-adb3f6c0b2ed> in <module>
----> 1 sslice = model_run.sel(time=slice('2018-11-01T00:00:00','2018-11-02T00:00:00'))[['temp','salt','botz']].shoc.soundslice(114.5,-31.7,[90],10000,1000)
~/roampy/roampy/shoc.py in soundslice(self, longitude, latitude, bearing, maxrange, rangestep, method)
60 ds = self._obj.rename({'longitude': 'lon', 'latitude': 'lat'})
61 slices = [ self._interp(ds,bstep,longitude,latitude,maxrange,rangestep,method)for bstep in bearing]
---> 62 return xr.concat(dask.compute(*slices),dim='bearing')
63
64
~/miniconda3/envs/dev2/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.__dask_keys__() for x in collections]
436 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2593 should_rejoin = False
2594 try:
-> 2595 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2596 finally:
2597 for f in futures.values():
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1885 else:
1886 local_worker = None
-> 1887 return self.sync(
1888 self._gather,
1889 futures,
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
777 return future
778 else:
--> 779 return sync(
780 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
781 )
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
~/miniconda3/envs/dev2/lib/python3.8/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1750 exc = CancelledError(key)
1751 else:
-> 1752 raise exception.with_traceback(traceback)
1753 raise exc
1754 if errors == "skip":
~/roampy/roampy/shoc.py in _interp()
35 qlon,qlat,ranges,b=_make_line(longitude,latitude,bearing,maxrange,rangestep)
36 ngrid =xr.Dataset(coords={'lat':qlat,'lon':qlon})
---> 37 regridder = xe.Regridder( ds, ngrid,method,filename=str(uuid.uuid4()).replace('-','_')+'.nc')
38 sp = regridder(ds)
39 sp = sp.isel(lat=xr.DataArray(range(0,len(ranges)), dims="range", coords={"range": ranges}),
~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/frontend.py in __init__()
188 self._grid_in, shape_in = ds_to_ESMFlocstream(ds_in)
189 else:
--> 190 self._grid_in, shape_in = ds_to_ESMFgrid(ds_in,
191 need_bounds=self.need_bounds,
192 periodic=periodic
~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/frontend.py in ds_to_ESMFgrid()
62
63 # tranpose the arrays so they become Fortran-ordered
---> 64 grid = esmf_grid(lon.T, lat.T, periodic=periodic)
65
66 if need_bounds:
~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/backend.py in esmf_grid()
99 # However, they actually need to be set explicitly,
100 # otherwise grid._coord_sys and grid._staggerloc will still be None.
--> 101 grid = ESMF.Grid(np.array(lon.shape), staggerloc=staggerloc,
102 coord_sys=ESMF.CoordSys.SPH_DEG,
103 num_peri_dims=num_peri_dims)
~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/util/decorators.py in new_func()
62
63 esmp = esmpymanager.Manager(debug = False)
---> 64 return func(*args, **kwargs)
65 return new_func
66
~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/api/grid.py in __init__()
383 self._struct = ESMP_GridStruct()
384 if self.num_peri_dims == 0:
--> 385 self._struct = ESMP_GridCreateNoPeriDim(self.max_index,
386 coordSys=coord_sys,
387 coordTypeKind=coord_typekind)
~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/interface/cbindings.py in ESMP_GridCreateNoPeriDim()
558 rc = lrc.value
559 if rc != constants._ESMP_SUCCESS:
--> 560 raise ValueError('ESMC_GridCreateNoPeriDim() failed with rc = '+str(rc)+
561 '. '+constants._errmsg)
562
ValueError: ESMC_GridCreateNoPeriDim() failed with rc = 545. Please check the log files (named "*ESMF_LogFile").
I know this is probably not the usual use case for this package, but it works nicely. Any thoughts on how to just interpolate along a line would also be gratefully received.
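On that last point, one possible simplification (a sketch only; it assumes your xESMF version supports LocStream destinations via the locstream_out flag, and it reuses _make_line and the variable names from the snippet above): pass the query points themselves as the destination, instead of regridding onto a full lat/lon rectangle and then picking out its diagonal.

import xarray as xr
import xesmf as xe

qlon, qlat, ranges, b = _make_line(longitude, latitude, bearing, maxrange, rangestep)

# 1-D lat/lon sharing a single dimension -> a list of points rather than a 2-D grid
points = xr.Dataset(coords={'lat': ('range', qlat),
                            'lon': ('range', qlon),
                            'range': ranges})

regridder = xe.Regridder(ds, points, 'bilinear', locstream_out=True)
section = regridder(ds)   # one value per query point along the line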
I've run into similar errors using xesmf in parallel applications, and it has usually been an issue with the regridder files being stored on disk. You might solve this by keeping the weight file and setting reuse_weights to True.
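In the on-disk API that could look roughly like this (a sketch; the fixed weight_file path is illustrative, and it assumes every task regrids to the same destination grid):

import os
import xesmf as xe

weight_file = 'bilinear_weights.nc'   # one agreed path instead of a fresh uuid per task
regridder = xe.Regridder(ds, ngrid, 'bilinear',
                         filename=weight_file,
                         reuse_weights=os.path.exists(weight_file))
sp = regridder(ds)
# Do not call regridder.clean_weight_file() between tasks; later tasks then find
# the file and skip the ESMF weight computation entirely. (Generating the file
# once up front also avoids a race between workers writing it at the same time.)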
Any updates on the new API that makes use of in-memory storage for regridder weights?
I'd be interested in the in-memory storage for regridder weights too, as I'm running into the 545 error as well. My use case is regridding ~50 CMIP climate model fields, each on a different grid, to a common grid. I was trying to use dask.delayed to set up the ~50 calls to a regridding function (which creates an xesmf regridder and regrids) and dask.compute to then do all the regridding, but it's a no-go.
I tested with:
def my_regrid_func(xarraydataset, dst_grid):
    regridder = xe.Regridder(xarraydataset, dst_grid, 'bilinear')
    var_regrid = regridder(xarraydataset['var'])
    outDS = xr.Dataset({'var': var_regrid})
    outDS.to_netcdf(path)

result1 = dask.delayed(my_regrid_func)(xarraydataset1, dst_grid)
dask.compute(result1)

result2 = dask.delayed(my_regrid_func)(xarraydataset2, dst_grid)
dask.compute(result2)
If there's a solution for embarrassingly parallel xesmf/dask regridding that I haven't found yet, please advise. Thanks!
Check out the new pangeo-data fork of this package, which has a number of updates including in-memory weights storage.
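With the fork (and later xESMF releases) the pattern above becomes roughly this (a sketch; keyword names have changed between releases, so check the docs for your version):

import xesmf as xe

# Weights are computed into an in-memory sparse matrix; no scratch *.nc files
# are written, so there is nothing to clean up between dask tasks.
regridder = xe.Regridder(ds_in, ds_out, 'bilinear')
out = regridder(ds_in)

# Weights can still be saved and shared explicitly if desired:
regridder.to_netcdf('weights.nc')
regridder2 = xe.Regridder(ds_in, ds_out, 'bilinear', weights='weights.nc')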
Same problem here. I am using xarray to read very large files of model output. I created a custom generator that reads one time step at a time, regrids it, and feeds it to a NN model. The first time around it works fine; the second time I get this error:
File "/discover/nobackup/dbarahon/bin/conda/envs/tf_keras/lib/python3.8/site-packages/ESMF/interface/cbindings.py", line 528, in ESMP_GridCreate1PeriDim raise ValueError('ESMC_GridCreate() failed with rc = '+str(rc)+'. '+
These are the relevant parts of the code:
def train_gen(dts_per_epoch):
    st = 0
    epoch = 1
    while True:
        if st < 1:
            X, y, blcks = get_dts(print_files=True)
        st += 1
        b = np.random.randint(0, blcks - 1, 1)  # select a random block of size batch size
        print('_block', b, '')
        xb = Xall.data.blocks[b]
        xb1 = xb.compute()
        xb2 = np.asarray(xb1)
        # print(xb2.shape)
        yb = yall.data.blocks[b]
        yb1 = yb.compute()
        yb2 = np.asarray(yb1)
        # print(yb2.shape)
        yield (xb2, yb2)
        xb2 = []
        yb2 = []
        print('st', st)
        if st > dts_per_epoch:
            st = 0
def get_dts():
    ....
    lat = dat_in['lat'].values
    lon = dat_in['lon'].values
    grid_out = xr.Dataset({'lat': (['lat'], lat),
                           'lon': (['lon'], lon),
                           })
    regridder = xe.Regridder(Wc, grid_out, 'bilinear', filename='regrid.nc',
                             periodic=True, reuse_weights=True)
    y = regridder(Wc)
    ...
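One workaround that may help in a generator setup like this (a sketch, not a confirmed fix; it assumes Wc's grid and grid_out do not change between calls): create the regridder once and cache the Python object, so the ESMF grid creation that is failing here runs only once per process instead of on every call to get_dts.

_REGRIDDER = None   # module-level cache (name is illustrative)

def get_regridder(Wc, grid_out):
    # Build the xESMF regridder on first use, then return the cached object.
    global _REGRIDDER
    if _REGRIDDER is None:
        _REGRIDDER = xe.Regridder(Wc, grid_out, 'bilinear', filename='regrid.nc',
                                  periodic=True, reuse_weights=True)
    return _REGRIDDER

# inside get_dts():
#     regridder = get_regridder(Wc, grid_out)
#     y = regridder(Wc)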
Setting reuse_weights = True does not solve this problem completely; it still raises the same error sometimes. I also installed the pangeo-data fork and the problem still exists. This really bothers me, because a dask.distributed cluster and xesmf are both tools I want to use.
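For what it's worth, the configuration the original poster reported as reliable is one thread per worker, so that the ESMF grid creation is never entered concurrently within a single process; with dask.distributed that looks like this (the worker count is illustrative):

from dask.distributed import Client

# Parallelism comes from worker processes; each worker runs a single thread,
# so no two regridder constructions overlap inside the same process.
client = Client(n_workers=8, threads_per_worker=1)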