Alternative parallel execution frameworks in xarray

Is your feature request related to a problem?

Since early in the project's history, xarray has supported wrapping dask.array objects in a first-class manner. However, recent work on flexible array wrapping has made it possible to wrap all sorts of array types (and with #6804 we should support wrapping any array that conforms to the array API standard).

Currently, though, the only way to parallelize array operations with xarray "automatically" is to use dask. (You could use xarray-beam or other options too, but they don't "automatically" generate the computation for you like dask does.)

While dask was the only parallel framework exposing an array-like API there was no need for this flexibility, but now we have nascent projects like cubed to consider too (cc @tomwhite).

Describe the solution you'd like

Refactor the internals so that dask is one option among many, and so that newer options can plug in extensibly.

In particular cubed deliberately uses the same API as dask.array, exposing:

  1. the methods needed to conform to the array API standard
  2. .chunk and .compute methods, which we could dispatch to
  3. dask-like functions to create computation graphs including blockwise, map_blocks, and rechunk

I would like to see xarray able to wrap any array-like object which offers this set of methods / functions, and call the corresponding version of that method for the correct library (i.e. dask vs cubed) automatically.

That way users could try different parallel execution frameworks simply via a switch like

ds.chunk(**chunk_pattern, manager="dask")

and see which one works best for their particular problem.
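
As a rough sketch of what that dispatch could look like (the registry, manager classes, and chunk function below are all hypothetical, not existing xarray API):

import numpy as np

# Hypothetical registry mapping a name like "dask" or "cubed" to a manager object.
CHUNK_MANAGERS = {}

def register_chunk_manager(name, manager_cls):
    """Entry point a parallel framework (dask, cubed, ...) could hook into."""
    CHUNK_MANAGERS[name] = manager_cls()

class DaskManager:
    def from_array(self, data, chunks):
        import dask.array as da
        return da.from_array(data, chunks=chunks)

class CubedManager:
    def from_array(self, data, chunks):
        # cubed deliberately mirrors dask.array's creation API,
        # so the call should look the same
        import cubed
        return cubed.from_array(data, chunks=chunks)

register_chunk_manager("dask", DaskManager)
register_chunk_manager("cubed", CubedManager)

def chunk(data, chunks, manager="dask"):
    """Roughly what Dataset.chunk could do internally: pick the framework by name."""
    return CHUNK_MANAGERS[manager].from_array(data, chunks)

# e.g. swap "dask" for "cubed" without changing anything else:
chunked = chunk(np.ones((100, 100)), chunks=(10, 10), manager="dask")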

Describe alternatives you've considered

If we leave things the way they are now, xarray will not be truly flexible in this respect.

Any library can wrap (or subclass, if they are really brave) xarray objects to provide parallelism, but that's not the same level of flexibility.

Additional context

cubed repo

PR about making xarray able to wrap objects conforming to the new array API standard

cc @shoyer @rabernat @dcherian @keewis

TomNicholas avatar Jul 18 '22 21:07 TomNicholas

This sounds great! We should finish up https://github.com/pydata/xarray/pull/4972 to make it easier to test.

dcherian avatar Jul 18 '22 21:07 dcherian

Another parallel framework would be Ramba

cc @DrTodd13

dcherian avatar Jul 19 '22 01:07 dcherian

Sounds good to me. The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.

shoyer avatar Jul 19 '22 02:07 shoyer

At SciPy I learned of fugue, which tries to provide a unified API for distributed DataFrames on top of Spark and Dask. It could be a great source of inspiration.

andersy005 avatar Jul 19 '22 03:07 andersy005

Thanks for opening this @TomNicholas

The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.

Agreed. I feel like there's already an implicit set of "chunked array" methods that xarray expects from Dask that could be formalised a bit and exposed as an integration point.
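
Purely for illustration, that integration point might look something like the abstract interface below (the class and method names are invented; they just cover the operations mentioned in this issue):

from abc import ABC, abstractmethod

class ChunkedArrayEntrypoint(ABC):
    """Operations a parallel array framework would need to expose to xarray."""

    @abstractmethod
    def from_array(self, data, chunks):
        """Wrap an in-memory or lazily indexed array into a chunked array."""

    @abstractmethod
    def rechunk(self, data, chunks):
        """Change the chunking of an existing chunked array."""

    @abstractmethod
    def compute(self, *data):
        """Materialise one or more chunked arrays as numpy arrays."""

    @abstractmethod
    def blockwise(self, func, out_ind, *args, **kwargs):
        """dask-style blockwise: apply func across matching blocks."""

    @abstractmethod
    def map_blocks(self, func, *args, **kwargs):
        """Apply func to every block of the chunked array(s)."""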

tomwhite avatar Jul 19 '22 10:07 tomwhite

Might I propose Arkouda?

https://github.com/Bears-R-Us/arkouda https://chapel-lang.org/presentations/Arkouda_SIAM_PP-22.pdf

sdbachman avatar Sep 14 '22 20:09 sdbachman

Might I propose Arkouda?

https://github.com/Bears-R-Us/arkouda https://chapel-lang.org/presentations/Arkouda_SIAM_PP-22.pdf

Have they improved recently to support more than 1D arrays?

DrTodd13 avatar Sep 14 '22 20:09 DrTodd13

Not really a generic parallel execution back-end, but Open-EO looks like an interesting use case too (it is a framework for managing remote execution of processing tasks on multiple big Earth observation cloud back-ends via a common API). I've suggested the idea of reusing the Xarray API here: https://github.com/Open-EO/openeo-python-client/issues/334.

benbovy avatar Oct 13 '22 09:10 benbovy

@rabernat just pointed out to me that in order for this to work well we might also need lazy concatenation of arrays.

Xarray currently has its own internal wrappers that allow lazy indexing, but they don't yet allow lazy concatenation. Instead, dask is what does lazy concatenation under the hood right now.

This is a problem: it means that concatenating two cubed-backed DataArrays will trigger loading both into memory, whereas concatenating two dask-backed DataArrays will not. If #4628 were implemented, xarray would never load the underlying array into memory, regardless of the backend.
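
Schematically, the special-casing in today's concatenation path looks something like this (not xarray's literal code, just the shape of the problem):

import numpy as np

def _is_dask(a):
    # crude stand-in for xarray's duck-typed dask check
    return type(a).__module__.startswith("dask")

def concatenate(arrays, axis=0):
    if all(_is_dask(a) for a in arrays):
        import dask.array as da
        return da.concatenate(arrays, axis=axis)  # stays lazy
    # anything else, including xarray's lazily indexed backend wrappers,
    # gets coerced to numpy first, which is what forces the load
    return np.concatenate([np.asarray(a) for a in arrays], axis=axis)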

TomNicholas avatar Oct 20 '22 19:10 TomNicholas

Cubed should define a concatenate function, so that should be OK

shoyer avatar Oct 21 '22 03:10 shoyer

Cubed implements concat, but perhaps xarray needs richer concat functionality than that?

tomwhite avatar Oct 21 '22 09:10 tomwhite

IIUC the issue Ryan & Tom are talking about is tied to reading from files.

For example, we read from a zarr store using zarr, then wrap that zarr.Array (or h5py Dataset) with a large number of ExplicitlyIndexed classes that enable more complicated indexing, lazy decoding, etc.

IIUC #4628 is about concatenating such arrays, i.e. neither zarr.Array nor ExplicitlyIndexed supports concatenation, so we end up calling np.array and forcing a disk read.

With dask or cubed we would have dask(ExplicitlyIndexed(zarr)) or cubed(ExplicitlyIndexed(zarr)), so as long as dask and cubed define concat and we dispatch to them, everything is 👍🏾

PS: This is what I was attempting to explain (not very clearly) in the distributed arrays meeting. We don't ever use dask.array.from_zarr (for example). We use zarr to read, then wrap in ExplicitlyIndexed, and then pass to dask.array.from_array.
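
A toy version of that wrapping order (the LazyWrapper class below is a made-up stand-in for xarray's internal ExplicitlyIndexed classes, and the zarr array here is just in-memory):

import dask.array as da
import numpy as np
import zarr

class LazyWrapper:
    """Defers all reads to the underlying store until actually indexed."""

    def __init__(self, array):
        self._array = array
        self.shape = array.shape
        self.dtype = array.dtype
        self.ndim = array.ndim

    def __getitem__(self, key):
        # in xarray this is where index translation / lazy decoding would happen
        return self._array[key]

z = zarr.array(np.arange(1_000_000))              # stands in for a zarr.Array read from a store
wrapped = LazyWrapper(z)                          # xarray wraps the backend array first
chunked = da.from_array(wrapped, chunks=100_000)  # dask (or cubed) then wraps the wrapper
print(chunked.sum().compute())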

dcherian avatar Oct 21 '22 15:10 dcherian