Serverless parallelization of reference generation
Finding byte ranges in every file of an archival dataset is an embarrassingly parallel problem, which might be a good fit for serverless execution.
This step is analogous to the parallel=True option to xr.open_mfdataset, which wraps each per-file xr.open_dataset call in dask.delayed to parallelize the opening step (entirely separate from any dask.array tree reduction performed after the arrays have been created).
https://github.com/pydata/xarray/blob/12123be8608f3a90c6a6d8a1cdc4845126492364/xarray/backends/api.py#L1046
With that motivation, it's been suggested that we add a delayed-like function to Cubed https://github.com/cubed-dev/cubed/issues/311, which could in theory plug in to xarray.open_mfdataset.
A simpler way to test this idea would be to skip the cubed layer entirely and use a lithops map-gather (or whatever the correct primitive is) to try out serverless generation of references. I think for this to work the resulting virtual datasets need to be small enough to be gathered onto one node (thus avoiding a tree-reduce), but the discussion in https://github.com/TomNicholas/VirtualiZarr/issues/104 indicates that this should be okay (at least once #107 is complete).
xref https://github.com/TomNicholas/VirtualiZarr/issues/95
cc @tomwhite @rabernat
I'm reading about how reduce jobs work in lithops but I don't fully get it. Is it the case that we need each serverless worker to write out the references it read to some globally-accessible storage? So that they can all be accessed by a single node/worker for the concatenation step?
Yes, though you can also tree-reduce. IIUC serverless tasks can't communicate directly with each other, and must instead communicate through storage.
Though perhaps lithops has something like https://modal.com/docs/guide/dicts-and-queues to enable cross-task communication.
> I'm reading about how reduce jobs work in lithops but I don't fully get it. Is it the case that we need each serverless worker to write out the references it read to some globally-accessible storage? So that they can all be accessed by a single node/worker for the concatenation step?
I haven't used Lithops map_reduce (we just use map in Cubed), but the reduce function runs in a single container and receives all the map outputs so they can be combined in the way you specify. (You don't need to write anything to storage; Lithops handles that part for you.) Given that the map output is a small amount of metadata, Lithops map_reduce sounds like a good fit for this problem.
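To make that concrete, here is a sketch of the map_reduce shape described above. The map and reduce functions are hypothetical pure-Python stand-ins; the lithops driver is commented out since it requires a configured backend:

```python
# Hypothetical map function: one invocation per file, returning a small
# dict of reference metadata.
def map_refs(path):
    return {path: "references"}

# Hypothetical reduce function: runs in a single container and receives
# the list of all map outputs, combining them however we specify.
def reduce_refs(results):
    combined = {}
    for r in results:
        combined.update(r)
    return combined

# Driver (sketch):
#   import lithops
#   fexec = lithops.FunctionExecutor()
#   futures = fexec.map_reduce(map_refs, file_paths, reduce_refs)
#   combined = fexec.get_result(futures)
```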
It may actually be possible to use cubed for the serverless concatenation itself. Cubed implements serverless concatenation operations for numpy arrays, which work like a tree reduction: concatenating subsets and saving each round to an intermediate Zarr store.
The difficulty here is that a ManifestArray is an unusual type of array, backed underneath by three numpy arrays. Cubed would know how to concatenate any one of these arrays, but not what to do with all three at once. In particular, if I understand correctly, the problem is that it wouldn't know how to serialize the intermediate ManifestArrays to Zarr.
One solution might be to repurpose internal machinery that cubed already has, which uses Zarr structured arrays to effectively operate on multiple numpy arrays at once. Cubed has this because reductions such as mean require keeping track not just of the running totals but also the counts.
@tomwhite suggested that I could possibly plug a ManifestArray into Cubed in the same way that Tensorstore can be plugged in as an alternative to zarr-python, following the pattern of this TensorStoreGroup object:
https://github.com/cubed-dev/cubed/blob/main/cubed/storage/backends/tensorstore.py#L48
But I have to admit I don't fully understand this suggestion yet; I need to play around with it.