zarr-python icon indicating copy to clipboard operation
zarr-python copied to clipboard

Enhancement: Chunk level access api / indexing by chunk rather than voxel

Open David-Baddeley opened this issue 4 years ago • 21 comments

I may be missing something, but zarr currently seems to offers a high level interface to the chunked data that appears to be largely transparent to the fact that the underlying data is in fact chunked (much like HDF5). I've got a few use cases where I'd like to be able to directly operate on the underlying chunks rather than the whole large dataset.

My target application is multi-dimensional image processing in large 3D biomedical datasets which there are many situations where it would make sense to perform operations on individual chunks in parallel.

Whilst it would be possible to a) read the chunk size of an array and b) work out how to slice the dataset in multiples of that chunk size, a direct chunk level access might be easier and more efficient. In essence I'm suggesting something which exposes a simplified version of Array._chunk_getitem and Array._chunk_setitem which would only ever get or set a complete chunk.

If you extended this concept somewhat by having the functions return a lightweightChunk object which was essentially a reference to the location of the array, a chunk ID, and a .data property which gave you the data for that chunk and additionally exposed an iterator in the Array class , you could conceivably write code like:

def do_something(chunk):
    res = some_processing_function(chunk.data)
    with zarr.open('output_file_uri') as z1:
        z1.save_chunk(res, chunk.chunk_id)

with multiprocessing.Pool() as pool:
    pool.map(do_something, array.chunk_iterator)

In the longer term (and I'm not sure how to go about this - I might be better aiming for API compatibility with zarr rather than inclusion in zarr) I'd want to enable data-local processing of individual chunks of an array which was chunked across a distributed file system. I've currently got a python solution for this for 2.5 dimensional data (xyt) but it's pretty specific to one use case and I would like to avoid duplicating other efforts as we make it more general.

David-Baddeley avatar Nov 19 '19 21:11 David-Baddeley

I could certainly imagine using such an API in order to get optimal loading.

joshmoore avatar Nov 20 '19 11:11 joshmoore

in situations where it would make sense to perform operations on individual chunks in parallel.

In python, we currently use dask for these use cases. (Zarr was created with Dask interoperability in mind.) Dask can discover the chunk structure of zarr arrays (https://docs.dask.org/en/latest/array-api.html#dask.array.from_zarr) and map them to the chunks of a dask array.

rabernat avatar Nov 20 '19 11:11 rabernat

for reference: https://github.com/dask/dask/blob/fb4e91e015caa570ae6b578a12dc494f85b2e7f7/dask/array/core.py#L2774

So you're suggesting, @rabernat, anyone that wants this functionality would use dask?

Reversely, there's no general "protocol" that could expose such an interface both for dask's benefit as well as a more mundane end-user?

joshmoore avatar Nov 21 '19 10:11 joshmoore

I think what @rabernat is probably getting at is that if you want to do a chunk-wise computation over a zarr array, and you want to parallelise some part of that computation, it's very easy to accomplish with dask. To give a totally trivial example, you have have some zarr array z, then you can do a parallel chunk-wise computation of the sum by doing:

import dask.array as da
da.from_array(z).sum().compute()

Dask will figure out where the chunk boundaries are and split up the computation accordingly. And you can switch that out from running using multiple cores on your local computer to running on multiple compute nodes in a HPC or kubernetes cluster with just a few lines of configuration code.

But not everyone will want to use dask of course, and so I can see the value in having a convenient public API on the zarr side to retrieve or replace a whole chunk.

alimanfoo avatar Nov 21 '19 10:11 alimanfoo

Looking at the code referenced by @joshmoore it appears as though dask simply slices the underlying zarr array. This is likely to entail some overhead (hard to know how much without going through the code in detail, but I suspect there is going to be at least an array allocation and a memcopy, if not substantially more), and will force the data to flow through the controlling node. Something which exposed chunks on a lower level could result in more efficiency (also for dask).

I should probably give a bit more background to my question. Over the last few years we have developed quite a bit of python infrastructure for distributed analysis of microscopy data (documentation is currently very poor, but there is a small amount of high level stuff in the supplement of https://www.biorxiv.org/content/10.1101/606954v2). What we have includes distributed, chunked, storage, fast compression, and a scheduler which facilitates data-local analysis of these chunks. When compared with, e.g. hadoop, dask, spark, etc we're probably faster for our specific task, but much more application specific (we can sustain a task rate of > 1000 tasks/s and a data-rate before compression of ~800MB/s). This has largely grown up in parallel to things like dask and zarr. At the moment we do really well on 2D +time (our chunks are generally a single camera frame), and also have some pretty simple (but reasonably fast - we're writing ~500 tiles/s) 2D tile pyramid code for slide-scanning. We're now looking to move into large 3D volumetric datasets, but don't really want to reinvent the wheel. It could be very attractive to use and potentially contribute to zarr with a new storage backend as we move to 3D, or at the very least maintain api-compatibility. Pretty critical to us in the long term, however, is being able to access and analyse the data where it sits across distributed storage, rather than having to pipe it through a central location. In the medium term this would likely also require adding something like a .locate() functionality to the zar data store which let you identify which node it was on, although this could easily live as a helper function in our scheduler - e.g. locate(data_store, key).

David-Baddeley avatar Nov 21 '19 22:11 David-Baddeley

Hi David, thanks a lot for elaborating, sounds very interesting and very happy to look at API enhancements that would make it easier to use zarr from your framework.

Not trying to sell dask at all :-) but thought I should mention that dask does not require data to move through a controlling node. Typical deployment scenarios with dask and zarr involve tasks running across a cluster of worker nodes, each of which reads data directly from object storage (S3/GCS/ABS/...) or a parallel file system (gpfs/lustre/...).

On Thu, 21 Nov 2019 at 22:38, David Baddeley [email protected] wrote:

Looking at the code referenced by @joshmoore https://github.com/joshmoore it appears as though dask simply slices the underlying zarr array. This is likely to entail some overhead (hard to know how much without going through the code in detail, but I suspect there is going to be at least an array allocation and a memcopy, if not substantially more), and will force the data to flow through the controlling node. Something which exposed chunks on a lower level could result in more efficiency (also for dask).

I should probably give a bit more background to my question. Over the last few years we have developed quite a bit of python infrastructure for distributed analysis of microscopy data (documentation is currently very poor, but there is a small amount of high level stuff in the supplement of https://www.biorxiv.org/content/10.1101/606954v2). What we have includes distributed, chunked, storage, fast compression, and a scheduler which facilitates data-local analysis of these chunks. When compared with, e.g. hadoop, dask, spark, etc we're probably faster for our specific task, but much more application specific (we can sustain a task rate of > 1000 tasks/s and a data-rate before compression of ~800MB/s). This has largely grown up in parallel to things like dask and zarr. At the moment we do really well on 2D +time (our chunks are generally a single camera frame), and also have some pretty simple (but reasonably fast - we're writing ~500 tiles/s) 2D tile pyramid code for slide-scanning. We're now looking to move into large 3D volumetric datasets, but don't really want to reinvent the wheel. It could be very attractive to use and potentially contribute to zarr with a new storage backend as we move to 3D, or at the very least maintain api-compatibility. Pretty critical to us in the long term, however, is being able to access and analyse the data where it sits across distributed storage, rather than having to pipe it through a central location. In the medium term this would likely also require adding something like a .locate() functionality to the zar data store which let you identify which node it was on, although this could easily live as a helper function in our scheduler - e.g. locate(data_store, key).

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/zarr-developers/zarr-python/issues/518?email_source=notifications&email_token=AAFLYQT4CP6EMRZORMR54H3QU4EWNA5CNFSM4JPJKH6KYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEE34VBA#issuecomment-557304452, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAFLYQRWRRXNU6HE6TVC3NTQU4EWNANCNFSM4JPJKH6A .

--

Alistair Miles Head of Epidemiological Informatics Centre for Genomics and Global Health Big Data Institute Li Ka Shing Centre for Health Information and Discovery University of Oxford Old Road Campus Headington Oxford OX3 7LF United Kingdom Phone: +44 (0)1865 743596 or +44 (0)7866 541624 Email: [email protected] Web: http://a http://purl.org/net/alimanlimanfoo.github.io/ Twitter: @alimanfoo https://twitter.com/alimanfoo

Please feel free to resend your email and/or contact me by other means if you need an urgent reply.

alimanfoo avatar Nov 21 '19 22:11 alimanfoo

@David-Baddeley : I'll look into your pre-print, but as a follow-on to this discussion you might also be interested in tracking https://forum.image.sc/t/next-generation-file-formats-for-bioimaging/31361 All the best, ~Josh

joshmoore avatar Nov 22 '19 14:11 joshmoore

@David-Baddeley I highly recommend taking a careful look at dask. For me, it's made processing enormous microscopy datasets much much easier. You can see some applied examples here and here

For your specific issue (accessing individual chunks of a zarr array): The map_blocks method of a dask array offers distributed access to chunks for the purposes of applying a function that returns a numpy array (e.g., filtering or transforming data). For more generic access to array chunks, you can use the blocks property of a dask array, e.g. this example. Even more generically, you can explicitly slice a dask array along chunk boundaries to create a collection of independent sub-arrays that can be processed in parallel with generic tools in the dask library, e.g. dask.delayed or dask.bag.

I hope this is helpful!

d-v-b avatar Nov 26 '19 19:11 d-v-b

I think @joshmoore’s point is people may want to solve this in other languages (like Java and C/C++ to name a couple). A proper API for this kind of functionality would be a good answer for them (as opposed to use this Python library 😉). That said, I completely agree that in Python Dask eliminates this need. 😀

Would expect even in our current (Python) implementation we have something like this already. So it is just a question of exposing that to the user. Seems reasonable to me at least.

Hopefully this helps bridge the gap between these two perspectives. 🙂

jakirkham avatar Nov 26 '19 19:11 jakirkham

Would expect even in our current (Python) implementation we have something like this already. So it is just a question of exposing that to the user. Seems reasonable to me at least.

Sounds like a good idea. How would this work in practice? An iterator that yields (slice, thingy) where thingy is a lazy reference to the sub-array bounded by slice?

d-v-b avatar Nov 26 '19 19:11 d-v-b

Ah I guess that really only applies to the first half of the ask namely get/set for chunks. Sorry for the lack of clarity. It should be easy (I think) for users to iterate over all chunks on their own (like using np.ndindex with .nchunks).

jakirkham avatar Nov 26 '19 20:11 jakirkham

Sorry for the silence, it's been a mad couple of weeks. The more I look at it, the more I think that dask interoperability is a worthy goal. That said, I don't think just using dask solves all my use cases though (one example limitation would seem to be data-locality which dask would seem to respect for temporary data during computation, but not for data in storage at the start or end of processing).

Even when using dask, adding a low-level chunk access api has the potential to speed things up. When reading a chunk from zarr, dask currently slices the zarr array with it's preferred chunk size. Within zarr this involves the allocation of a new array, the loading of the zar chunks corresponding to the area requested by dask (which might not correspond to the underlying zarr chunks), and coping the relevant bits across into the empty array.

My ideal interface would comprise the following:

  1. a lazy chunk reference object which is serialisable (ideally as .json or similar rather than pickle)
  2. a get_chunk() function returning the above
  3. (very, very) optionally an iterator over chunks. As @jakirkham says it would be pretty easy for users to implement this in minimal code, depending on whether the index passed to get_chunk() was a single index, or an n-tuple describing the chunk index along each direction.

David-Baddeley avatar Dec 06 '19 22:12 David-Baddeley

Hi @David-Baddeley, thanks for following up.

It would be pretty straightforward to turn the Array._chunk_getitem() method into a public method, and also make some of the arguments optional. So, e.g., assuming we renamed this method to "get_chunk", you could then call myarray.get_chunk(coords) where coords is a tuple of integers and be returned a numpy array with the loaded chunk data.

I'm a bit hesitant to get into implementing something that returns lazy chunk reference objects which are also serialisable as json. Would you be able to explain why you need that?

alimanfoo avatar Dec 09 '19 17:12 alimanfoo

@David-Baddeley - you use case sounds fascinating and remarkably similar to how we want to work with climate data. I really encourage you to play a bit more with dask before assuming that it won't work for you. The slicing you refer to, for example, is 100% lazy and doesn't require moving data to a "controlling node."

If you want to see what distributed, parallel processing with dask in the cloud looks like, give this binder a try: https://binder.pangeo.io/v2/gh/pangeo-data/pangeo-ocean-examples/master?filepath=aviso_sea-surface-height.ipynb

Distributed computing is complicated. It's nice to be part of the dask community, rather than trying to roll these tools on our own. Our project has gotten much farther, and is more sustainable, because of this.

That said, I am 100% in favor of any optimizations we can find that will make zarr and dask work [even] better together.

rabernat avatar Dec 09 '19 17:12 rabernat

+1 for this feature. I am a dask user, and second the sentiment of using dask for complex array computations over chunks.

However, I have use cases where I would like to just load zarr chunks where performance is important. I would prefer to just remove the dask layer entirely, since I would just be doing a.blocks[i, j].compute() on random blocks. For now, I'm manually computing chunk boundaries to accomplish this. It's perfect fine, but it would be nice to have a .blocks API like dask does.

amatsukawa avatar Mar 13 '20 23:03 amatsukawa

Thanks @amatsukawa for commenting. FWIW I'd still be happy for such a function which returns a specific chunk as a numpy array, given chunk indices. Happy for that to go ahead if it's useful, would be a relatively small change.

alimanfoo avatar Apr 06 '20 13:04 alimanfoo

Hi, i think chunk level access for imaging would be interesting as well. As an example, if you are stitching two large volumes together with a certain point of overlap. It may be nice to do some registration and correlation across only a subset of chunks or even to use a window from one chunk as a kernel to slide over the other image. So I think there's some great opportunity here. Dask does do a great job if you want to work across the whole set or if you want to index. It may be nicer to have a chunk level without indexing and going over boundaries.

Sh4zKh4n avatar Jul 16 '20 07:07 Sh4zKh4n

Please note that a chunk level API will help in streaming applications as well as illustrated in the code in #595

skgbanga avatar Sep 02 '20 13:09 skgbanga

Just came looking for help docs and landed on this issue. This would be extremely beneficial.

Till an API of this nature is merged, is there a workaround?

srib avatar Jan 27 '21 21:01 srib

Yep I think we are just looking for a volunteer to do this work. It's not hard, but someone would need to pick it up 🙂

jakirkham avatar Jan 27 '21 23:01 jakirkham

Another use case for direct chunk access: at one point, we'd like to stack multiple arrays into a single one without loading all of it into memory. With a chunk size of 1 in that direction, this can be trivially done by copying or moving files on disk. But if we want to use a different chunk size, that's no longer possible.

An API to read/write a chunk given an index would greatly help here. I suppose one workaround would be to read slices (assuming it's possible without reading the whole array) but I don't know what to do on the write side.

lnicola avatar Oct 06 '21 10:10 lnicola

Hi Folks,

We're pretty interested in direct chunk access in napari now. @d-v-b had pointed out some performance issue with large images that contain a large number of chunks. That performance issue became very apparent in some recent work: https://github.com/napari/napari/issues/5561#issuecomment-1555211176. The rendering implementation in this demo requires efficient access when there are large numbers of chunks, but does not need the computation graph features from Dask.

I would be happy to pick up this work to provide access to chunks. It sounds like we need:

  • zarr.Array.get_chunk(self, coords) (is this implementation too naive? https://github.com/kephale/napari/blob/88c910bac83b756b5520d2aa199d51a130ff608f/napari/experimental/_progressive_loading_datasets.py#L184-L192)
  • zarr.Array.set_chunk(self, chunk_coords, value)

kephale avatar May 22 '23 14:05 kephale

@kephale - it sounds like you might be interested in the feature proposed in https://github.com/zarr-developers/zarr-python/issues/1398?

rabernat avatar May 22 '23 14:05 rabernat

@rabernat https://github.com/zarr-developers/zarr-python/issues/1398 does look interesting, but it is a little complex. Would the batch abstraction for encoding be a requirement for adding get_chunk and set_chunk to the API?

Maybe it is short-sighted, but my main use case is just that I need constant time access to chunks in arrays with large numbers of chunks. Dask doesn't scale well for these arrays with many chunks, and the chunk sizes are determined by the visualization needs and cannot be changed. I'm just playing around with monkey patching the chunk access API for now, but since I saw a few requests for chunk-level API in this thread I thought I'd offer to do the work upstream.

kephale avatar May 24 '23 11:05 kephale

... my main use case is just that I need constant time access to chunks in arrays with large numbers of chunks.

Ah ok that's a helpful clarification. But I guess I don't really understand where the current API falls short. Why is

array.get_chunk(chunk_coords)

better / faster than

array[chunk_index]

?

rabernat avatar May 24 '23 12:05 rabernat

I stand corrected. I haven't done precise measurements, but I get visually similar behavior by using array[chunk_index]. My mistake was that after running into performance issues with Dask I was suspicious of overhead from indexing/selection and went directly to _chunk_getitems.

Thank you for the help! This is being used to improve visualization of very large multiscale zarrs in napari, which currently looks like this https://github.com/napari/napari/issues/5561#issuecomment-1555211176.

kephale avatar May 24 '23 19:05 kephale

Since I am on record recommending dask earlier in this issue (ca. 2019), I should add that my earlier answers concerned using dask for batched chunkwise array access, e.g. applying some transformation to a chunked array.

I do not recommend using dask for visualization of chunked arrays. In this application, you want cheap (i.e., O(constant)) random access to individual chunks, but dask can't do this (or, at least it couldn't the last time I checked).

d-v-b avatar May 24 '23 20:05 d-v-b