
Using the shuffle primitive in Xarray

Open · dcherian opened this issue 1 year ago · 3 comments

Is your feature request related to a problem?

dask recently added dask.array.shuffle to help with some classic GroupBy.map problems.

shuffle reorders the array so that all members of a single group are in a single chunk, with the possibility of multiple groups in a single chunk. I see a few ways to use this in Xarray:
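Below is a NumPy-only sketch of the planning step behind such a reordering, to make the idea concrete. A stable sort makes each group contiguous. Chunk sizes are then packed greedily so that no group straddles a chunk boundary, while several small groups may share a chunk. It assumes a 1-D label array and a hypothetical max_chunk target. It illustrates the idea only and is not how dask.array.shuffle is actually implemented.

```python
import numpy as np


def shuffle_plan(labels, max_chunk):
    """Return (order, chunks) so that every group is contiguous after
    reordering with `order`, and no group is split across chunks.

    A group larger than max_chunk gets its own (oversized) chunk.
    """
    labels = np.asarray(labels)
    # Stable sort: members of each group keep their original relative order.
    order = np.argsort(labels, kind="stable")
    # Group sizes, in the same sorted-label order that argsort produces.
    _, sizes = np.unique(labels, return_counts=True)
    chunks = []
    current = 0
    for size in sizes:
        # Close the current chunk if this whole group would overflow it.
        if current and current + size > max_chunk:
            chunks.append(current)
            current = 0
        current += size
    if current:
        chunks.append(current)
    return order, tuple(int(c) for c in chunks)


labels = np.array(["b", "a", "b", "c", "a", "a"])
order, chunks = shuffle_plan(labels, max_chunk=4)
# labels[order] groups as a, a, a | b, b, c: groups b and c share a chunk.
print(labels[order], chunks)
```

Indexing with order and then rechunking to chunks yields the layout described above: each group sits in exactly one chunk, possibly alongside other groups.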

  1. GroupBy.shuffle(): this shuffles and returns a new GroupBy object on which to do further operations (e.g. map).
  2. Dataset.shuffle_by(Grouper): this shuffles and returns a new Dataset (or DataArray), so that the shuffled data can be persisted to disk or used for other operations later (xref #5674).
  3. Use GroupBy.shuffle under the hood in DatasetGroupBy.quantile and DatasetGroupBy.median, so that the exact quantile always works regardless of chunking (right now we raise an error). This seems like a no-brainer.
  4. Add either a shuffle kwarg to GroupBy.map and/or GroupBy.reduce, or a new API (e.g. GroupBy.transform or GroupBy.map_shuffled), that shuffles and then uses xarray.map_blocks to apply a wrapper function that runs the GroupBy operation on each block. This is how dask dataframe implements GroupBy.apply.
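Options (3) and (4) both rest on one property. Once no group straddles a chunk boundary, applying a grouped function to each chunk independently gives the same answer as applying it to the whole array. The sketch below checks this with plain NumPy, using an exact per-group median as the grouped function. The helpers groupby_map and blockwise_groupby_map are hypothetical stand-ins for GroupBy.map and a map_blocks wrapper, not Xarray API.

```python
import numpy as np


def groupby_map(values, labels, udf):
    """Apply udf to each group of values, writing results back in place."""
    values = np.asarray(values, dtype=float)
    labels = np.asarray(labels)
    out = np.empty_like(values)
    for key in np.unique(labels):
        mask = labels == key
        out[mask] = udf(values[mask])
    return out


def blockwise_groupby_map(values, labels, chunks, udf):
    """Run groupby_map on each chunk independently, like a map_blocks wrapper.

    Only matches groupby_map when no group is split across chunks,
    which is exactly what a shuffle guarantees.
    """
    values = np.asarray(values, dtype=float)
    labels = np.asarray(labels)
    pieces = []
    start = 0
    for size in chunks:
        stop = start + size
        pieces.append(groupby_map(values[start:stop], labels[start:stop], udf))
        start = stop
    return np.concatenate(pieces)


def demean_by_median(x):
    # An exact median needs every member of the group in one place.
    return x - np.median(x)


# Already-shuffled data: groups a, a, a | b, b, c
labels = np.array(["a", "a", "a", "b", "b", "c"])
values = np.array([1.0, 2.0, 3.0, 10.0, 20.0, 5.0])

reference = groupby_map(values, labels, demean_by_median)
aligned = blockwise_groupby_map(values, labels, (3, 3), demean_by_median)
misaligned = blockwise_groupby_map(values, labels, (2, 4), demean_by_median)
```

With chunks (3, 3), the blockwise result equals the global one. With (2, 4), group a is split and its median is computed from a partial group, so the results differ.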

#9320 implements (1, 2). (1) is mostly for convenience; I could easily see us recommending (2) before calling groupby.

Thoughts?

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

dcherian, Sep 25 '24 15:09

I think (1, 2, 3) sound good. I'm not sure I would go with (4), since that would introduce either a new kwarg everywhere or a new API that would simply be a combination of shuffle and other, existing methods. Instead, I'd probably recommend explicitly chaining shuffle with the existing method.

keewis, Sep 25 '24 22:09

Re: "new API that would simply be a combination of shuffle and other, existing methods."

The equivalent would be a little involved:

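# shuffle so that each group lies within a single chunk
shuffled = ds.shuffle(grouper)
# run the GroupBy.map on each chunk of the shuffled object independently
mapped = xr.map_blocks(lambda x: x.groupby(grouper).map(udf), shuffled, template=...)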

instead of

ds.groupby(grouper).map_shuffle(udf)

Though now I realize that we could detect whether chunk boundaries line up with group boundaries and use map_blocks automatically, so that this works well:

ds.groupby(grouper).shuffle().map(udf, template=...)

However, I am not sure if I like that kind of implicit behaviour. We still have the issue of passing template to the underlying map_blocks call.
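One way such automatic detection could work is sketched below. It assumes a 1-D array of group labels along the grouped dimension and a dask-style tuple of chunk sizes. A group is split exactly when its label appears in more than one chunk. The helper chunks_align_with_groups is hypothetical and not existing Xarray code. A real check would also need to handle multidimensional groupers and labels that are themselves chunked.

```python
import numpy as np


def chunks_align_with_groups(labels, chunks):
    """Return True if no group label appears in more than one chunk."""
    labels = np.asarray(labels)
    if sum(chunks) != labels.size:
        raise ValueError("chunks must sum to the number of labels")
    seen = set()
    start = 0
    for size in chunks:
        block = set(labels[start:start + size].tolist())
        # A label already seen in an earlier chunk means that group is split.
        if seen & block:
            return False
        seen |= block
        start += size
    return True
```

The check also catches non-contiguous groups, e.g. labels a, b, a in chunks (1, 1, 1), where group a appears in the first and third chunks.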

dcherian, Sep 25 '24 22:09

But doesn't that mean that we'd really want to make those two separate operations? One that shuffles the data such that each group lies within a single (possibly shared) chunk, and one that applies a function to those shuffled chunks? Or am I missing something?

shuffled = ds.groupby(grouper).shuffle()
xr.map_blocks(udf, shuffled, template=...)

(not sure if I got the syntax of map_blocks right)

Edit: otherwise, we could also add a map_blocks method to the return value of shuffle (a GroupBy?) if that's not enough.

keewis, Sep 25 '24 22:09