Idea: Ray Plasma-backed zarr store for intermediate results
Hi, cubed community.
I have been thinking about porting my zarr+dask-array+xarray+dask-kubernetes workloads to cubed+ray, and currently I feel that the need to persist intermediate results to external storage is a big blocker and performance killer for me compared to dask's p2p model.
I am no expert in either ray or cubed, but I think there might be a simple way to achieve somewhat similar functionality to dask: use a (not yet implemented) zarr store backed by the Ray Plasma object store as the backend for intermediate arrays. That way, we could offload many of the difficult things to ray:
- Garbage collect the intermediate zarr stores as soon as they are not needed anymore
- Task locality: consecutive tasks on the same chunks could be scheduled on the same worker (though I understand this is not a big issue for cubed, thanks to its fusing optimization)
- Sharing data among workers without the need for external storage
Idea for implementation (a rough sketch follows this list):
- For each zarr store, there would be one ray actor
- The actor would keep all zarr metadata
- All the chunks would be stored in the Plasma store
- The actor would keep a mapping between Plasma object id <-> zarr chunk key
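To make this concrete, here is a minimal, untested sketch of what such a store could look like, assuming zarr's v2 MutableMapping store interface and Ray's actor and ray.put/ray.get APIs (all class and method names here are hypothetical):

```python
import ray
from collections.abc import MutableMapping


@ray.remote
class StoreActor:
    """One actor per zarr store: holds metadata inline and a mapping
    chunk key -> (wrapped) object store ObjectRef."""

    def __init__(self):
        self._entries = {}  # key -> bytes (metadata) or [ObjectRef] (chunk)

    def put(self, key, value):
        self._entries[key] = value

    def get(self, key):
        return self._entries.get(key)

    def delete(self, key):
        # Dropping the last ObjectRef lets Ray garbage-collect the chunk.
        self._entries.pop(key, None)

    def keys(self):
        return list(self._entries)


class RayObjectStoreZarrStore(MutableMapping):
    """Hypothetical zarr store (v2 MutableMapping interface): chunks live in
    the Ray object store, metadata lives in the actor."""

    META_SUFFIXES = ('.zarray', '.zgroup', '.zattrs', '.zmetadata')

    def __init__(self):
        self._actor = StoreActor.remote()

    def __setitem__(self, key, value):
        value = bytes(value)
        if key.endswith(self.META_SUFFIXES):
            ray.get(self._actor.put.remote(key, value))
        else:
            ref = ray.put(value)  # chunk bytes go into the object store
            # Wrap the ref in a list so Ray hands the ref itself to the
            # actor instead of resolving it back to the chunk bytes.
            ray.get(self._actor.put.remote(key, [ref]))

    def __getitem__(self, key):
        entry = ray.get(self._actor.get.remote(key))
        if entry is None:
            raise KeyError(key)
        if isinstance(entry, list):  # wrapped ObjectRef -> fetch the chunk
            return ray.get(entry[0])
        return entry  # metadata bytes

    def __delitem__(self, key):
        ray.get(self._actor.delete.remote(key))

    def __iter__(self):
        yield from ray.get(self._actor.keys.remote())

    def __len__(self):
        return len(ray.get(self._actor.keys.remote()))
```

Workers would then share a handle to the actor, and cubed would pass an instance of this store wherever it currently creates its intermediate zarr stores.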
What do you think about it?
This is really interesting! It sounds very much related to other conversations we have been having about using zarr stores that are either in-memory on one machine (see https://github.com/cubed-dev/cubed/issues/502), in-memory across multiple machines on HPC (see https://github.com/cubed-dev/cubed/issues/467#issuecomment-2227101101), or in some object store that's at least faster than S3 (see https://github.com/cubed-dev/cubed/issues/237).
@tomwhite can correct me if I'm wrong but I think to make this idea work we would have to make the backends for Cubed pluggable, so that you could use whatever type of zarr store implementation you wanted.
Hi @vladidobro - thanks for opening this issue - interesting idea!
I have been thinking about porting my zarr+dask-array+xarray+dask-kubernetes workloads to cubed+ray, and currently I feel that the need to persist intermediate results to external storage is a big blocker and performance killer for me compared to dask's p2p model.
It would be very helpful if you have an example workload that you could share or describe here as that would help us understand if it's a good fit. Sharing the plan by calling visualize() would be useful (see https://cubed-dev.github.io/cubed/user-guide/diagnostics.html#visualize-the-computation-plan).
I think there might be a simple way to achieve somewhat similar functionality to dask: use a (not yet implemented) zarr store backed by the Ray Plasma object store as the backend for intermediate arrays.
I don't know anything about Plasma - does it allow sharing across machines? Also it seems very old at this point: https://github.com/ray-project/plasma/commits/master/
We'd need to do a bit of work to open up Cubed to arbitrary intermediate Zarr stores (#784). I have a draft PR in #785 that moves us in that direction.
Hi,
It would be very helpful if you have an example workload that you could share or describe here as that would help us understand if it's a good fit
Most of it is analytics and feature extraction on 5-50 TB weather forecast datasets (initialization_time x valid_time x lat x lon); the usual pipeline looks something like
xr.open_zarr('s3://mybucket/ifs.zarr') \
    .sel(lat=latitudes, lon=longitudes, method='nearest') \
    ... # computed variables based on base variables (mostly elementwise ufuncs)
    .groupby('country').mean(['point']) \
    .to_dask_dataframe() \
    .to_parquet('s3://mybucket/ifs_features')
    # or .to_zarr
or just administrative zarr tasks (backfilling, changing compression, rechunking, etc.), so nothing very complicated; the task graph is a quite typical map-reduce (except the occasional rechunk, but then rechunk is the only operation).
The main reasons why cubed+ray sounds promising to me are cubed's memory guarantees (fine-tuning memory requirements is really a huge pain) and some of ray's features compared to dask (ray environments, better integration with the ML ecosystem).
Also it seems very old at this point
Yes, sorry, it seems like Plasma itself is deprecated and Ray uses its own fork of Plasma. The best docs for it seem to be this page: https://docs.ray.io/en/latest/ray-core/objects/serialization.html
Anyway, to give a quick summary of its features as I understand them:
- Each worker node hosts many worker processes
- Each worker node has one shared object store which is shared between all the worker processes on the node
- When a worker process requests an object which resides on a different node, the data are transparently transferred to its node's object store
- It supports disk spilling
- The API is ray.put(o: object) -> ObjectRef, where the ObjectRef is some unique ID, and ray.get(ref: ObjectRef) -> object (a short example follows this list)
- The ObjectRefs are reference counted, and the objects are garbage collected when no ObjectRefs to them exist.
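For reference, a tiny (untested) example of that API; the array here is just a placeholder:

```python
import numpy as np
import ray

ray.init()

# Put a chunk-sized array into the current node's shared object store;
# ray.put returns an ObjectRef, a small unique ID.
ref = ray.put(np.zeros((1000, 1000)))

# Any worker holding the ref can fetch the value; if the object lives on
# another node, Ray transfers it to the local object store transparently.
arr = ray.get(ref)

# ObjectRefs are reference counted: once the last ref goes out of scope,
# the object becomes eligible for garbage collection.
del ref
```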
Thanks for the details on the workloads you are running. Since you are using Xarray they should be easy to try on Cubed (see https://cubed-dev.github.io/cubed/examples/xarray.html). You could start by running locally on a small dataset to check that the API coverage is there - if there are any gaps we'd be very happy to hear about them and fix them (https://github.com/cubed-dev/cubed-xarray/issues).
Re Plasma - are you interested in building something to test this out?
Re Plasma - are you interested in building something to test this out?
Eventually yes, I am. The shared-memory zarr store seems like quite a simple thing to implement; the difficult part, it seems to me, is allowing cubed to take full advantage of it. That would require not treating it as an opaque zarr store, but instead exposing the chunk-level task dependencies to the executor. And that almost sounds like the proposal to use arbitrary blob stores https://github.com/cubed-dev/cubed/issues/727 which, if done, would allow using the ray object store directly without any zarr abstraction on top.
I am now waiting for the infra team to set up Ray for me, and when I get to test it, I will try to implement this zarr store and get back to you.
@vladidobro #801 should allow you to use a Ray store directly.