Integrate cubed in xarray
Initial attempt to get cubed working within xarray, as an alternative to dask.
- [x] Closes #6807, at least for the case of cubed
- [ ] Tests added
- [ ] User visible changes (including notable bug fixes) are documented in `whats-new.rst`
- [ ] New functions/methods are listed in `api.rst`
I've added a manager kwarg to the .chunk methods so you can do da.chunk(manager="cubed") to convert to a chunked cubed.CoreArray, with the default still being da.chunk(manager="dask"). (I couldn't think of a better name than "manager", as "backend" and "executor" are already taken.)
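Roughly, the idea behind the kwarg is something like this sketch (the helper name is illustrative only, not the actual implementation in this PR):

```python
# Simplified sketch of what the `manager` kwarg does conceptually: it selects
# which library's `from_array` is used when chunking. The helper name here is
# illustrative, not xarray's real code.
def _chunk_with_manager(data, chunks, manager="dask"):
    if manager == "dask":
        import dask.array as da

        return da.from_array(data, chunks=chunks)
    elif manager == "cubed":
        import cubed

        return cubed.from_array(data, chunks=chunks)
    else:
        raise ValueError(f"unrecognized chunk manager: {manager!r}")
```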
~~At the moment it should work except for an import error that I don't understand, see below.~~
To complete this PR we would also need:
- [x] Cubed to expose the correct array type consistently https://github.com/tomwhite/cubed/issues/123
- [ ] A cubed version of `apply_gufunc` https://github.com/tomwhite/cubed/pull/119
- [ ] Re-route `xarray.apply_ufunc` through `cubed.apply_gufunc` instead of dask's `apply_gufunc` when appropriate
- [ ] A test suite for wrapping cubed arrays, which would be best done via #6894
- [ ] Ideally also generalise `xarray.map_blocks` to work on cubed arrays
cc @tomwhite
I think the manager keyword will also need adding to open_zarr, open_dataset and to_zarr.
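For example, something like this (a hypothetical sketch of how the keyword might be spelled; later comments in this thread pass it via `from_array_kwargs`):

```python
import xarray as xr

# Hypothetical sketch of choosing the chunk manager at open time; elsewhere in
# this thread it is spelled as from_array_kwargs={'manager': 'cubed'}.
ds = xr.open_dataset(
    "store.zarr", engine="zarr", chunks={}, from_array_kwargs={"manager": "cubed"}
)
ds.to_zarr("out.zarr")  # writing back out would need to respect the same choice
```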
I'm interested in trying this out on some of our genomics use cases in sgkit (see https://github.com/pystatgen/sgkit/issues/908), so please let me know when you think it's ready to try @TomNicholas.
This was more abstract than expected
Yeah I was kind of asking whether this was unnecessarily abstract, and if there was a simpler design that achieved the same flexibility.
@TomNicholas it might be good to rebase this now that #7067 is in.
@drtodd13 tagging you here and linking my notes from today's distributed arrays working group meeting for the links and references to this PR.
@drtodd13 mentioned today that ramba doesn't actually require explicit chunks to work, which I hadn't realised. So forcing wrapped libraries to implement an explicit chunks method might be too restrictive. Ramba could possibly work entirely through the numpy array API standard.
ramba doesn't actually require explicit chunks to work
I suspect Arkouda is similar in that this is not a detail the user is expected to worry about. (cc @sdbachman)
I'm making progress with this PR, and now that @tomwhite implemented cubed.apply_gufunc I've re-routed xarray.apply_ufunc to use whatever version of apply_gufunc is defined by the chosen ChunkManager. This means many basic operations should now just work:
In [1]: import xarray as xr
In [2]: da = xr.DataArray([1, 2, 3], dims='x')
In [3]: da_chunked = da.chunk(from_array_kwargs={'manager': 'cubed'})
In [4]: da_chunked
Out[4]:
<xarray.DataArray (x: 3)>
cubed.Array<array-003, shape=(3,), dtype=int64, chunks=((3,),)>
Dimensions without coordinates: x
In [5]: da_chunked.mean()
Out[5]:
<xarray.DataArray ()>
cubed.Array<array-006, shape=(), dtype=int64, chunks=()>
In [6]: da_chunked.mean().compute()
[cubed.Array<array-009, shape=(), dtype=int64, chunks=()>]
Out[6]:
<xarray.DataArray ()>
array(2)
(You need to install both cubed>0.5.0 and the main branch of rechunker for this to work.)
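For anyone following along, the dispatch is conceptually something like this simplified sketch (class names are illustrative, not the exact code in this PR):

```python
# Each chunk manager exposes its own apply_gufunc, and xarray.apply_ufunc calls
# whichever manager matches the wrapped array type. Simplified sketch only.
class DaskManager:
    def apply_gufunc(self, func, signature, *args, **kwargs):
        from dask.array import apply_gufunc

        return apply_gufunc(func, signature, *args, **kwargs)


class CubedManager:
    def apply_gufunc(self, func, signature, *args, **kwargs):
        from cubed import apply_gufunc

        return apply_gufunc(func, signature, *args, **kwargs)


# inside xarray.apply_ufunc (conceptually):
#     chunkmanager = get_chunked_array_type(*chunked_args)
#     result = chunkmanager.apply_gufunc(func, signature, *chunked_args, **gufunc_kwargs)
```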
I still have a fair bit more to do on this PR (see checklist at top), but for testing should I:
- (a) Start making a `test_cubed.py` file in xarray as part of this PR with bespoke tests,
- (b) Put bespoke tests for xarray wrapping cubed somewhere else (e.g. the cubed repo or a new `cubed-xarray` repo),
- (c) Merge this PR without cubed-specific tests and concentrate on finishing the general duck-array testing framework in #6908, so we can implement (b) in the way we actually eventually want things to work for 3rd-party duck array libraries?
I would prefer not to have this PR grow to be thousands of lines by including tests in it, but also waiting for #6908 might take a while because that's also a fairly ambitious PR.
The fact that the tests are currently green for this PR (ignoring some mypy stuff) is evidence that the decoupling of dask from xarray is working so far.
(I have already added some tests for the ability to register custom ChunkManagers though.)
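(For the record, such a test is roughly of this shape - a toy sketch with a hypothetical registry, not xarray's actual API:)

```python
# Toy sketch of the kind of registration test meant above; the registry and
# helper names here are hypothetical, not xarray's actual API.
_CHUNKMANAGERS: dict = {}


def register_chunkmanager(name, manager):
    _CHUNKMANAGERS[name] = manager


def get_chunkmanager(name):
    return _CHUNKMANAGERS[name]


def test_register_custom_chunkmanager():
    class DummyManager:
        """Minimal stand-in for a third-party ChunkManager."""

    register_chunkmanager("dummy", DummyManager())
    assert isinstance(get_chunkmanager("dummy"), DummyManager)
```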
Great work @TomNicholas!
I don't have a strong opinion about the tests, but putting them in a new project to keep xarray changes to a minimum is probably a good idea for the moment.
Thanks @tomwhite - I think it might make sense for me to remove the CubedManager class from this PR and instead put that & cubed+xarray tests into another repo. That keeps xarray's changes minimal, doesn't require putting cubed in any xarray CI envs, and hopefully allows us to merge the ChunkManager changes here earlier.
Places dask is still explicitly imported in xarray
There are a few remaining places where I haven't generalised to remove specific import dask calls either because it won't be imported at runtime unless you ask for it, cubed doesn't implement the equivalent function, that function isn't in the array API standard, or because I'm not sure if the dask concept used generalises to other parallel frameworks.
- [ ] `open_mfdataset(..., parallel=True)` - there is no `cubed.delayed` to wrap the `open_dataset` calls in,
- [ ] `Dataset.__dask_graph__` and all the other similar dask magic methods,
- [ ] `dask_array_ops.rolling` - uses functions from `dask.array.overlap`,
- [ ] `dask_array_ops.least_squares` - uses `dask.array.apply_along_axis` and `dask.array.linalg.lstsq`,
- [ ] `dask_array_ops.push` - uses `dask.array.reductions.cumreduction`
I would like to get to the point where you can use xarray with a chunked array without ever importing dask. I think this PR gets very close, but that would be tricky to test because cubed depends on dask (so I can't just run the test suite without dask in the environment), and there are not yet any other parallel chunk-aware frameworks I know of (ramba and arkouda don't have a chunks attribute so wouldn't require this PR).
I tried opening a zarr store into xarray with chunking via cubed, but I got an error inside the indexing adapter classes. Somehow the type is completely wrong - would be good to type hint this part of the code, because this happens despite mypy passing now.
# create example zarr store
orig = xr.tutorial.open_dataset("air_temperature")
orig.to_zarr('air2.zarr')
# open it as a cubed array
ds = xr.open_dataset('air2.zarr', engine='zarr', chunks={}, from_array_kwargs={'manager': 'cubed'})
# fails at this point
ds.load()
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/tenacity/__init__.py:382, in Retrying.__call__(self, fn, *args, **kwargs)
381 try:
--> 382 result = fn(*args, **kwargs)
383 except BaseException: # noqa: B902
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/runtime/executors/python.py:10, in exec_stage_func(func, *args, **kwargs)
8 @retry(stop=stop_after_attempt(3))
9 def exec_stage_func(func, *args, **kwargs):
---> 10 return func(*args, **kwargs)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/primitive/blockwise.py:66, in apply_blockwise(out_key, config)
64 args.append(arg)
---> 66 result = config.function(*args)
67 if isinstance(result, dict): # structured array with named fields
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/core/ops.py:439, in map_blocks.<locals>.func_with_block_id.<locals>.wrap(*a, **kw)
438 block_id = offset_to_block_id(a[-1].item())
--> 439 return func(*a[:-1], block_id=block_id, **kw)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/core/ops.py:572, in map_direct.<locals>.new_func.<locals>.wrap(block_id, *a, **kw)
571 args = a + arrays
--> 572 return func(*args, block_id=block_id, **kw)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/core/ops.py:76, in _from_array(e, x, outchunks, asarray, block_id)
75 def _from_array(e, x, outchunks=None, asarray=None, block_id=None):
---> 76 out = x[get_item(outchunks, block_id)]
77 if asarray:
File ~/Documents/Work/Code/xarray/xarray/core/indexing.py:627, in CopyOnWriteArray.__getitem__(self, key)
626 def __getitem__(self, key):
--> 627 return type(self)(_wrap_numpy_scalars(self.array[key]))
File ~/Documents/Work/Code/xarray/xarray/core/indexing.py:534, in LazilyIndexedArray.__getitem__(self, indexer)
533 return array[indexer]
--> 534 return type(self)(self.array, self._updated_key(indexer))
File ~/Documents/Work/Code/xarray/xarray/core/indexing.py:500, in LazilyIndexedArray._updated_key(self, new_key)
499 def _updated_key(self, new_key):
--> 500 iter_new_key = iter(expanded_indexer(new_key.tuple, self.ndim))
501 full_key = []
AttributeError: 'tuple' object has no attribute 'tuple'
The above exception was the direct cause of the following exception:
RetryError Traceback (most recent call last)
Cell In[69], line 1
----> 1 ds.load()
File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:761, in Dataset.load(self, **kwargs)
758 chunkmanager = get_chunked_array_type(*lazy_data.values())
760 # evaluate all the chunked arrays simultaneously
--> 761 evaluated_data = chunkmanager.compute(*lazy_data.values(), **kwargs)
763 for k, data in zip(lazy_data, evaluated_data):
764 self.variables[k].data = data
File ~/Documents/Work/Code/xarray/xarray/core/parallelcompat.py:451, in CubedManager.compute(self, *data, **kwargs)
448 def compute(self, *data: "CubedArray", **kwargs) -> np.ndarray:
449 from cubed import compute
--> 451 return compute(*data, **kwargs)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/core/array.py:300, in compute(executor, callbacks, optimize_graph, *arrays, **kwargs)
297 executor = PythonDagExecutor()
299 _return_in_memory_array = kwargs.pop("_return_in_memory_array", True)
--> 300 plan.execute(
301 executor=executor,
302 callbacks=callbacks,
303 optimize_graph=optimize_graph,
304 array_names=[a.name for a in arrays],
305 **kwargs,
306 )
308 if _return_in_memory_array:
309 return tuple(a._read_stored() for a in arrays)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/core/plan.py:154, in Plan.execute(self, executor, callbacks, optimize_graph, array_names, **kwargs)
152 if callbacks is not None:
153 [callback.on_compute_start(dag) for callback in callbacks]
--> 154 executor.execute_dag(
155 dag, callbacks=callbacks, array_names=array_names, **kwargs
156 )
157 if callbacks is not None:
158 [callback.on_compute_end(dag) for callback in callbacks]
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/cubed/runtime/executors/python.py:22, in PythonDagExecutor.execute_dag(self, dag, callbacks, array_names, **kwargs)
20 if stage.mappable is not None:
21 for m in stage.mappable:
---> 22 exec_stage_func(stage.function, m, config=pipeline.config)
23 if callbacks is not None:
24 event = TaskEndEvent(array_name=name)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/tenacity/__init__.py:289, in BaseRetrying.wraps.<locals>.wrapped_f(*args, **kw)
287 @functools.wraps(f)
288 def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
--> 289 return self(f, *args, **kw)
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/tenacity/__init__.py:379, in Retrying.__call__(self, fn, *args, **kwargs)
377 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
378 while True:
--> 379 do = self.iter(retry_state=retry_state)
380 if isinstance(do, DoAttempt):
381 try:
File ~/miniconda3/envs/cubed/lib/python3.9/site-packages/tenacity/__init__.py:326, in BaseRetrying.iter(self, retry_state)
324 if self.reraise:
325 raise retry_exc.reraise()
--> 326 raise retry_exc from fut.exception()
328 if self.wait:
329 sleep = self.wait(retry_state)
RetryError: RetryError[<Future at 0x7fc0c69be4f0 state=finished raised AttributeError>
This still works fine for dask.
AttributeError: 'tuple' object has no attribute 'tuple'
This means you're indexing a `LazilyIndexedArray` with a tuple, but you need to wrap that tuple in an Indexer object like `BasicIndexer`, `VectorizedIndexer` etc.
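Concretely, something along these lines (a minimal sketch using xarray's existing indexer classes):

```python
import numpy as np
from xarray.core.indexing import BasicIndexer, LazilyIndexedArray

# Minimal sketch: xarray's lazy indexing adapters expect an ExplicitIndexer
# (which has a .tuple attribute), not a bare tuple, as the key.
arr = LazilyIndexedArray(np.arange(12).reshape(3, 4))

# arr[(0, slice(None))] raises AttributeError: 'tuple' object has no attribute 'tuple'
result = arr[BasicIndexer((0, slice(None)))]
```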
I think it might make sense for me to remove the `CubedManager` class from this PR and instead put that & cubed+xarray tests into another repo. That keeps xarray's changes minimal, doesn't require putting cubed in any xarray CI envs, and hopefully allows us to merge the `ChunkManager` changes here earlier.
That sounds like a good plan to me.
Places `dask` is still explicitly imported in xarray

There are a few remaining places where I haven't generalised to remove specific `import dask` calls either because it won't be imported at runtime unless you ask for it, cubed doesn't implement the equivalent function, that function isn't in the array API standard, or because I'm not sure if the dask concept used generalises to other parallel frameworks.

- [ ] `open_mfdataset(..., parallel=True)` - there is no `cubed.delayed` to wrap the `open_dataset` calls in,
- [ ] `Dataset.__dask_graph__` and all the other similar dask magic methods,
- [ ] `dask_array_ops.rolling` - uses functions from `dask.array.overlap`,
- [ ] `dask_array_ops.least_squares` - uses `dask.array.apply_along_axis` and `dask.array.linalg.lstsq`,
- [ ] `dask_array_ops.push` - uses `dask.array.reductions.cumreduction`
This is a useful list! I hope that we could close the gap for some of these over time.
I would like to get to the point where you can use xarray with a chunked array without ever importing dask. I think this PR gets very close, but that would be tricky to test because cubed depends on dask (so I can't just run the test suite without dask in the environment)
Agreed. I have opened https://github.com/tomwhite/cubed/issues/154 to make it possible to test without a Dask dependency.
dask_array_ops.rolling - uses functions from dask.array.overlap
This is unused, we use sliding_window_view now.
Thanks @dcherian ! Once I copied that explicit indexer business I was able to get serialization to and from zarr working with cubed!
In [1]: import xarray as xr
In [2]: from cubed import Spec
In [3]: ds = xr.open_dataset(
...: 'airtemps.zarr',
...: chunks={},
...: from_array_kwargs={
...: 'manager': 'cubed',
...: 'spec': Spec(work_dir="tmp", max_mem=20e6),
...: }
...: )
/home/tom/Documents/Work/Code/xarray/xarray/backends/plugins.py:139: RuntimeWarning: 'netcdf4' fails while guessing
warnings.warn(f"{engine!r} fails while guessing", RuntimeWarning)
/home/tom/Documents/Work/Code/xarray/xarray/backends/plugins.py:139: RuntimeWarning: 'scipy' fails while guessing
warnings.warn(f"{engine!r} fails while guessing", RuntimeWarning)
In [4]: ds['air']
Out[4]:
<xarray.DataArray 'air' (time: 2920, lat: 25, lon: 53)>
cubed.Array<array-004, shape=(2920, 25, 53), dtype=float32, chunks=((730, 730, 730, 730), (13, 12), (27, 26))>
Coordinates:
* lat (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
* lon (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
* time (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Attributes:
GRIB_id: 11
...
In [5]: ds.isel(time=slice(100, 300)).to_zarr("cubed_subset.zarr")
/home/tom/Documents/Work/Code/xarray/xarray/core/dataset.py:2118: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore
Out[5]: <xarray.backends.zarr.ZarrStore at 0x7f34953033c0>
I would like to get to the point where you can use xarray with a chunked array without ever importing dask. I think this PR gets very close, but that would be tricky to test because cubed depends on dask (so I can't just run the test suite without dask in the environment
I just released Cubed 0.6.0 which doesn't have a dependency on Dask, so this should be possible now.
I just released Cubed 0.6.0 which doesn't have a dependency on Dask, so this should be possible now.
Does this mean my comment https://github.com/pydata/xarray/pull/7019#discussion_r970713341 is valid again?
Does this mean my comment https://github.com/pydata/xarray/pull/7019#discussion_r970713341 is valid again?
Yes I think it does @headtr1ck - thanks for the reminder about that.
I now want to finish this PR by exposing the "chunk manager" interface as a new entrypoint, copying the pattern used for xarray's backends. That would allow me to move the cubed-specific CubedManager code into a separate repository, have the choice of chunkmanager default to whatever is installed, but ask explicitly what to do if multiple chunkmanagers are installed. That should address your comment @headtr1ck.
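As a sketch of that pattern (the entrypoint group name here is an assumption about the final spelling, copied from how backends are discovered):

```python
# Sketch: a third-party package advertises its chunk manager via an entrypoint,
# e.g. in its pyproject.toml (the group name "xarray.chunkmanagers" is an assumption):
#
#   [project.entry-points."xarray.chunkmanagers"]
#   cubed = "cubed_xarray.cubedmanager:CubedManager"
#
# and xarray would discover installed managers at runtime roughly like this
# (entry_points(group=...) is the Python 3.10+ signature):
from importlib.metadata import entry_points


def list_chunkmanagers():
    managers = {}
    for ep in entry_points(group="xarray.chunkmanagers"):
        managers[ep.name] = ep.load()()  # instantiate the registered manager class
    return managers
```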
I would like to get to the point where you can use xarray with a chunked array without ever importing dask. I think this PR gets very close, but that would be tricky to test because cubed depends on dask (so I can't just run the test suite without dask in the environment
I just released Cubed 0.6.0 which doesn't have a dependency on Dask, so this should be possible now.
Actually testing cubed with xarray in an environment without dask is currently blocked by rechunker's explicit dependency on dask, see https://github.com/pangeo-data/rechunker/issues/139
EDIT: We can hack around this by pip installing cubed, then pip uninstalling dask as mentioned here
Thanks for the review @dcherian! I agree with basically everything you wrote.
The main difficulty I have at this point is non-reproducible failures as described here
I've made a bare-bones cubed-xarray package to store the CubedManager class, as well as any integration tests (yet to be written). @tomwhite you should have an invitation to be an owner of that repo. It uses the entrypoint exposed in this PR to hook in, and seems to work for me locally :grin:
I'm having problems with ensuring the behaviour of the `chunks='auto'` option is consistent between `.chunk` and `open_dataset`. These problems appeared after vendoring `dask.array.core.normalize_chunks`. Right now the only failing tests use `chunks='auto'` (e.g. `xarray/tests/test_backends.py::test_chunking_consintency[auto]` - yes, there's a typo in that test's name), and they fail because xarray decides on different sizes for the automatically-chosen chunks.
What's weird is that all tests pass for me locally, but these failures occur on just some of the CI jobs (and which CI jobs fail is apparently not even consistent???). I have no idea why this would behave differently on only some of the CI jobs, especially after double-checking that `array.chunk-size` is being correctly determined from the dask config variable within `normalize_chunks`.
I'm having problems with ensuring the behaviour of the chunks='auto' option is consistent between `.chunk` and `open_dataset`
Update on this rabbit hole: This commit to dask changed the behaviour of dask's auto-chunking logic, such that if I run my little test script test_old_get_chunk.py on dask releases before and after that commit I get different chunking patterns:
from xarray.core.variable import IndexVariable
from dask.array.core import normalize_chunks  # the dask function whose behaviour changed
import itertools
import warnings
from numbers import Number
import dask
import dask.array as da
import xarray as xr
import numpy as np


# This function is copied from xarray, but calls dask.array.core.normalize_chunks
# It is used in open_dataset, but not in Dataset.chunk
def _get_chunk(var, chunks):
    """
    Return map from each dim to chunk sizes, accounting for backend's preferred chunks.
    """
    if isinstance(var, IndexVariable):
        return {}
    dims = var.dims
    shape = var.shape

    # Determine the explicit requested chunks.
    preferred_chunks = var.encoding.get("preferred_chunks", {})
    preferred_chunk_shape = tuple(
        preferred_chunks.get(dim, size) for dim, size in zip(dims, shape)
    )
    if isinstance(chunks, Number) or (chunks == "auto"):
        chunks = dict.fromkeys(dims, chunks)
    chunk_shape = tuple(
        chunks.get(dim, None) or preferred_chunk_sizes
        for dim, preferred_chunk_sizes in zip(dims, preferred_chunk_shape)
    )
    chunk_shape = normalize_chunks(
        chunk_shape, shape=shape, dtype=var.dtype, previous_chunks=preferred_chunk_shape
    )

    # Warn where requested chunks break preferred chunks, provided that the variable
    # contains data.
    if var.size:
        for dim, size, chunk_sizes in zip(dims, shape, chunk_shape):
            try:
                preferred_chunk_sizes = preferred_chunks[dim]
            except KeyError:
                continue
            # Determine the stop indices of the preferred chunks, but omit the last stop
            # (equal to the dim size). In particular, assume that when a sequence
            # expresses the preferred chunks, the sequence sums to the size.
            preferred_stops = (
                range(preferred_chunk_sizes, size, preferred_chunk_sizes)
                if isinstance(preferred_chunk_sizes, Number)
                else itertools.accumulate(preferred_chunk_sizes[:-1])
            )
            # Gather any stop indices of the specified chunks that are not a stop index
            # of a preferred chunk. Again, omit the last stop, assuming that it equals
            # the dim size.
            breaks = set(itertools.accumulate(chunk_sizes[:-1])).difference(
                preferred_stops
            )
            if breaks:
                warnings.warn(
                    "The specified Dask chunks separate the stored chunks along "
                    f'dimension "{dim}" starting at index {min(breaks)}. This could '
                    "degrade performance. Instead, consider rechunking after loading."
                )

    return dict(zip(dims, chunk_shape))


chunks = 'auto'
encoded_chunks = 100
dask_arr = da.from_array(
    np.ones((500, 500), dtype="float64"), chunks=encoded_chunks
)
var = xr.core.variable.Variable(data=dask_arr, dims=['x', 'y'])

with dask.config.set({"array.chunk-size": "1MiB"}):
    chunks_suggested = _get_chunk(var, chunks)

print(chunks_suggested)
(cubed) tom@tom-XPS-9315:~/Documents/Work/Code/dask$ git checkout 2022.9.2
Previous HEAD position was 7fe622b44 Add docs on running Dask in a standalone Python script (#9513)
HEAD is now at 3ef47422b bump version to 2022.9.2
(cubed) tom@tom-XPS-9315:~/Documents/Work/Code/dask$ python ../experimentation/bugs/auto_chunking/test_old_get_chunk.py
{'x': (362, 138), 'y': (362, 138)}
(cubed) tom@tom-XPS-9315:~/Documents/Work/Code/dask$ git checkout 2022.9.1
Previous HEAD position was 3ef47422b bump version to 2022.9.2
HEAD is now at b944abf68 bump version to 2022.9.1
(cubed) tom@tom-XPS-9315:~/Documents/Work/Code/dask$ python ../experimentation/bugs/auto_chunking/test_old_get_chunk.py
{'x': (250, 250), 'y': (250, 250)}
(I was absolutely tearing my hair out trying to find this bug, because after the change `normalize_chunks` became a pure function, but before the change it actually wasn't, so I was calling `normalize_chunks` with the exact same set of input arguments and still not able to reproduce the bug :angry: )
Anyway, what this means is that because this PR vendors `dask.array.core.normalize_chunks`, and the behaviour of that function changed between the dask version in the min-all-deps CI job and the other CI jobs, the single vendored function cannot possibly match both behaviours.
I think one simple way to fix this failure would be to upgrade the minimum version of dask to >=2022.9.2 (from 2022.1.1 where it currently is).
EDIT: I tried changing the minimum version of dask-core in min-all-deps.yml but the conda solve failed. But also would updating to 2022.9.2 now violate xarray's minimum dependency versions policy?
EDIT2: Another way to fix this should be to un-vendor dask.array.core.normalize_chunks within xarray. We could still achieve the goal of running cubed without dask by making normalize_chunks the responsibility of the chunkmanager instead, as cubed's vendored version of that function is not subject to xarray's minimum dependencies requirement.
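That would look roughly like this on the chunkmanager interface (a sketch; the cubed import path for its vendored copy is an assumption):

```python
# Rough sketch of EDIT2: each chunk manager normalises chunks with its own
# (or its own vendored) implementation, so xarray needn't vendor dask's version.
class DaskManager:
    def normalize_chunks(self, chunks, shape=None, limit=None, dtype=None, previous_chunks=None):
        from dask.array.core import normalize_chunks

        return normalize_chunks(
            chunks, shape=shape, limit=limit, dtype=dtype, previous_chunks=previous_chunks
        )


class CubedManager:
    def normalize_chunks(self, chunks, shape=None, limit=None, dtype=None, previous_chunks=None):
        # cubed ships a vendored copy of dask's normalize_chunks; the exact
        # import path here is an assumption
        from cubed.vendor.dask.array.core import normalize_chunks

        return normalize_chunks(
            chunks, shape=shape, limit=limit, dtype=dtype, previous_chunks=previous_chunks
        )
```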
We could still achieve the goal of running cubed without dask by making normalize_chunks the responsibility of the chunkmanager
Seems OK to me.
The other option is to xfail the broken tests on old dask versions
I would like to merge this now please! It works, it passes the tests, including mypy.
The main feature not in this PR is using parallel=True with open_mfdataset, which is still coupled to dask.delayed - I made #7811 to track that so I could get this PR merged.
If we merge this I can start properly testing cubed with xarray (in cubed-xarray).
@shoyer @dcherian if one of you could merge this or otherwise tell me anything else you think is still required!
(Okay now the failures are from https://github.com/pydata/xarray/pull/7815 which I've separated out, and from https://github.com/pydata/xarray/pull/7561 being recently merged into main which is definitely not my fault :sweat_smile: https://github.com/pydata/xarray/pull/7019/commits/316c63d55f4e2c317b028842f752a40596f16c6d shows that this PR passes the tests by itself.)
@Illviljan thanks for all your comments!
Would you (or @keewis?) be willing to approve this PR now? I would really like to merge this so that I can release a version of xarray that I can use as a dependency for cubed-xarray.
Thanks @TomNicholas Big change!
Woooo thanks @dcherian !
👏 Congrats @TomNicholas on getting this in! Such an important contribution. 👏
Thanks for all your hard work on this @TomNicholas!