Custom Indexing API for Pipeline-level Parallelism
Inspired by zarrs, where you can do a sort of jump-around slicing, I broadly wish to be able to do the following disjoint slice indexing (although, motivated by this request, I am suggesting a more general API).
The current "public" solution would be to either `asyncio.gather` a bunch of disparate slice requests or, if you are not using the async API, to use numpy integer indexing.
I get better performance by batching all of the slice requests together into one indexer, which uses pipeline-level parallelism as opposed to top-level parallelism.
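At its core, the batching amounts to offset bookkeeping: each constituent slice's result gets remapped into a contiguous region of a single output buffer. A minimal, zarr-free sketch of that remapping (`pack_slices` is a hypothetical helper name, not part of any API):

```python
def pack_slices(slices: list[slice]) -> tuple[list[tuple[slice, slice]], int]:
    """Map each disjoint source slice to where it lands in the packed output."""
    out = []
    total = 0
    for s in slices:
        n = s.stop - s.start
        # (source slice, destination slice in the packed output)
        out.append((s, slice(total, total + n)))
        total += n
    return out, total

pairs, total = pack_slices([slice(0, 4), slice(8, 12), slice(20, 24)])
# the second source slice lands at output offset 4: (slice(8, 12), slice(4, 8))
```

This is exactly the bookkeeping the `MultiBasicIndexer` below performs on chunk projections instead of raw slices.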
```python
import zarr
import numpy as np
import asyncio
import platform
import subprocess


def clear_cache():
    # Drop the OS page cache so each benchmark run hits the store cold.
    if platform.system() == "Darwin":
        subprocess.call("sync && sudo purge", shell=True)
    elif platform.system() == "Linux":
        subprocess.call(["sudo", "sh", "-c", "sync; echo 3 > /proc/sys/vm/drop_caches"])
    else:
        raise Exception("Unsupported platform")


class MultiBasicIndexer(zarr.core.indexing.Indexer):
    """An indexer that batches multiple disparate 1D slicing indexers into a single indexer"""

    def __init__(self, indexers: list[zarr.core.indexing.Indexer]):
        self.shape = tuple(
            sum(i.shape[k] for i in indexers) for k in range(len(indexers[0].shape))
        )
        self.drop_axes = indexers[0].drop_axes  # maybe?
        self.indexers = indexers

    def __iter__(self):
        # Re-map each chunk projection's out_selection so the results of the
        # constituent indexers are packed contiguously into one output buffer.
        total = 0
        for i in self.indexers:
            for c in i:
                gap = c[2][0].stop - c[2][0].start
                yield type(c)(c[0], c[1], (slice(total, total + gap),), c[3])
                total += gap


z = zarr.create_array("foo.zarr", data=np.random.randn(10_000 * 10_000))
chunk_size = z.chunks[0]
chunks = list(range(0, 15, 2))  # disparate chunks, i.e., skipping around by a factor of 2
slices = [
    zarr.core.indexing.BasicIndexer(
        slice(i * chunk_size, (i + 1) * chunk_size),
        shape=z.metadata.shape,
        chunk_grid=z.metadata.chunk_grid,
    )
    for i in range(0, 15, 2)
]
np_indexer = np.concatenate(
    [np.arange(i * chunk_size, (i + 1) * chunk_size) for i in range(0, 15, 2)]
)


async def run():
    # Top-level parallelism: one _get_selection call per slice, gathered.
    return np.concatenate(
        await asyncio.gather(
            *(
                z._async_array._get_selection(
                    s, prototype=zarr.core.buffer.default_buffer_prototype()
                )
                for s in slices
            )
        )
    ).reshape(chunk_size * len(chunks))


assert (z[np_indexer] == zarr.core.sync.sync(run())).all()

clear_cache()
%timeit z[np_indexer]
clear_cache()
%timeit zarr.core.sync.sync(run())
clear_cache()
%timeit zarr.core.sync.sync(z._async_array._get_selection(MultiBasicIndexer(slices), prototype=zarr.core.buffer.default_buffer_prototype()))
```
I'm uploading a juv-runnable notebook as well, with cache clearing built in:
indexing_benchmark.ipynb.zip
So I see about a 40% speedup for the batched indexing vs. the `asyncio.gather`-separated indexing (both of which are light years ahead of integer indexing). This is an upper bound: usually the difference was ~20-30%, but zarrs gives another 20-30% on top, which means we're now close to a 2x speedup.
Thus I see a potential route forward here as making the following public:

- `AsyncArray._get_selection` (or something similar, just saying this because it exists)
- An `Indexer` abc class + `ChunkProjection` for iteration
- An `into_indexer` function that returns `Indexer`-based classes for a simple selection (like a tuple of slices or so). I think the `Indexer` return type would probably be enough to do useful things, since the iterator for all `Indexer` subclasses returns `ChunkProjection`, which can be modified as needed (see my motivating example).
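To make the proposed surface concrete, here is a minimal, self-contained sketch of what it could look like. The `ChunkProjection` and `Indexer` below are 1-D, slice-only stand-ins, not zarr's actual classes, and the `into_indexer` signature is purely illustrative:

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import NamedTuple


class ChunkProjection(NamedTuple):
    # Stand-in for zarr's ChunkProjection: which chunk to read, what to take
    # from it, and where that lands in the output.
    chunk_coords: tuple[int, ...]
    chunk_selection: tuple[slice, ...]
    out_selection: tuple[slice, ...]


class Indexer(ABC):
    @abstractmethod
    def __iter__(self) -> Iterator[ChunkProjection]: ...


def into_indexer(selection: slice, *, shape: tuple[int, ...], chunks: tuple[int, ...]) -> Indexer:
    """Toy 1-D implementation mapping a slice onto chunk-aligned projections.

    (shape is unused in this toy but kept for API resemblance.)
    """

    class _Basic(Indexer):
        def __iter__(self) -> Iterator[ChunkProjection]:
            start, stop = selection.start, selection.stop
            pos = start
            while pos < stop:
                c = pos // chunks[0]
                c_start = c * chunks[0]
                c_stop = min(c_start + chunks[0], stop)
                yield ChunkProjection(
                    (c,),
                    (slice(pos - c_start, c_stop - c_start),),
                    (slice(pos - start, c_stop - start),),
                )
                pos = c_stop

    return _Basic()


projs = list(into_indexer(slice(3, 10), shape=(20,), chunks=(4,)))
# three projections, spanning chunks 0, 1, and 2
```

Because consumers only ever see `ChunkProjection`s from the iterator, a wrapper like `MultiBasicIndexer` above can rewrite `out_selection`s without knowing anything else about the wrapped indexers.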
Open to other thoughts!