Codec pipeline memory usage
We discussed memory usage on Friday's community call. I've started looking into it at https://github.com/TomAugspurger/zarr-python-memory-benchmark.
https://rawcdn.githack.com/TomAugspurger/zarr-python-memory-benchmark/refs/heads/main/reports/memray-flamegraph-read-uncompressed.html has the memray flamegraph for reading an uncompressed array (400 MB total, split into 10 chunks of 40 MB each). I think the optimal memory usage here is about 400 MB. Our peak memory is about 2x that.
https://rawcdn.githack.com/TomAugspurger/zarr-python-memory-benchmark/refs/heads/main/reports/memray-flamegraph-read-compressed.html has the zstd compressed version. Peak memory is about 1.1 GiB.
I haven't looked too closely at the code, but I wonder if we could be smarter about a few things in certain cases:
1. For the uncompressed case, we might be able to do a `readinto` directly into (an appropriate slice of) the `out` array. We might need to expand the Store API to add some kind of `readinto`, where the user provides the buffer to read into rather than the store allocating new memory.
2. For the compressed case, we might be able to improve things once we know the size of the output buffers. I see that numcodecs' `zstd.decode` takes an output buffer here that we could maybe use (see the sketch below). And past that point, maybe all the codecs could reuse one or two buffers, rather than allocating a new buffer for each stage of the codec (one buffer if everything can be done in place, two buffers if something can't be done in place)?
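For reference, the sketch below (standalone illustration, not codec pipeline code) shows numcodecs' existing `out=` argument on `decode`, which is the mechanism point 2 relies on:

```python
# Standalone illustration of decoding into a caller-provided buffer with numcodecs.
import numpy as np
from numcodecs import Zstd

codec = Zstd(level=1)

# Pretend this is one chunk of the array.
chunk = np.arange(1_000_000, dtype="float32")
compressed = codec.encode(chunk)

# If we know the decoded size up front, we can decompress straight into a
# pre-allocated array instead of letting the codec allocate a new buffer.
out = np.empty(chunk.shape, dtype=chunk.dtype)
codec.decode(compressed, out=out)

assert np.array_equal(out, chunk)
```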
I'm not too familiar with the codec pipeline stuff, but will look into this as I have time. Others should feel free to pick this up if they get the itch, though. There's some work to be done :)
https://github.com/TomAugspurger/zarr-python-memory-benchmark/blob/4039ba687452d65eef081bce1d4714165546422a/sol.py#L41 has a POC for using readinto to read an uncompressed zarr dataset into a pre-allocated buffer. https://rawcdn.githack.com/TomAugspurger/zarr-python-memory-benchmark/3567246b852d7adacbc10f32a58b0b3f6ac3d50b/reports/memray-flamegraph-sol-read-uncompressed.html shows that that takes ~exactly the size of the output ndarray (so no overhead from Zarr).
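To sketch the idea for anyone skimming (simplified, with made-up chunk paths and a 1-D chunk layout; not the actual sol.py code): each uncompressed chunk's bytes are read straight into the corresponding slice of a pre-allocated output array, so Zarr never allocates intermediate buffers.

```python
# Simplified sketch of readinto for an uncompressed array; the chunk paths and
# 1-D chunk layout are made up for illustration.
import numpy as np

dtype = np.dtype("float32")
chunk_len = 1_000_000      # elements per chunk
n_chunks = 10

out = np.empty(n_chunks * chunk_len, dtype=dtype)
out_bytes = memoryview(out).cast("B")      # flat byte view over the output array
chunk_nbytes = chunk_len * dtype.itemsize

for i in range(n_chunks):
    # Each chunk's bytes land directly in the right slice of `out`,
    # so no intermediate buffer is allocated.
    with open(f"data.zarr/c/{i}", "rb") as f:
        f.readinto(out_bytes[i * chunk_nbytes : (i + 1) * chunk_nbytes])
```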
https://github.com/TomAugspurger/zarr-python-memory-benchmark/blob/4039ba687452d65eef081bce1d4714165546422a/sol.py#L63 shows an example reading a Zstd compressed dataset. https://rawcdn.githack.com/TomAugspurger/zarr-python-memory-benchmark/3567246b852d7adacbc10f32a58b0b3f6ac3d50b/reports/memray-flamegraph-sol-read-compressed.html shows that the peak memory usage is ~ the size of the compressed dataset + the output ndarray (this does all the decompression first; we could do those sequentially to lower the peak memory usage).
There are some complications around slices that don't align with zarr chunk boundaries that this ignores, but it's maybe enough to prove that we could do better.
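And a similarly simplified sketch of the compressed path (again with made-up paths, not the actual sol.py code), here using the interleaved read-then-decode-per-chunk variant mentioned above so only one chunk's compressed bytes are in memory at a time:

```python
# Simplified sketch of the compressed path; paths are made up for illustration.
import numpy as np
from numcodecs import Zstd

codec = Zstd()
dtype = np.dtype("float32")
chunk_len = 1_000_000
n_chunks = 10

out = np.empty(n_chunks * chunk_len, dtype=dtype)

for i in range(n_chunks):
    with open(f"data.zarr/c/{i}", "rb") as f:
        compressed = f.read()      # only this chunk's compressed bytes are held
    # Decompress directly into the chunk's slice of the output array.
    codec.decode(compressed, out=out[i * chunk_len : (i + 1) * chunk_len])
```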
Thanks for doing this work @TomAugspurger! Coincidentally, I've been looking at memory overheads for Zarr storage operations across different filesystems (local/cloud), compression settings, and Zarr versions: https://github.com/tomwhite/memray-array
> There are some complications around slices that don't align with zarr chunk boundaries that this ignores, but it's maybe enough to prove that we could do better.
Just reducing the number of buffer copies for aligned slices would be a big win for everyone who uses Zarr, since it would improve performance and reduce memory pressure. Hopefully similar techniques could be used for cloud storage too.
Very cool!
From https://github.com/tomwhite/memray-array:

> Reads with no compression incur a single copy from local files, but two copies from S3. This seems to be because the S3 libraries read lots of small blocks then join them into a larger one, whereas local files can be read in one go into a single buffer.
I was wondering about this while looking into the performance of obstore and KvikIO. KvikIO lets the caller provide the `out` buffer that the data are read into, which avoids the small intermediate buffer allocations and the extra memcopies into the final output buffer. Probably worth looking into at some point.
I wonder if any of the memory management machinery that has been developed for Apache Arrow would be of use here?
I looked into implementing this today, and it'll be a decent amount of effort. There are some issues in the interface provided by the codec pipeline ABC (`read` takes an `out` buffer, but `decode` doesn't), and I got pretty lost in the `codec_pipeline` implementation (so many iterables of tuples!). I'm not sure where the best place to start is.
Beyond the codec pipeline, I think we'll also need to update the Store and Codec interfaces to add APIs for reading / decoding into an `out` buffer. This probably has to be opt-in (we can't have codecs / stores silently ignoring an `out` buffer they were given).
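To make "opt-in" concrete, here's a purely hypothetical sketch of the shape such an API could take; none of these names exist in zarr-python today, and the real Store methods take extra parameters (buffer prototypes, byte ranges) that are omitted here.

```python
# Purely hypothetical sketch: a way for stores to opt in to a "read into" API.
# None of these names exist in zarr-python today.
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsReadInto(Protocol):
    async def getinto(self, key: str, out: memoryview) -> int:
        """Read the value at `key` directly into `out`; return bytes written."""
        ...


async def fill_chunk(store: object, key: str, out: memoryview) -> bool:
    """Use the fast path when the store opts in; tell the caller to fall back otherwise."""
    if isinstance(store, SupportsReadInto):
        await store.getinto(key, out)
        return True
    # Stores that don't opt in keep the existing get()-then-copy behaviour.
    return False
```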
> and I got pretty lost in the `codec_pipeline` implementation (so many iterables of tuples!)
Not the first person! I did make it out alive, but only barely.
On the weekly call today, Davis asked whether we could do zero-copy read / decompression for variable-width / variable-length types.
For fixed-size types, we can derive the decoded chunk size as `chunk.dtype.itemsize * chunk.size`. This doesn't work for variable-width types because the itemsize is, by definition, variable.
For zero-copy decompression we just need the size of the final buffer. Libraries like pyarrow always(?) know this for their variable-sized buffers. I think this would be possible to support, but we'd need to ensure that the metadata includes the chunk size. This would be an example of a chunk-level statistic (https://github.com/zarr-developers/zarr-specs/issues/319).
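A tiny illustration of the difference (made-up shapes, nothing zarr-specific):

```python
# For a fixed-width dtype the decoded chunk size is knowable before decoding,
# which is exactly what zero-copy decompression needs.
import numpy as np

chunk_shape = (1000, 1000)
dtype = np.dtype("float64")
decoded_nbytes = dtype.itemsize * int(np.prod(chunk_shape))   # 8 * 1_000_000 bytes

# For a variable-width dtype (e.g. variable-length strings) there is no fixed
# itemsize, so the decoded size would have to come from somewhere else, such as
# a chunk-level statistic recorded in the metadata.
```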
> 2. For the compressed case, we might be able to improve things once we know the size of the output buffers. I see that numcodecs' `zstd.decode` takes an output buffer here that we could maybe use.
Zarr Python v2 actually does this already: https://github.com/zarr-developers/zarr-python/blob/support/v2/zarr/core.py#L2044-L2050
> Zarr Python v2 actually does this already: https://github.com/zarr-developers/zarr-python/blob/support/v2/zarr/core.py#L2044-L2050
FYI here's my hacky attempt to do something similar in v3: https://github.com/tomwhite/zarr-python/commit/9b5e7fc7d1a7e283917f04f04ebe3f60a46055c2
Nice work. https://github.com/TomAugspurger/zarr-python/blob/tom/zero-copy-codec-pipeline/tests/test_memory_usage.py has the start of a test that uses `tracemalloc` to verify we aren't making any unexpected NumPy array allocations (assuming the decompressor isn't allocating memory through numpy internally).
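For anyone curious, the general pattern looks roughly like this (a sketch of the `tracemalloc` approach, not the linked test itself):

```python
# Rough sketch of the tracemalloc pattern, not the linked test: snapshot before
# and after, then attribute the allocation diff to numpy's own source files.
import tracemalloc


def numpy_bytes_allocated(func, *args, **kwargs):
    """Run func and return the bytes allocated through numpy while it ran."""
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    func(*args, **kwargs)
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()

    numpy_only = [tracemalloc.Filter(True, "*numpy*")]
    diff = after.filter_traces(numpy_only).compare_to(
        before.filter_traces(numpy_only), "filename"
    )
    return sum(stat.size_diff for stat in diff)


# Usage idea: a read that decodes into a pre-allocated `out` array should
# allocate (almost) nothing new through numpy:
#   allocated = numpy_bytes_allocated(read_into_out)   # hypothetical read helper
#   assert allocated < SMALL_THRESHOLD
```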
https://github.com/TomAugspurger/zarr-python/blob/tom/zero-copy-alt/simple.py has an implementation I've been hacking up. Compared to the current codec pipeline it's faster and uses less memory, but isn't near feature complete yet.

I'm still analyzing things, but I think the speedups are primarily from

- Avoiding unnecessary memory allocations
  - When the chunks to read form contiguous regions of the user-provided slice, we can use numcodecs' ability to decode into an output buffer, rather than a temporary buffer.
  - Avoiding the surprisingly expensive `nd_buffer.create` here
    - this could maybe be done on `main`?
  - Instead, we always use `np.empty`
  - When the key isn't present in the store, we overwrite with the fill value (which ends up being equivalent to `np.full(shape, fill_value)` for that chunk)
  - When the key is present, we decode directly into the output buffer (for contiguous indexers) or into a temp buffer followed by a memcpy (for non-contiguous indexers)
- Move decodes to a `ThreadPoolExecutor`, rather than async
  - decodes are inherently a compute (or memory) bound operation
  - `main` also gets them to a thread pool, but only via `asyncio.to_thread`, which has some overhead (need to measure this; but it's pretty small)
- Overlap reads and decodes (see the sketch after this list)
  - My understanding is that `main` first reads all the bytes and then decodes all the bytes
  - A straggling reader (not so common locally, but very common from object storage!) means that we don't start decoding any chunks until we're done reading all the chunks
  - My implementation uses `asyncio.create_task` and `as_completed` to run the read tasks concurrently and submit them to the decode pool as the reads complete.
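A condensed sketch of the read/decode overlap described above, with assumed helper names (`read_coros`, `decode_fn`); this is the shape of the idea, not the actual implementation:

```python
# Sketch of overlapping reads with decodes; `read_coros` and `decode_fn` are
# assumed stand-ins for "fetch one chunk's bytes" and "decode one chunk".
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def read_and_decode(read_coros, decode_fn, max_workers=4):
    """read_coros yield (chunk_key, raw_bytes); decode_fn maps raw bytes to a decoded chunk."""
    loop = asyncio.get_running_loop()
    decoded = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        read_tasks = [asyncio.create_task(coro) for coro in read_coros]
        decode_futures = []
        # as_completed hands us reads in the order they finish, so a straggling
        # read doesn't stop already-downloaded chunks from being decoded.
        for finished in asyncio.as_completed(read_tasks):
            key, raw = await finished
            decode_futures.append((key, loop.run_in_executor(pool, decode_fn, raw)))
        for key, fut in decode_futures:
            decoded[key] = await fut
    return decoded
```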
One question about the codec pipeline: what's the benefit of batching?
Typically, batching is done to increase throughput: at the cost of some latency, you can sometimes process more things per unit time by operating on batches rather than single items, thanks to algorithmic improvements or doing less work.
But my understanding is that for many (all?) of our codecs, we don't actually operate on multiple arrays while encoding or decoding. IIUC, the fundamental operation of, e.g., our zstd decode is `_decode_single`, which is called via `decode`. We end up running that in parallel using a thread pool via `_batching_helper` and our `concurrent_map`.
So: is batching done for performance? Or is it just the programming model we're using for parallelization, where some queues + threads might work equally well (or possibly better)?
Are there any design docs around the codec pipeline? Or could someone more familiar with its implementation share some background on the design? cc @normanrz (based on https://github.com/zarr-developers/zarr-python/pull/1670)
The batching use case came from GPU codecs that would en/decode multiple chunks in parallel. I think, so far, none of them have been implemented.
Thanks. https://github.com/zarr-developers/zarr-python/pull/2863 is adding the wrappers for a GPU-based Zstd codec. That does indeed benefit from the batched codec approach, since the underlying library (nvcomp) accepts a batch of buffers to decompress. I'm not 100% sure on the details, but I think that does benefit from the batching at a hardware level since you want to run the same kernel on a bunch of data at the same time, each on a different GPU thread.
I've just realized that the default batch size is 1 (ref: https://zarr.readthedocs.io/en/stable/user-guide/config.html), so my earlier worry about a straggling reader blocking any chunks from starting to decode was possibly unfounded.