
opening a zarr dataset taking so much time with dask

Open DarshanSP19 opened this issue 1 year ago • 22 comments

What is your issue?

I have an ERA5 dataset stored in a GCS bucket as Zarr. It contains 273 weather-related variables and 4 dimensions, with hourly data from 1940 to 2023. When I try to open it with ds = xr.open_zarr("gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3"), the open call takes 90 seconds to finish. The chunk scheme is { 'time': 1 }.
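
For reference, a minimal reproduction of the slow call (timings will of course vary by machine and network; reading gs:// URLs requires gcsfs):

import time
import xarray as xr

t0 = time.perf_counter()
ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3"
)
print(f"open_zarr took {time.perf_counter() - t0:.1f}s")  # ~90s as reported above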

DarshanSP19 avatar Apr 02 '24 13:04 DarshanSP19


I've felt the pain on this particular store as well; it's a nice test case: 3 PB total, ~300,000,000 chunks.

Looks like this is a dask problem though. All the time is spent in single-threaded code creating the array chunks.

[Screenshot: profile from 2024-04-02 14:34]

If we skip dask with xr.open_zarr(..., chunks=None) it takes 1.5s.

We currently have a drop_variables arg. When you have a dataset with 273 variables and only want a couple, the inverse keep_variables would be a lot easier. It looks like drop_variables gets applied before we create the dask chunks for the arrays, so by reading the store once to list the variables and then, on a second read, passing drop_variables=[v for v in ds.data_vars if v != "geopotential"], I recover a ~1.5s read time.
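
A minimal sketch of that two-read workaround (keeping only "geopotential", as in the example above):

import xarray as xr

store = "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3"

# First read: metadata only (chunks=None, no dask), just to list the variable names.
names = list(xr.open_zarr(store, chunks=None).data_vars)

# Second read: drop everything else so dask only builds chunks for the one array we want.
ds = xr.open_zarr(
    store,
    drop_variables=[v for v in names if v != "geopotential"],
)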

slevang avatar Apr 02 '24 18:04 slevang

Nice profile!

@jrbourbeau https://github.com/dask/dask/pull/10648 probably improves this by a lot. Can that be reviewed/merged please?

edit: xref https://github.com/dask/dask/pull/10269

dcherian avatar Apr 02 '24 18:04 dcherian

Interestingly, things are almost 4x worse on both of those PRs, but both are out of date so they may be missing other recent improvements.

[Screenshot: profile from 2024-04-02 15:13]

slevang avatar Apr 02 '24 19:04 slevang

@slevang, do you mind sharing how you are generating these profiles and associated graphs? I've struggled in the past to do this effectively with a dask cluster. This looks great!

riley-brady avatar Apr 03 '24 18:04 riley-brady

This is snakeviz: https://jiffyclub.github.io/snakeviz/

dcherian avatar Apr 03 '24 18:04 dcherian

^yep. Probably not useful for distributed profiling but I haven't really tried. It's just a visualization layer for cProfile.

In this case the creation of this monster task graph happens serially in the main process, even if your goal is to eventually run the processing with a distributed client. The graph (only describing the array chunks) is close to a billion objects here, so you would run into issues even trying to serialize it out to workers.
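
For anyone who wants to reproduce this kind of profile, a minimal sketch (assumes snakeviz is installed; the output filename is arbitrary):

import cProfile
import xarray as xr

cProfile.run(
    'xr.open_zarr("gs://gcp-public-data-arco-era5/ar/'
    'full_37-1h-0p25deg-chunk-1.zarr-v3")',
    "open_zarr.prof",
)
# then visualize it from a shell:  snakeviz open_zarr.prof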

slevang avatar Apr 03 '24 18:04 slevang

FYI something like:

ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=dict(time=-1, level=-1, latitude="auto", longitude="auto"),
)

...may give a better balance between dask task graph size and chunk size.

(I do this "make the dask chunks bigger than the zarr chunks" a lot, because simple dask graphs can become huge with a 50TB dataset, since the default zarr encoding has a maximum chunk size of 2GB. Not sure it's necessarily the best way, very open to ideas...)
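
One way to sanity-check that trade-off, as a quick sketch (using one of this dataset's variables):

import xarray as xr

ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=dict(time=-1, level=-1, latitude="auto", longitude="auto"),
)
temp = ds["temperature"]
print(temp.data.npartitions)  # how many dask chunks this one array contributes
print(temp.data.chunksize)    # shape of a single dask chunk
print(temp.nbytes / temp.data.npartitions / 1e6, "MB per chunk (approx.)")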

max-sixty avatar Apr 03 '24 18:04 max-sixty

Edit: nevermind I actually have no idea where this profile came from. Disregard

slevang avatar Apr 03 '24 19:04 slevang

Here's how I got my work done:

  • I opened the dataset with chunks=None.
  • Then filtered it as required, e.g. selecting only a few data variables, a fixed time range, and a fixed lat/lon range.
  • Then chunked only that small, filtered dataset:
ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=None,  # metadata-only open, no dask graph yet
)
data_vars = ['surface_pressure', 'temperature']
ds = ds[data_vars]
ds = ds.sel(time=slice('2013-01-01', '2013-12-31'))
# ... and a few more filters ...
ds = ds.chunk()  # build dask chunks only for the small, filtered dataset

This will only generate chunks for a filtered dataset. These steps worked for me, so I'm closing the issue.

DarshanSP19 avatar Apr 04 '24 07:04 DarshanSP19

This will only generate chunks for a filtered dataset.

This should absolutely be the way that xarray works! There is no need to create chunks for variables that are never accessed.

martindurant avatar Nov 20 '24 19:11 martindurant

So this runs for 130 seconds on my machine, and two things take up 80 of those seconds:

  • tokenising the chunks (yikes...)
  • this warning (yikes too, 25-30 seconds)

I have a PR here that will cut 25 seconds off the runtime; the other parts are probably a bit trickier.

cc @dcherian

phofl avatar Nov 21 '24 09:11 phofl

Yup, saving 30s with #9808. The cache is quite effective: CacheInfo(hits=826, misses=4, maxsize=None, currsize=4)
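
(For context, the CacheInfo line above is the standard report from functools.lru_cache; the following is only a generic sketch of the pattern, not the actual code from the PR.)

from functools import lru_cache

@lru_cache(maxsize=None)
def normalize_once(chunks: tuple) -> tuple:
    # stand-in for the expensive per-variable work being cached
    return chunks

# With 273 variables but only a handful of distinct chunk layouts, nearly
# every call is a cache hit -- hence hits=826 vs misses=4 above.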

dcherian avatar Nov 21 '24 18:11 dcherian

Belatedly realizing that Xarray's call to normalize_chunks is a major time waster here given that chunks contains a tuple with O(1 million) elements hehe.
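
For a sense of scale, a rough sketch (the shape below is only approximate for this dataset): expanding an on-disk chunking of time=1 across ~730,000 hourly steps yields a per-chunk tuple of that length, and that tuple is what later gets tokenized.

from dask.array.core import normalize_chunks

# Approximate ERA5 AR shape: ~730,000 hourly steps, 37 levels, 0.25-degree grid
chunks = normalize_chunks(
    (1, -1, -1, -1),
    shape=(730_000, 37, 721, 1440),
)
print(len(chunks[0]))  # 730000 entries of 1 for the time dimension alone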

dcherian avatar Dec 16 '24 16:12 dcherian

I wonder if we can close this now that we've demonstrated some substantial improvements here:

  • Blog post: https://earthmover.io/blog/xarray-open-zarr-improvements
  • https://github.com/zarr-developers/zarr-python/pull/2519
  • https://github.com/pydata/xarray/pull/9861

jhamman avatar Mar 20 '25 06:03 jhamman

This repository has consolidated metadata, so this ticket has always been about dask, and specifically how Xarray calls tokenize, and how Xarray checks for chunk alignment.

dcherian avatar Mar 20 '25 14:03 dcherian

Just went down a bit of a rabbit hole and wanted to mention, regarding this point (https://github.com/pydata/xarray/issues/8902#issuecomment-2035360422), that picking an explicit chunks value will speed things up dramatically. For instance, to make the dask chunks 10x the on-disk chunk size (which is 1) in the time dimension:

%%time
ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=dict(time=10, level=-1, latitude=-1, longitude=-1),
)

jsignell avatar Dec 10 '25 20:12 jsignell

This is partly an Xarray issue: we pass a normalized chunks tuple that is very, very large, and tokenizing that takes ages: https://github.com/pydata/xarray/pull/9897

dcherian avatar Dec 10 '25 20:12 dcherian

One thought for anyone interested: we might skip "normalizing" chunks for int chunks (like all Zarrs & netCDFs in existence today) and pass them straight through; dask can handle them, and it gets us out of this mess of tokenizing a tuple with half a million 1s. I may have looked into this, but can't remember why I didn't do it.
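
Roughly, the idea (a hypothetical sketch, not the actual xarray code path): dask accepts a compact per-dimension spec such as 1 or -1 and expands it internally, so passing that through unchanged would avoid building, and then tokenizing, a tuple with half a million 1s on the xarray side.

import numpy as np
import dask.array as da

arr = np.zeros((1000, 4))
a = da.from_array(arr, chunks=(1, -1))  # compact int spec; dask expands it itself
print(a.chunks)  # ((1, 1, ..., 1), (4,)) -- the expanded form dask produces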

dcherian avatar Dec 11 '25 15:12 dcherian

I am kind of wondering if we can actually always skip "normalizing" chunks for Dask. Dask is going to normalize anyway on its side, I imagine.

jsignell avatar Dec 11 '25 23:12 jsignell

I can't remember why, but I think it has to do with handling "auto"?

dcherian avatar Dec 11 '25 23:12 dcherian

~Zooming out a bit, I am wondering if a HighLevelGraph approach might let us not materialize the low level task graph at all for this kind of many data variables case. Especially given that you likely only want a few of the data variables.~

I realized that this is actually just setting the key for the HighLevelGraph so that didn't really make any sense.

jsignell avatar Dec 12 '25 15:12 jsignell