Alternative parallel execution frameworks in xarray
Is your feature request related to a problem?
Since early in the project's life, xarray has supported wrapping dask.array
objects in a first-class manner. However, recent work on flexible array wrapping has made it possible to wrap all sorts of array types (and with #6804 we should support wrapping any array that conforms to the array API standard).
Currently, though, the only way to parallelize array operations with xarray "automatically" is to use dask. (You could use xarray-beam or other options too, but they don't "automatically" generate the computation for you like dask does.)
When dask was the only parallel framework exposing an array-like API there was no need for flexibility, but now we have nascent projects like cubed to consider too. @tomwhite
Describe the solution you'd like
Refactor the internals so that dask is one option among many, and so that any newer options can plug in in an extensible way.
In particular, cubed deliberately uses the same API as `dask.array`, exposing:
- the methods needed to conform to the array API standard,
- `.chunk` and `.compute` methods, which we could dispatch to,
- dask-like functions to create computation graphs, including `blockwise`, `map_blocks`, and `rechunk`.
I would like to see xarray able to wrap any array-like object which offers this set of methods / functions, and call the corresponding version of that method for the correct library (i.e. dask vs cubed) automatically.
That way users could try different parallel execution frameworks simply via a switch like `ds.chunk(**chunk_pattern, manager="dask")` and see which one works best for their particular problem.
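To make the idea concrete, here is a minimal, purely illustrative sketch of what such a dispatch mechanism could look like. None of these names (`ChunkManager`, `CHUNK_MANAGERS`, `register_chunkmanager`) are real xarray API; the "serial" manager is a toy stand-in for dask or cubed.

```python
class ChunkManager:
    """Hypothetical interface a parallel array framework would implement."""

    def from_array(self, data, chunks):
        raise NotImplementedError

    def compute(self, arr):
        raise NotImplementedError


# Registry mapping a manager name (as passed to .chunk) to an implementation.
CHUNK_MANAGERS = {}


def register_chunkmanager(name, manager):
    CHUNK_MANAGERS[name] = manager


class SerialChunkManager(ChunkManager):
    """Toy stand-in: 'chunks' a list by slicing, computes by flattening."""

    def from_array(self, data, chunks):
        return [data[i:i + chunks] for i in range(0, len(data), chunks)]

    def compute(self, arr):
        return [x for chunk in arr for x in chunk]


register_chunkmanager("serial", SerialChunkManager())


def chunk(data, chunks, manager="serial"):
    # ds.chunk(..., manager=...) would look up the framework like this
    return CHUNK_MANAGERS[manager].from_array(data, chunks)
```

A dask or cubed manager would register itself the same way, so switching frameworks becomes a one-keyword change rather than a code rewrite.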
Describe alternatives you've considered
If we leave it the way it is now then xarray will not be truly flexible in this respect.
Any library can wrap (or subclass if they are really brave) xarray objects to provide parallelism but that's not the same level of flexibility.
Additional context
PR about making xarray able to wrap objects conforming to the new array API standard
cc @shoyer @rabernat @dcherian @keewis
This sounds great! We should finish up https://github.com/pydata/xarray/pull/4972 to make it easier to test.
Sounds good to me. The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.
At SciPy I learned of fugue, which tries to provide a unified API for distributed DataFrames on top of Spark and Dask. It could be a great source of inspiration.
Thanks for opening this @TomNicholas
> The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.
Agreed. I feel like there's already an implicit set of "chunked array" methods that xarray expects from Dask that could be formalised a bit and exposed as an integration point.
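One way to formalise that implicit interface would be a structural type along these lines. This is only a sketch: the member names mirror the `dask.array` methods discussed above, but `ChunkedArray` is not an existing xarray class.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ChunkedArray(Protocol):
    """Sketch of the implicit 'chunked array' interface xarray expects from dask."""

    # tuple-of-tuples of block sizes, as in dask.array
    chunks: tuple

    def rechunk(self, chunks): ...
    def compute(self): ...


class FakeChunkedArray:
    """Any object providing these members would satisfy the protocol."""

    def __init__(self):
        self.chunks = ((2, 2),)

    def rechunk(self, chunks):
        return self

    def compute(self):
        return [1, 2, 3, 4]
```

`runtime_checkable` only verifies that the members exist, not their signatures, but even that would give libraries like cubed a concrete target to implement against.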
Might I propose Arkouda?
https://github.com/Bears-R-Us/arkouda https://chapel-lang.org/presentations/Arkouda_SIAM_PP-22.pdf
> Might I propose Arkouda?
Have they improved recently to support more than 1D arrays?
Not really a generic parallel execution back-end, but Open-EO looks like an interesting use case too (it is a framework for managing remote execution of processing tasks on multiple big Earth observation cloud back-ends via a common API). I've suggested the idea of reusing the Xarray API here: https://github.com/Open-EO/openeo-python-client/issues/334.
@rabernat just pointed out to me that in order for this to work well we might also need lazy concatenation of arrays.
Xarray currently has its own internal wrappers that allow lazy indexing, but they don't yet allow lazy concatenation. Instead, dask is what does lazy concatenation under the hood right now.
This is a problem: it means that concatenating two cubed-backed DataArrays will trigger loading both into memory, whereas concatenating two dask-backed DataArrays will not. If #4628 were implemented then xarray would never load the underlying array into memory, regardless of the backend.
Cubed should define a concatenate function, so that should be OK
Cubed implements `concat`, but perhaps xarray needs richer concat functionality than that?
IIUC the issue Ryan & Tom are talking about is tied to reading from files.
For example, we read from a zarr store using `zarr`, then wrap that `zarr.Array` (or h5py `Dataset`) with a large number of `ExplicitlyIndexed` classes that enable more complicated indexing, lazy decoding, etc.
IIUC #4628 is about concatenating such arrays, i.e. neither `zarr.Array` nor `ExplicitlyIndexed` support concatenation, so we end up calling `np.array` and forcing a disk read.
With dask or cubed we would have `dask(ExplicitlyIndexed(zarr))` or `cubed(ExplicitlyIndexed(zarr))`, so as long as `dask` and `cubed` define `concat` and we dispatch to them, everything is 👍🏾
PS: This is what I was attempting to explain (not very clearly) in the distributed arrays meeting. We don't ever use `dask.array.from_zarr` (for example). We use `zarr` to read, then wrap in `ExplicitlyIndexed`, and then pass to `dask.array.from_array`.