[v3] Generalized NDArray support
Recent discussions at the Refactor meeting have indicated that adding generalized array support to the v3 roadmap would be useful.
@akshaysubr - can I ask you to take this ticket and push it forward?
Would propose renaming this issue to Generalized NDArray support to disambiguate between zarr arrays and the underlying ndarray data structures used.
Proposal for adding generalized ndarray support to zarr
- Create a native zarr NDArray class for typing and to interface with existing protocols. This includes:
  - Buffer protocol (bytes arrays, etc.)
  - __array_interface__ (Numpy)
  - __cuda_array_interface__ (PyTorch, CuPy, etc.)
  - DLPack (TensorFlow, JAX, etc.)
  - Raw pointers
- What this generalized ndarray class might look like:
namespace zarr {

namespace py = pybind11;
using namespace py::literals;

class Array {
 public:
  Array(zarrArrayInfo_t* array_info, int device_id);
  Array(py::object o, intptr_t cuda_stream = 0);

  py::dict array_interface() const;
  py::dict cuda_interface() const;
  py::tuple shape() const;
  py::tuple strides() const;                               // Strides of axes in bytes
  py::object dtype() const;
  zarrArrayBufferKind_t getBufferKind() const;             // Device or Host buffer
  py::capsule dlpack(py::object stream) const;             // Export to DLPack
  py::object cpu();                                        // Move array to CPU
  py::object cuda(bool synchronize, int device_id) const;  // Move array to GPU
  const zarrArrayInfo_t& getArrayInfo() const { return array_info_; };

  static void exportToPython(py::module& m);
};

}  // namespace zarr
- Interoperability with Numpy
ascending = np.arange(0, 4096, dtype=np.int32)
zarray_h = zarr.ndarray.as_array(ascending)
print(ascending.__array_interface__)
print(zarray_h.__array_interface__)
print(zarray_h.__cuda_array_interface__)
print(zarray_h.buffer_size)
print(zarray_h.buffer_kind)
print(zarray_h.ndim)
print(zarray_h.dtype)
- Interoperability with CuPy
data_gpu = cp.array(ascending)
zarray_d = zarr.ndarray.as_array(data_gpu)
print(data_gpu.__cuda_array_interface__)
print(zarray_d.__cuda_array_interface__)
print(zarray_d.buffer_kind)
print(zarray_d.ndim)
print(zarray_d.dtype)
- Convert CPU to GPU
zarray_d_cnv = zarray_h.cuda()
print(zarray_d_cnv.__cuda_array_interface__)
- Convert GPU to CPU
zarray_h_cnv = zarray_d.cpu()
print(zarray_h_cnv.__array_interface__)
- Anything that supports the buffer protocol
with open('file.txt', "rb") as f:
    text = f.read()
zarray_txt_h = zarr.ndarray.as_array(text)
print(zarray_txt_h.__array_interface__)
zarray_txt_d = zarray_txt_h.cuda()
print(zarray_txt_d.__cuda_array_interface__)
How to adapt zarr v3 implementations using this interface
Changes to codec pipelines
No change to how the BatchedCodecPipeline (or any other CodecPipeline) works:
for bb_codec, chunk_spec_batch in bb_codecs_with_spec[::-1]:
chunk_bytes_batch = await bb_codec.decode_batch(
zip(chunk_bytes_batch, chunk_spec_batch), runtime_configuration
)
ab_codec, chunk_spec_batch = ab_codec_with_spec
chunk_array_batch = await ab_codec.decode_batch(
zip(chunk_bytes_batch, chunk_spec_batch), runtime_configuration
)
for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
chunk_array_batch = await aa_codec.decode_batch(
zip(chunk_array_batch, chunk_spec_batch), runtime_configuration
)
Changes to codecs
The codec definitions would have to change a bit, though. They would all accept this new zarr.ndarray object and return objects of the same type, so plain byte streams would also just be a 1D zarr.ndarray with dtype U1.
class ArrayBytesCodec(Codec):
@abstractmethod
async def decode(
self,
chunk_bytes: zarr.ndarray,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> zarr.ndarray:
pass
@abstractmethod
async def encode(
self,
chunk_array: zarr.ndarray,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[zarr.ndarray]:
pass
Changes to stores
Stores would also need to accept/return zarr.ndarray objects instead of just bytes objects.
class Store(ABC):
@abstractmethod
async def get(
self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
) -> Optional[zarr.ndarray]:
"""Retrieve the value associated with a given key.
Parameters
----------
key : str
byte_range : tuple[int, Optional[int]], optional
Returns
-------
zarr.ndarray
"""
...
@abstractmethod
async def set(self, key: str, value: zarr.ndarray) -> None:
"""Store a (key, value) pair.
Parameters
----------
key : str
value : zarr.ndarray
"""
...
User facing APIs using this interface
Option 1: Return/accept zarr.ndarray objects, forcing users to convert to the desired type using one of the above interoperability methods
in_np = np.arange(100)  # renamed: `in` is a reserved keyword in Python
in_z = zarr.ndarray.as_array(in_np)
z = zarr.array(in_z)
out = z[:5] # zarr.ndarray
out_np = np.asarray(out)
Option 2: Add a meta_array parameter to the ArrayMetaData
This way, the meta array used when creating an array gets encoded (assuming we define a JSON encoder for it) and stored in the ZARR_JSON file, and AsyncArray.open(...) will repopulate the metadata with the same meta_array used at creation time. However, this might not be ideal, since the compute environment for creating zarr arrays can differ from the one reading them (CPU jobs for data pre-processing, GPU jobs for training on that data, for example). So the ability to override that meta_array at read time would be useful:
@classmethod
async def open(
cls,
store: StoreLike,
meta_array: Optional[ArrayLike] = None,
runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
) -> AsyncArray:
store_path = make_store_path(store)
zarr_json_bytes = await (store_path / ZARR_JSON).get()
assert zarr_json_bytes is not None
zarr_json = json.loads(zarr_json_bytes)
if meta_array is not None:
zarr_json['meta_array'] = meta_array
return cls.from_dict(
store_path,
zarr_json,
runtime_configuration=runtime_configuration,
)
This would then allow handling the conversion between zarr.ndarray and the meta_array type (say np.ndarray) internally, transparently to the user.
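As a minimal sketch of what that internal conversion might look like: the helper name below is illustrative, and it assumes zarr.ndarray exposes the cpu()/cuda() methods and array interfaces proposed above.
import numpy as np

def _convert_to_meta_array(chunk, meta_array):
    # Hypothetical helper: hand the internal zarr.ndarray chunk back to the
    # user as the same array flavor as the meta_array they provided.
    if meta_array is None:
        return np.asarray(chunk)  # default: NumPy array on the host
    if hasattr(meta_array, "__cuda_array_interface__"):
        import cupy as cp  # optional dependency, only needed for device meta arrays
        return cp.asarray(chunk.cuda())  # wrap the device buffer via the CUDA array interface
    return np.asarray(chunk.cpu())  # host meta arrays (NumPy and friends)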
Thanks @akshaysubr!
Your proposal looks good. If there is a way we could achieve the same results in pure Python, that would be preferred. I don't know whether that is possible with the currently available library APIs.
With the v3 refactoring, we aim to keep the end-user API as close to v2 as possible. Option 1 would be a significant change in that users would need to write np.asarray(z[:5]) instead of just z[:5].
That is why option 2 seems more appealing. However, it would not be spec-compliant to persist meta_array in the JSON. Therefore, it should be user-provided to create and open. It could even be part of the runtime_configuration because it doesn't affect the storage of the array data.
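A rough sketch of what carrying meta_array in the runtime configuration could look like; the field name and default below are assumptions, not an agreed-upon API:
from dataclasses import dataclass, field
from typing import Any

import numpy as np

@dataclass(frozen=True)
class RuntimeConfiguration:
    # Existing fields elided; meta_array is the only addition shown here.
    # Defaulting to an empty NumPy array preserves the current host behavior.
    meta_array: Any = field(default_factory=lambda: np.empty(0))

# A GPU job could then open the same array with a CuPy meta array without
# anything device-specific ever being persisted in the stored metadata:
# config = RuntimeConfiguration(meta_array=cupy.empty(0))
# z = await AsyncArray.open(store, runtime_configuration=config)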
Thanks for the feedback @normanrz. I think we should be able to do this in pure Python, but we would need to add cupy as a dependency to zarr. That could maybe be made optional for CPU-only installs, raising an appropriate exception when GPU functionality is accessed?
That sounds great! Adding cupy as an optional dependency would be a good solution.
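A minimal sketch of how that optional dependency guard might look; the helper names here are illustrative, not an existing zarr API:
try:
    import cupy as cp
except ImportError:  # CPU-only install
    cp = None

def _require_cupy():
    # Raise a clear error when GPU functionality is requested on a CPU-only install.
    if cp is None:
        raise ImportError(
            "This operation requires the optional dependency cupy; "
            "install a cupy wheel matching your CUDA toolkit to enable GPU support."
        )
    return cp

def to_gpu(host_array):
    # Hypothetical helper: move a host array to the current CUDA device.
    return _require_cupy().asarray(host_array)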
I propose we take it a step further and implement an output-by-caller policy. That is, all components in Zarr-python take an output buffer/array argument and write their results in said buffer/array instead of returning a new buffer/array.
Con
- Every component needs to have a function that returns the output-size (exact or an over-bound) given a specific input.
Pros
- A caller can read multiple chunks into a contiguous memory buffer without any copying.
- A caller can read into existing buffers such as communication buffers, shared memory buffers, CUDA managed memory, etc.
- The caller decides how the memory is allocated, e.g. through a memory pool.
- The caller decides the memory type, such as CUDA device memory or host memory.
- Components can optimize based on the input and output type.
I suggest that we introduce an NDBufferView that can be used both as the chunk_array argument (as @akshaysubr suggests above) and as the output argument.
Changes to codecs and stores
class ArrayBytesCodec(Codec):
@abstractmethod
async def decode(
self,
chunk_input: NDBufferView,
chunk_output: NDBufferView,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> None:
pass
@abstractmethod
async def encode(
self,
chunk_input: NDBufferView,
chunk_output: NDBufferView,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> None:
pass
class Store(ABC):
@abstractmethod
async def get(
self,
key: str,
output: NDBufferView,
byte_range: Optional[Tuple[int, Optional[int]]] = None
) -> None:
...
@abstractmethod
async def set(self, key: str, value: NDBufferView) -> None:
...
User facing APIs using this interface
The user can set the output buffer explicitly when calling a .getitem() method on the zarr-array, or use the array's default allocator to provide the output buffer implicitly.
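For illustration, reading under this policy might look roughly like the sketch below; zarr v2's get_basic_selection already accepts an out= argument, and the proposal would push the same idea down through the codecs and stores:
import numpy as np
import zarr

z = zarr.open("data.zarr", mode="r")  # hypothetical existing 2D array

# Implicit: the array's default allocator provides the output buffer.
a = z[:, :100]

# Explicit: the caller pre-allocates one contiguous buffer and reads into it,
# e.g. a pinned-memory or communication buffer, avoiding an extra copy.
out = np.empty((z.shape[0], 100), dtype=z.dtype)
z.get_basic_selection((slice(None), slice(0, 100)), out=out)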
@madsbk The output-by-caller policy is definitely very useful especially from a copy optimization and a memory allocator standpoint. The one con that you called out is where I am stuck right now. This challenge was also discussed in this numcodecs issue: https://github.com/zarr-developers/numcodecs/issues/316
For each codec, we can implement what you're suggesting for the write/encode path by having each compressor provide an upper bound for the compressed size given the uncompressed size. Almost all compressors do this already and we'd just have to expose that in the Codec API. But for the read/decode path, this is much harder to do since we don't keep track of the compression ratio or compressed size at each codec stage in the codec pipeline. We can potentially solve that by adding this additional metadata through a zarr spec extension, but in the existing version of the spec, that doesn't seem feasible.
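For the encode path, exposing that bound might look something like this; the method name max_encoded_size is an assumption, and the LZ4 figure follows the library's documented LZ4_compressBound worst case:
from abc import ABC, abstractmethod

class Codec(ABC):
    @abstractmethod
    def max_encoded_size(self, input_nbytes: int) -> int:
        # Upper bound (exact or an over-estimate) on the encoded size for an
        # input of input_nbytes bytes, so callers can pre-allocate the output.
        ...

class LZ4Codec(Codec):
    def max_encoded_size(self, input_nbytes: int) -> int:
        # Worst case for incompressible input, per LZ4_compressBound().
        return input_nbytes + input_nbytes // 255 + 16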
This decode path is a general issue for compressors, and finding a good longer-term solution for it would be very useful. For example, the numcodecs LZ4 codec currently adds the uncompressed size as a 4 byte header, but this makes the LZ4 codec streams incompatible with other LZ4 decompressor implementations. The Gzip codec speculatively allocates some memory and pauses decompression to allocate a larger buffer and copy over data before continuing decompression. Most of the GPU codec implementations decompress once without writing anything out just to find the output size, so the caller can allocate that much memory, and then re-run decompression to produce the output.
I think we can close this issue now that https://github.com/zarr-developers/zarr-python/pull/1910 has been merged?