Opening a zarr dataset takes a very long time with dask
What is your issue?
I have an ERA5 dataset stored in a GCS bucket as zarr. It contains 273 weather-related variables and 4 dimensions, with hourly data from 1940 to 2023.
When I try to open it with `ds = xr.open_zarr("gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3")`, the open call takes 90 seconds to finish.
The chunk scheme is `{'time': 1}`.
I've felt the pain on this particular store as well, it's a nice test case. 3PB total, ~300,000,000 chunks in total.
Looks like this is a dask problem though. All the time is spent in single-threaded code creating the array chunks.
If we skip dask with xr.open_zarr(..., chunks=None) it takes 1.5s.
We currently have a `drop_variables` arg. When you have a dataset with 273 variables and only want a couple, the inverse `keep_variables` would be a lot easier. It looks like `drop_variables` gets applied before we create the dask chunks for the arrays: reading the store once and then, on the second read, passing `drop_variables=[v for v in ds.data_vars if v != "geopotential"]`, I recover a ~1.5s read time.
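Until a `keep_variables` argument exists, the inversion is just a set difference over variable names; a minimal pure-Python sketch (the variable names here are a hypothetical subset of the 273):

```python
def keep_to_drop(all_vars, keep):
    """Invert a keep-list into the drop_variables list that open_zarr accepts."""
    keep = set(keep)
    return [v for v in all_vars if v not in keep]

# Hypothetical subset of the store's variables
all_vars = ["geopotential", "temperature", "surface_pressure"]
drop = keep_to_drop(all_vars, keep=["geopotential"])
print(drop)  # ['temperature', 'surface_pressure']
```

Pass the result as `drop_variables=drop` on the second `open_zarr` call.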
Nice profile!
@jrbourbeau https://github.com/dask/dask/pull/10648 probably improves this by a lot. Can that be reviewed/merged please?
edit: xref https://github.com/dask/dask/pull/10269
Interestingly things are almost a factor of 4x worse on both those PRs, but both are out of date so may be missing other recent improvements.
@slevang, do you mind sharing how you are generating these profiles and associated graphs? I've struggled in the past to do this effectively with a dask cluster. This looks great!
This is snakeviz: https://jiffyclub.github.io/snakeviz/
^yep. Probably not useful for distributed profiling but I haven't really tried. It's just a visualization layer for cProfile.
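Since snakeviz only renders files produced by the stdlib profiler, the workflow is roughly: dump a cProfile file, then run `snakeviz <file>`. A minimal sketch, where the profiled function is just a stand-in for the real `open_zarr` call:

```python
import cProfile
import io
import pstats

def slow_open():
    # Stand-in for xr.open_zarr(...); profile the real call the same way.
    return sum(i * i for i in range(100_000))

prof = cProfile.Profile()
prof.enable()
slow_open()
prof.disable()

prof.dump_stats("open_zarr.prof")  # then, in a shell: snakeviz open_zarr.prof

# The same stats are readable in-process via pstats:
buf = io.StringIO()
pstats.Stats(prof, stream=buf).sort_stats("cumulative").print_stats(5)
print("slow_open" in buf.getvalue())  # True
```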
In this case the creation of this monster task graph would be happening serially in the main process even if your goal was to eventually use a distributed client to run processing. The graph (only describing the array chunks) is close to a billion objects in this case, so would run into issues even trying to serialize that out to workers.
FYI something like:

```python
ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=dict(time=-1, level=-1, latitude="auto", longitude="auto"),
)
```

...may give a better balance between dask task graph size and chunk size.
(I do this "make the dask chunks bigger than the zarr chunks" a lot, because simple dask graphs can become huge with a 50TB dataset, since the default zarr encoding has a maximum chunk size of 2GB. Not sure it's necessarily the best way, very open to ideas...)
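Back-of-envelope, the graph size per variable is just the product of per-dimension chunk counts, which is why widening the time chunks helps so much. A sketch with hypothetical, roughly ERA5-shaped dimension sizes:

```python
import math

shape   = (736_000, 37, 721, 1440)   # (time, level, latitude, longitude), hypothetical
on_disk = (1, 37, 721, 1440)         # the store's chunks = {'time': 1}
wider   = (24, 37, 721, 1440)        # 24x larger dask chunks along time

def n_chunks(shape, chunks):
    """Number of dask tasks needed just to represent one variable's chunks."""
    return math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

print(n_chunks(shape, on_disk))  # 736000 tasks per variable; ~2e8 across 273 variables
print(n_chunks(shape, wider))    # 30667
```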
Edit: nevermind I actually have no idea where this profile came from. Disregard
How do I get my work done?

- I opened the dataset with `chunks=None`.
- Then filtered as required: selected only a few data variables, for some fixed time ranges and some fixed lat/lon ranges.
- Then chunked only that small dataset.

```python
ds = xr.open_zarr("gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3", chunks=None)
data_vars = ['surface_pressure', 'temperature']
ds = ds[data_vars]
ds = ds.sel(time=slice('2013-01-01', '2013-12-31'))
# ... and a few more filters ...
ds = ds.chunk()
```

This will only generate chunks for the filtered dataset. These steps worked for me, so I'm closing the issue.
> This will only generate chunks for a filtered dataset.
This should absolutely be the way that xarray works! There is no need to create chunks for variables that never are accessed.
So this is running 130 seconds on my machine; 2 things take up 80 seconds of that:

- tokenizing the chunks (yikes...)
- this warning (yikes too, 25-30 seconds)

I have a PR here that will cut 25 seconds off the runtime; the other parts are probably a bit trickier.
cc @dcherian
Yup, saving 30s with #9808. The cache is quite effective: `CacheInfo(hits=826, misses=4, maxsize=None, currsize=4)`
Belatedly realizing that Xarray's call to `normalize_chunks` is a major time waster here, given that `chunks` contains a tuple with O(1 million) elements, hehe.
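To see why that hurts: the normalized chunks for the time dimension alone form a tuple of ~a million 1s, and hashing it scales linearly in its length. A rough stdlib-only stand-in for what dask's `tokenize` has to chew through:

```python
import hashlib
import pickle
import time

# Per-chunk sizes like those xarray passes for the time dimension of this store
chunks = (1,) * 1_000_000

t0 = time.perf_counter()
# md5-of-pickle is only a stand-in for dask.base.tokenize; the real thing
# recursively normalizes its input first, so it is considerably slower.
token = hashlib.md5(pickle.dumps(chunks)).hexdigest()
elapsed = time.perf_counter() - t0
print(f"hashed {len(chunks):,} chunk sizes in {elapsed:.3f}s")
```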
I wonder if we can close this now that we've demonstrated some substantial improvements here:
- Blog post: https://earthmover.io/blog/xarray-open-zarr-improvements
- https://github.com/zarr-developers/zarr-python/pull/2519
- https://github.com/pydata/xarray/pull/9861
This repository has consolidated metadata, so this ticket has always been about dask, and specifically how Xarray calls tokenize, and how Xarray checks for chunk alignment.
Just went down a bit of a rabbit hole and wanted to mention, to this point: https://github.com/pydata/xarray/issues/8902#issuecomment-2035360422, that picking an explicit `chunks` value will speed things up dramatically. For instance, to make the dask tasks 10x the on-disk chunk size (which is 1) in the time dimension:
```python
%%time
ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=dict(time=10, level=-1, latitude=(), longitude=()),
)
```
This is partly an Xarray issue with us passing a normalized chunks tuple that is very very large. tokenizing that takes ages: https://github.com/pydata/xarray/pull/9897
One thought for anyone interested: we might skip "normalizing" chunks for int chunks (like all Zarrs & netCDFs in existence today) and pass them straight through; dask can handle them, and it gets us out of this mess of tokenizing a tuple with half a million 1s. I may have looked into this, but can't remember why I didn't do it.
I am kind of wondering if we can actually always skip "normalizing" chunks for Dask. Dask is going to normalize anyway on its side, I imagine.
I can't remember why but it's to do with handling the "auto" I think?
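To illustrate what "pass them straight through" would avoid: a compact integer spec expands into one entry per chunk when normalized. A sketch of that expansion, with hypothetical dimension sizes that divide evenly:

```python
# Compact per-dimension chunk spec, as stored in zarr metadata (hypothetical sizes)
shape = (1_000_000, 37, 721, 1440)   # (time, level, latitude, longitude)
compact = (1, 37, 721, 1440)         # one int per dimension

def expand(shape, compact):
    """What normalization materializes: one entry per chunk along each dimension.
    Assumes evenly divisible sizes (a real implementation shrinks the last chunk)."""
    return tuple((c,) * -(-s // c) for s, c in zip(shape, compact))

expanded = expand(shape, compact)
print(len(expanded[0]))  # 1000000 entries just for the time dimension
```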
~~Zooming out a bit, I am wondering if a HighLevelGraph approach might let us not materialize the low level task graph at all for this kind of many data variables case. Especially given that you likely only want a few of the data variables.~~
I realized that this is actually just setting the key for the HighLevelGraph so that didn't really make any sense.