rioxarray
speeding up windowed writes
I've been writing COGs using the windowed option:
da.rio.to_raster(..., driver="COG", windowed=True)
It works fine but is rather slow.
My array is on a set of distributed dask workers that don't share a file system, so as far as I can tell using a lock isn't an option.
It looks to me like it is slow because the windows being written are very small, as they match the COG internal tiling.
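For example, the internal block size can be checked with rasterio; with 512x512 blocks a large raster ends up being written as many thousands of tiny windows (the path below is just a placeholder):

import rasterio

# Placeholder path; any file written by the GDAL COG driver will do.
with rasterio.open("output_cog.tif") as src:
    print(src.block_shapes)                  # typically [(512, 512)] for the COG driver
    print(len(list(src.block_windows(1))))   # number of windows that to_raster(windowed=True) iterates over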
I did a bit of testing, and if, for instance, you use extra options such as:
tiled=True,
driver="GTiff",
blockxsize=10000,
blockysize=10000,
the writing is much faster, approaching the speed of the non-windowed write, and makes better use of the available RAM and CPU for compression. Obviously it's no longer a COG though.
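For reference, the kind of call I was timing looked roughly like this (the output path is a placeholder, and the compression option is an assumption on my part):

# Roughly the call used for the timing comparison; "large_blocks.tif" is a placeholder path.
da.rio.to_raster(
    "large_blocks.tif",
    driver="GTiff",
    windowed=True,
    tiled=True,
    blockxsize=10000,
    blockysize=10000,
    compress="deflate",  # assumption: some compression was enabled for the RAM/CPU comment to apply
)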
However, if I change the raster_writer code as follows, I get a COG, I get fast writes, and as far as I've tested it works for arbitrarily large arrays.
def to_raster(self, xarray_dataarray, tags, windowed, lock, compute, **kwargs):
    """
    This method writes to the raster on disk.

    xarray_dataarray: xarray.DataArray
        The input data array to write to disk.
    tags: dict, optional
        A dictionary of tags to write to the raster.
    windowed: bool
        If True and the data array is not a dask array, it will write
        the data to disk using rasterio windows.
    lock: boolean or Lock, optional
        Lock to use to write data using dask.
        If not supplied, it will use a single process.
    compute: bool
        If True (default) and data is a dask array, then compute and save
        the data immediately. If False, return a dask Delayed object.
        Call ".compute()" on the Delayed object to compute the result
        later. Call ``dask.compute(delayed1, delayed2)`` to save
        multiple delayed files at once.
    dtype: np.dtype
        Numpy-compliant dtype used to save raster. If data is not already
        represented by this dtype in memory it is recast. dtype='complex_int16'
        is a special case to write in-memory np.complex64 to CInt16.
    **kwargs
        Keyword arguments to pass into writing the raster.
    """
    kwargs["dtype"], numpy_dtype = _get_dtypes(
        kwargs["dtype"],
        xarray_dataarray.encoding.get("rasterio_dtype"),
        xarray_dataarray.encoding.get("dtype", str(xarray_dataarray.dtype)),
    )
    if kwargs["nodata"] is not None:
        # Ensure dtype of output data matches the expected dtype.
        # This check is added here as the dtype of the data is
        # converted right before writing.
        kwargs["nodata"] = _ensure_nodata_dtype(kwargs["nodata"], numpy_dtype)
    with rasterio.open(self.raster_path, "w", **kwargs) as rds:
        _write_metatata_to_raster(rds, xarray_dataarray, tags)
        if not (lock and is_dask_collection(xarray_dataarray.data)):
            # write data to raster immediately if not dask array
            if windowed:
                # window_iter = rds.block_windows(1)
                # A bit of a hack to get block windows that are much larger than the COG block windows.
                # I wonder how best to align these:
                #   - maybe to some multiple of the COG blocks (is there an order that is better to write COG blocks in?)
                #   - maybe align to the dask blocks, like is done when using a lock
                # To look at: could the encode_cf_variable step occur on the dask workers?
                temp_kwargs = kwargs.copy()
                temp_kwargs["driver"] = "GTiff"
                temp_kwargs["tiled"] = True
                temp_kwargs["blockxsize"] = 512 * 20
                temp_kwargs["blockysize"] = 512 * 20
                with rasterio.io.MemoryFile() as memfile:
                    with memfile.open(**temp_kwargs) as temp_rds:
                        window_iter = list(temp_rds.block_windows(1))
            else:
                window_iter = [(None, None)]
            for _, window in window_iter:
                if window is not None:
                    out_data = xarray_dataarray.rio.isel_window(window)
                else:
                    out_data = xarray_dataarray
                data = encode_cf_variable(out_data).values.astype(numpy_dtype)
                if data.ndim == 2:
                    rds.write(data, 1, window=window)
                else:
                    rds.write(data, window=window)
    if lock and is_dask_collection(xarray_dataarray.data):
        return dask.array.store(
            encode_cf_variable(xarray_dataarray).data.astype(numpy_dtype),
            self,
            lock=lock,
            compute=compute,
        )
    return None
Basically, I'm decoupling the block_windows used for writing from the internal block_windows of the COG. It is obviously done in a slightly hacky way, and it would need to be refined so that the windows are suitable for the RAM available on the system.
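As a rough illustration of that refinement, the write-block size could be derived from a RAM budget rather than hard-coded. The helper name, the budget, and the defaults below are purely hypothetical, not a worked-out implementation:

import math

def _write_block_size(width, height, itemsize, n_bands=1,
                      ram_budget_bytes=2 * 1024**3, internal_block=512):
    """Pick a square write-window size that fits a RAM budget and is a multiple of the internal COG block size."""
    # Largest square (in pixels) whose uncompressed contents fit the budget.
    side = int(math.sqrt(ram_budget_bytes / (itemsize * n_bands)))
    # Snap down to a multiple of the internal block size, but never below one internal block.
    side = max(internal_block, (side // internal_block) * internal_block)
    # No point exceeding the raster itself.
    return min(side, max(width, height))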
I think that, if refined enough, there is no reason not to make this the standard way of writing (no need for a windowed=False option). It would also be interesting to see how it performs compared to using a lock.
Thanks
This is definitely an interesting use case. I am curious why you need to use windowed=True. Does the default method use too much memory?
I think that in your scenario, having a lock on a per-machine basis would be your best solution; that way, each machine could write to the file on its drive safely. I'm not sure such a lock exists, though.
Well, I'm still testing, but windowed=False kills my cluster once the array gets large, and windowed=True does not cause an issue on the cluster.
I was premature in saying it could handle arbitrarily large arrays, though. Unfortunately the GDAL COG driver seems to ignore GDAL_CACHEMAX and crashes when it runs out of memory.
On the other hand, the GTiff driver works fine with large arrays, and with 512x512 internal blocks it's not hard to turn the result into a COG afterwards. I just wrote a compressed 47GB file on a machine with only 32GB of RAM without the issues that the COG driver had.
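For example, the tiled GTiff can be copied into a COG afterwards with rasterio (paths are placeholders; gdal_translate -of COG would do the same job):

import rasterio.shutil

# Copy the tiled GTiff written above into a proper COG; paths are placeholders.
rasterio.shutil.copy(
    "large_blocks.tif",
    "output_cog.tif",
    driver="COG",
    compress="deflate",  # assumption: reuse whatever compression the GTiff was written with
)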
With the modifications in the OP, windowed writes seem to be faster than non-windowed ones; however, with dask I find it hard to isolate everything going on in the background well enough to know whether this is real. It's definitely a big gain over the current windowed writes.
I've got to admit I haven't looked at the lock option much, as I can't see how it can help me when the workers can't all see the same file, unless I'm writing a whole collection of separate files, in which case I would probably look at using Zarr in an S3 bucket instead.
Thanks for the update, your reasoning makes sense.
I could be wrong, but I don't think that the COG driver works for windowed writes: #410.
For writing multiple files, #433 is relevant. Also, as you said, Zarr is a good choice for that.
With #410, that issue arises because, when writing with a lock, the file is written in windows in separate sessions for each chunk on this line: https://github.com/corteva/rioxarray/blob/199c05143bb9a68b42090bb7143121214b517626/rioxarray/raster_writer.py#L195. I'm guessing the COG overview creation is only triggered when the file is first populated, not on subsequent modification when it is opened as r+. I don't see the issue that was shown in #410 when doing windowed writes to a COG in a single thread.
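If that guess is right, one workaround would be to rebuild the overviews explicitly once all the windowed writes have finished, something along these lines (the path, factors, and resampling method are just placeholders):

import rasterio
from rasterio.enums import Resampling

# Rebuild overviews after all windowed writes are done; path and factors are placeholders.
with rasterio.open("output_cog.tif", "r+") as rds:
    rds.build_overviews([2, 4, 8, 16, 32], Resampling.average)
    rds.update_tags(ns="rio_overview", resampling="average")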
I'm guessing the COG overview creation is only triggered when the file is first populated, not on subsequent modification when it is opened as r+.
https://gdal.org/drivers/raster/cog.html
I am wondering if it works with dask if you use OVERVIEWS=IGNORE_EXISTING?
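Something like this is what I have in mind (untested sketch; the path is a placeholder), with the creation option passed straight through to the GDAL COG driver:

# Untested sketch: pass the COG driver's OVERVIEWS creation option through to_raster.
da.rio.to_raster(
    "output_cog.tif",            # placeholder path
    driver="COG",
    windowed=True,
    lock=True,                   # use dask with a lock, per the docstring above
    OVERVIEWS="IGNORE_EXISTING",
)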
I've been exploring some refinements; I'm now using the xarray chunks as the windows for writing. This seems to be working well.
# Requires: from dask.distributed import get_client, as_completed
#           from rasterio.windows import Window
#           import numpy as np
if windowed:
    client = get_client()
    # Submit every dask chunk and write each one back as soon as it completes.
    ac = as_completed(
        client.compute(xarray_dataarray.data.to_delayed().ravel().tolist()),
        with_results=True,
    )
    # Cumulative chunk sizes give the pixel offset of each chunk boundary.
    offsets = [np.cumsum(c) for c in xarray_dataarray.chunks]
    # TODO: deal with data with more than 2 dimensions,
    # see here for example: https://github.com/corteva/rioxarray/blob/199c05143bb9a68b42090bb7143121214b517626/rioxarray/raster_writer.py#L196
    for future, data in ac:
        # future.key encodes the chunk's (row, column) index in the dask array.
        height = xarray_dataarray.chunks[0][future.key[1]]
        width = xarray_dataarray.chunks[1][future.key[2]]
        row_off = offsets[0][future.key[1]] - height
        col_off = offsets[1][future.key[2]] - width
        window = Window(col_off, row_off, width, height)
        if data.ndim == 2:
            rds.write(data, 1, window=window)
        else:
            rds.write(data, window=window)
else:
    out_data = xarray_dataarray
    data = encode_cf_variable(out_data).values.astype(numpy_dtype)
    if data.ndim == 2:
        rds.write(data, 1)
    else:
        rds.write(data)
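As a quick sanity check of the offset arithmetic (made-up chunk sizes, not from my actual data):

import numpy as np

# Made-up chunk sizes, purely to illustrate the window offsets above.
row_chunks = (512, 512, 256)
offsets = np.cumsum(row_chunks)   # -> array([ 512, 1024, 1280])
i = 1                             # chunk row index, i.e. future.key[1]
height = row_chunks[i]            # 512
row_off = offsets[i] - height     # 1024 - 512 = 512, the row where that chunk starts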
That makes sense to me as a solution for writing dask chunks to disk when processing on multiple machines that don't share the same file system. I think this would be a good mechanism for writing to a raster with dask when a lock isn't provided.