Sharding Prototype I: implementation as translating Store
This PR is for an early prototype of sharding support, as described in the corresponding issue #877. It serves mainly to discuss the overall implementation approach for sharding. This PR is not (yet) meant to be merged.
This prototype
- allows specifying shards as the number of chunks that should be contained in a shard (e.g. using `arr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), …)`, see the usage sketch below). One shard corresponds to one storage key, but can contain multiple chunks.
- ensures that this setting is persisted in the `.zarray` config and loaded when opening an array again, adding two entries:
  - `"shard_format": "indexed"` specifies the binary format of the shards and allows extending sharding with other formats later
  - `"shards": [2, 2]` specifies how many chunks are contained in a shard
- adds an `IndexedShardedStore` class that is used to wrap the chunk-store when sharding is enabled. This store handles the grouping of multiple chunks to one shard and transparently reads and writes them via the inner store in a binary format which is specified below. The original store API does not need to be adapted; it just stores shards instead of chunks, which are translated back to chunks by the `IndexedShardedStore`.
- adds a small script `chunking_test.py` for demonstration purposes; this is not meant to be merged but serves to illustrate the changes.
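For illustration, the prototype API described above might be used roughly like this (a sketch: the `shards` and `shard_format` keyword arguments exist only in this PR, and the store path is made up):

```python
import zarr

# A (20, 3) array of (3, 3) chunks, grouped into shards of 2x2 chunks;
# each shard is stored under a single key of the underlying store.
z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2),
               store="example.zarr", shard_format="indexed", overwrite=True)
z[...] = 42  # chunk writes are grouped into shards by the IndexedShardedStore
```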
The currently implemented file format is still up for discussion. It implements the "Format 2" that @jbms describes in https://github.com/zarr-developers/zarr-python/pull/876#issuecomment-973462279.
Chunks are written successively in a shard (unused space between them is allowed), followed by an index referencing them.
The index holds an `offset, length` pair of little-endian uint64 per chunk; the chunk order in the index is row-major (C) order,
e.g. for (2, 2) chunks per shard an index would look like:
| chunk (0, 0) | chunk (0, 1) | chunk (1, 0) | chunk (1, 1) |
| offset | length | offset | length | offset | length | offset | length |
| uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 |
Empty chunks are denoted by setting both offset and length to `2^64 - 1`. The index always has the full shape of all possible chunks per shard, even if some chunks are outside of the array size.
For the default order of the actual chunk-content in a shard I'd propose to use Morton order, but this can easily be changed and customized, since any order can be read.
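As a non-normative sketch of the read path for this format, a single chunk could be extracted from a shard like this (plain Python, not the PR's actual code; the (2, 2) layout from the example above is assumed):

```python
import struct

MISSING = 2**64 - 1  # offset and length both take this value for empty chunks

def read_chunk(shard: bytes, chunk_idx: int, chunks_per_shard: int = 4):
    # The index occupies the last 16 * chunks_per_shard bytes of the shard.
    index = shard[-chunks_per_shard * 16:]
    offset, length = struct.unpack_from("<QQ", index, chunk_idx * 16)
    if offset == MISSING and length == MISSING:
        return None  # empty chunk
    return shard[offset:offset + length]
```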
If the overall direction of this PR is pursued, the following steps (and possibly more) are missing:
- Functionality
- [ ] Use a default write-order (Morton) of chunks in a shard and allow customization
- [ ] Support deletion in the `ShardedStore`
- [ ] Group chunk-wise operations in `Array` where possible (e.g. in `digest` & `_resize_nosync`)
- [ ] Consider locking mechanisms to guard against concurrency issues within a shard
- [ ] Allow partial reads and writes when the wrapped store supports them
- [ ] Add support for prefixes before the chunk-dimensions in the storage key, e.g. for arrays that are contained in a group
- [ ] Add warnings for inefficient reads/writes (might be made configurable)
- [ ] Maybe use the new partial read method on the Store also for the current PartialReadBuffer usage (to detect if this is possible and reading via it)
- Tests
- [ ] Add unit tests and/or doctests in docstrings
- [ ] Test coverage is 100% (Codecov passes)
- Documentation
- [ ] Also document optional optimization possibilities on the `Store` or `BaseStore` class, such as `getitems` or partial reads
- [ ] Add docstrings and API docs for any new/modified user-facing classes and functions
- [ ] New/modified features documented in docs/tutorial.rst
- [ ] Changes documented in docs/release.rst
changed 2021-12-07: added file format description and updated TODOs
Hello @jstriebel! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:
- In the file `chunking_test.py`:
  - Line 36:45: E211 whitespace before '['
  - Line 44:101: E501 line too long (104 > 100 characters)
  - Line 47:101: E501 line too long (115 > 100 characters)
Comment last updated at 2021-12-22 11:34:19 UTC
Codecov Report
Merging #876 (2d1fea0) into master (b80c0c4) will decrease coverage by 0.06%. The diff coverage is 96.00%.
@@ Coverage Diff @@
## master #876 +/- ##
==========================================
- Coverage 99.94% 99.88% -0.07%
==========================================
Files 32 34 +2
Lines 11216 11382 +166
==========================================
+ Hits 11210 11369 +159
- Misses 6 13 +7
| Impacted Files | Coverage Δ | |
|---|---|---|
| zarr/_storage/store.py | 100.00% <ø> (ø) | |
| zarr/util.py | 98.95% <73.33%> (-1.05%) | :arrow_down: |
| zarr/_storage/sharded_store.py | 97.02% <97.02%> (ø) | |
| chunking_test.py | 100.00% <100.00%> (ø) | |
| zarr/core.py | 100.00% <100.00%> (ø) | |
| zarr/creation.py | 100.00% <100.00%> (ø) | |
| zarr/meta.py | 100.00% <100.00%> (ø) | |
| zarr/storage.py | 100.00% <100.00%> (ø) | |
I think this is sorely-needed functionality. Can you please write some documentation of the data format and metadata, so that I don't have to infer it from the code?
Taking a quick look at the code, I see that there isn't much to the data format --- you add a `shards` key to the metadata and the shard file is just the concatenation of the individual chunks, which must be fixed size.
Naturally you will need an index in the shard to support variable-size chunks. I would suggest making the definition of that format a top priority, as it makes sense to decide on the format before worrying too much about the implementation.
I would also suggest supporting missing chunks within a shard.
Another thing to consider is putting the shard index in a separate file/key, and allow it to specify the key for individual chunks. That way you have some flexibility in which chunks are stored together. For example you can make the shard much larger that way. The downside is that there is a possibility of dead space and you might need to do a compaction step afterwards to reclaim that space.
If you do put the index in the shard file itself, I'd suggest making it a fixed size and at the end of the file. That way you can easily request it via a single HTTP range request from GCS or S3.
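As a sketch of that access pattern, the trailing fixed-size index can be fetched with one suffix-range request (urllib shown here; the URL is made up):

```python
import urllib.request

def fetch_trailing_index(url, index_size):
    # "bytes=-N" asks the server for the last N bytes of the object;
    # S3 and GCS both support this kind of suffix range.
    req = urllib.request.Request(url, headers={"Range": f"bytes=-{index_size}"})
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# e.g. a 4-chunk shard index: 4 * 16 bytes
# index = fetch_trailing_index("https://example.com/array/0.0", 64)
```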
Based on Jeremy's comments, am almost wondering if we should have a `MutableMapping` that describes the shards in a chunk as well. Then users can choose what that might be. It can also handle missing shards relatively easily.
Thinking of existing `MutableMapping`s we support, this could be something like `ZipStore`, `DBMStore`, `LMDBStore`, or `SQLiteStore`.
Though maybe there are better options than these that we could also consider in the future. Basing sharding on a `MutableMapping` interface should allow for that flexibility.
First of all, the spec version will probably have to be bumped to version 3, since it is an incompatible change --- version 2 implementations that attempt to read the metadata will ignore the `shards` key and will not work properly.
I think it would be very useful to fully specify the format of the shards in the `.zarray` metadata file itself, at least in the common case. Otherwise you have to have some out-of-band way to indicate the shard format as well, which is inconvenient and leads to implementation divergence (i.e. the same issues as occurred with `dimension_separator` previously).
There is another consideration regarding making the shard format a mutable mapping: for reading, you need to support random access, and mutable mapping is a reasonable interface for reading, though for efficiency you will want to cache the shard index content, to avoid having to re-read it for every chunk individually. For writing, though, you need an interface where you stream out a sequence of chunks, then finalize it by writing the index. I guess that kind of fits the MutableMapping interface, except that you need to somehow handle the finalize step. If you did use a format like LMDB or SQLite for the shard, and are storing the shard on a local filesystem or other storage system that allows writes to arbitrary byte ranges, then you could indeed just support the MutableMapping interface without need for a finalize step. But that would not work for S3, GCS and similar stores that are high latency and don't allow writes to existing files, and those stores were a key motivation for sharding in the first place.
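A minimal sketch of that stream-then-finalize write pattern (a hypothetical helper, not a zarr or MutableMapping API):

```python
import struct

MISSING = 2**64 - 1

class ShardWriter:
    """Accumulate encoded chunks, then finalize by appending the index."""

    def __init__(self, chunks_per_shard):
        self.buf = bytearray()
        self.index = [(MISSING, MISSING)] * chunks_per_shard

    def write_chunk(self, chunk_idx, data):
        self.index[chunk_idx] = (len(self.buf), len(data))
        self.buf += data

    def finalize(self):
        # The index of little-endian (offset, length) pairs goes at the end,
        # so it can only be written once all chunks are known.
        for offset, length in self.index:
            self.buf += struct.pack("<QQ", offset, length)
        return bytes(self.buf)  # value to store under the shard key
```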
I think an important question regarding allowing pluggable shard formats is what the benefit would be. Essentially it is already possible to implement sharding via a custom store. The key advantage of this proposal is that the sharding is directly specified in the `.zarray` file, which allows for standardization.
As a simple starting point, here are two possible shard formats:
Format 1 (chunks are in fixed order):
- chunks stored in fixed order (e.g. lexicographical by chunk position)
- chunk index, which consists of `prod(shards)` uint64le values (`8 * prod(shards)` bytes) specifying the size in bytes of each chunk. The special value of `2^64-1` indicates a missing chunk. You compute the cumulative sum (skipping missing chunks) to determine the starting and ending offsets of each chunk.
Format 2 (chunks are in arbitrary order):
- chunks stored in any arbitrary order, empty space is also allowed
- chunk index, which consists of `prod(shards) * 2` uint64le values (`16 * prod(shards)` bytes) specifying the starting and ending offset of each chunk. The special start/end pair `(2^64-1, 2^64-1)` indicates a missing chunk.
With either format, to read a chunk you would first read the entire chunk index, then read the appropriate byte range for the chunk. When reading from an HTTP store, this would mean 1 byte range request for the index, plus one byte range request for the chunk.
Zip format would work fairly similarly to format 2, except that you need one more byte range request to first read the End of Central Directory record.
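Going back to "format 1", its cumulative-sum step might look like the following sketch (assuming the per-chunk sizes are already decoded from the index):

```python
MISSING = 2**64 - 1

def format1_offsets(sizes):
    # Turn per-chunk sizes into (start, end) offsets, skipping missing chunks.
    offsets, pos = [], 0
    for size in sizes:
        if size == MISSING:
            offsets.append(None)
        else:
            offsets.append((pos, pos + size))
            pos += size
    return offsets

# format1_offsets([100, MISSING, 50]) == [(0, 100), None, (100, 150)]
```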
I guess so far zarr itself has avoided specifying any binary formats --- only the `.zarray` metadata file itself has a specified format; the format of individual chunks is determined entirely by the codec. I can see therefore the reluctance to specify a binary format for shards. But I think this feature would be much more valuable with a defined format.
On immutable object stores, Jeremy, you might be interested in reading issue ( https://github.com/zarr-developers/zarr-specs/issues/82 )
Thanks everybody for the feedback and sorry for the late response! I just pushed one more commit that makes the actual shard-format configurable, by adding e.g. `"shard_format": "morton_order"` to the `.zarray` json or setting `shard_format` during construction. Also I added a bitmask for the uncompressed chunks, just to showcase how some binary metadata can be used in the shard.
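For illustration, a `.zarray` written by this prototype might look roughly as follows (values made up; only the `shards` and `shard_format` members are new relative to v2):

```json
{
    "zarr_format": 2,
    "shape": [20, 3],
    "chunks": [3, 3],
    "shards": [2, 2],
    "shard_format": "indexed",
    "dtype": "<f8",
    "compressor": {"id": "blosc", "cname": "lz4", "clevel": 5, "shuffle": 1},
    "fill_value": 0,
    "order": "C",
    "filters": null
}
```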
I'll try to go through the threads and summarize different decisions we should make along the way and add my opinion:
@joshmoore
add interface to specify different backends for sharding
Totally agree! What do you think about some additional string and/or config for the shard-format, which might be expanded later on, e.g. as I added in 7e2768a? I agree that it might make sense to allow different formats here; it should especially be future-proof for new ones. This could be something like blosc2, zip, sqlite, … The only requirement I see is that it would need to be backed by the normal underlying store.
Re: index-format @jbms @jakirkham
I would suggest making the definition of that format a top priority, as it makes sense to decide on the format before worrying too much about the implementation.
Agreed :+1:
wondering if we should have a MutableMapping that describes the shards in a chunk as well. Thinking of existing MutableMappings we support, this could be something like ZipStore, DBMStore, LMDBStore, or SQLiteStore.
I'm wondering how the current implementations would then be mapped to the storage underneath. I'd assume that we would need to rewrite most of the logic, since those mutable mappings don't operate on binary blobs, which we need for the interface of the underlying storage. But I agree, zips or DBs might also be an option, see my proposal about the `shard_format` key above. I think for now a simple index together with the chunks might be the easiest option, but anything along those lines could be easily added later without breaking compatibility.
As a simple starting point, here are two possible shard formats:
Format 1 (chunks are in fixed order):
- chunks stored in fixed order (e.g. lexicographical by chunk position)
- chunk index, which consists of `prod(shards)` uint64le values (`8 * prod(shards)` bytes) specifying the size in bytes of each chunk. The special value of `2^64-1` indicates a missing chunk. You compute the cumulative sum (skipping missing chunks) to determine the starting and ending offsets of each chunk.

Format 2 (chunks are in arbitrary order):
- chunks stored in any arbitrary order, empty space is also allowed
- chunk index, which consists of `prod(shards) * 2` uint64le values (`16 * prod(shards)` bytes) specifying the starting and ending offset of each chunk. The special start/end pair `(2^64-1, 2^64-1)` indicates a missing chunk.
Great question. In the second format with arbitrary order, a reader would not need to know anything about the semantic order the chunks were written in, and strategies such as "append-only" would work. Fixed order would shorten the index a bit, and would also guarantee some properties, such as no empty space and the order of the chunks themselves. Especially for the latter I tend towards the fixed-order format, since Morton order gives advantages when reading neighborhoods of chunks in a shard. I'm also wondering if we should make the order itself configurable in this case, or if Morton order is fine for now. Anything else might still be added later via the `shard_format` string.
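To make the Morton option concrete, a 2-D Morton (Z-order) key can be computed by interleaving the bits of the chunk coordinates (an illustrative sketch, not part of the proposed format):

```python
def morton_key_2d(y, x, bits=16):
    # Interleave the bits of x and y: x contributes the even bit positions,
    # y the odd ones, yielding the Z-order curve index.
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key

# Write order for a (2, 2) shard: (0, 0), (0, 1), (1, 0), (1, 1)
# sorted(positions, key=lambda p: morton_key_2d(*p))
```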
For fixed-length chunks (uncompressed and not of dtype object) I would propose to have a simple bitmask to indicate if a chunk is missing, e.g. as implemented in 7e2768a. Another question I was wondering about here: if chunks are fixed-length, should we still write missing chunks in a shard? This would allow partial writes of any chunk later without rewriting the whole file (just the chunk and the bitmask itself).
One more question that came up: should the index/bitmask be a separate file, or part of the shard file at its start or end? I'd integrate it into the shard-file directly, since it's one file-handle/entry less per shard and (I think) does not bring any downsides (partial reads/writes would still be necessary to read/write subchunks of a shard). I'm not sure about the implications of putting the index at the start vs. end of the file for http-range requests. Putting it at the end would help when streaming the compressed chunks while still compressing later chunks, so that the index can be computed and written asynchronously at the end, as @jbms wrote already. That's nothing I'd aim for at this stage in the implementation yet, but might be something we want to add later (or is useful for other implementations). I was wondering if putting the index at the start of the file brings benefits for reads on disk and possibly more services, since the cache might rather populate "forward"? Just guessing here, tbh.
To summarize the decisions about the index format:
- Do we want to use a simple custom binary format vs. something like zip? (Assuming yes for now for the other questions.)
- Should the order of the chunks be fixed? This determines if we need to store only the chunk-size or also the start-position of each chunk. If yes, which order (probably Morton)?
- Should we use a simple bitmask in the case of fixed-length chunks? This would trade a little complexity against index-space.
- For fixed-length chunks, should we still write non-existing chunks to allow partial writes to new chunks?
- Should the index/bitmask be outside the shard file, at its start, or at its end?
@jbms
the spec version will probably have to be bumped to version 3, since it is an incompatible change --- version 2 implementations that attempt to read the metadata will ignore the shards key and will not work properly.
Not sure what incompatible means in this case, but new clients can read all files; just old clients cannot read arrays in the new chunk-format. In terms of semantic versioning, this would be a minor version bump in the software, which would probably be a bump of the spec version here.
As far as Morton order, whether it is advantageous depends on the access pattern. There are also many variations possible, e.g. how the dimensions are ordered, whether some dimensions should be kept in lexicographic order as inner or outer dimensions. If the index does not require a fixed order, then it is possible to later add a metadata field that indicates a preferred order for writing without breaking compatibility.
I am inclined to think that having a separate index format in the case of fixed-size uncompressed chunks to save 8-16 bytes per chunk is not worth the added complexity.
A separate index file makes it possible to append chunks without retaining a wasted copy of the index, and also allows you to compress the index without needing an additional read to determine the size of the index. But it also makes it impossible to safely modify a shard using atomic single-file writes.
Firstly, I'd just like to echo @jbms in saying that this is sorely-needed functionality. It's exciting to see thought and real effort being put into chunk localization.
I would however just like to remind everyone of the discussion in #515 as it has been a while. Specifically, I'd like to draw attention to the historical differences in technical approaches to addressing use cases between Zarr and other efforts like TileDB. That is, Zarr has a history of taking a path that is simple, relatively easy to understand, and quick to implement. This allows the Zarr runtime to be, in comparison to something like TileDB, very lightweight. In my opinion, it has also allowed Zarr to be able to quite nicely balance read-heavy as well as write-heavy use cases when using object storage in particular.
I think everyone here understands the draw of localizing chunks in a single file or object for read-heavy use cases. However, the complexity really starts to ramp up when we start considering writing. This is especially true for object storage where, as @jbms and @jakirkham have already mentioned, objects are immutable. In the functionality section of this PR's description, as well as throughout the comments, at least the following have been touched upon:
- Grouping of reads and writes
- Partial reads and writes
- Sparseness
- Metadata format
- "Shard" format
- Synchronization
- "Time travel" (zarr-developers/zarr-specs#82)
- Storage order
There is substantial overlap between these complex software engineering concerns and the concerns of a database project or a storage engine project like TileDB. TileDB's Data Format in particular addresses:
- Sparse and dense fragments
- Immutability
- Storage order
- Metadata format
Its runtime also specifically addresses synchronization and time travel, and is a permissively (MIT) licensed project with excellent Python bindings.
Instead of attempting to design and implement many of the aforementioned features in the Zarr core would it be worthwhile evaluating a TileDB based storage implementation as an alternative?
I agree with @chris-allan that zarr has focused on simplicity of the data format and implementation, and as that has served a class of use cases well it may well be wise to keep zarr aligned with that focus.
It is true that this discussion has touched on many different points, some of which surely depart quite far from the present simplicity of implementation and data format.
However, I think at the core of this proposal there is the possibility of a relatively simple design that has different trade-offs compared to TileDB and serves different use cases:
- Suppose we make shards immutable once written, with a simple binary format (defined by the zarr specification) where the (compressed) chunks may be written at arbitrary offsets, but the end of the file must contain `16 * number_of_chunks` bytes that specify the offset and length of each chunk within the shard.
- Then writing, even partial writing, is essentially very similar to the existing write support in zarr, except that it is done at the granularity of entire shards rather than chunks.
- Reading is a bit different as it requires support from the underlying store to read byte ranges rather than the entire value.
This design allows efficient concurrent writes as long as they are shard aligned. It also allows efficient concurrent reads. The file format is still quite simple, and the only addition to the `.zarray` metadata format is the `shards` member.
My understanding of TileDB is that it maintains a single index, which is ordered first by time-of-write, and then spatially. That makes concurrent writing efficient and relatively simple to implement, and also naturally supports versioning. However, for reading, it is necessary to read at least the bounds of every fragment, as there is no top-level spatial index. That works fine if you have a small number of fragments, or a small number of long-lived readers that can keep a spatial index cached in memory.
However, in the case where you have a large number of fragments, and a large number of possibly short-lived readers, there is a problem. For example, suppose we have written a 1PB dataset as 1 million fragments of 1GB each.
- If we then wish to read from this dataset from 10000 worker machines in parallel, each of the 10000 machines will have to read the bounds for all 1 million fragments, a total of 10 billion read requests, just to figure out which fragments actually need to be read.
- A viewer like Neuroglancer, that is directly accessing the storage via a web server, would likewise need to read the bounds of all 1 million fragments, which would take a large amount of time, before it could begin loading any data.
This limitation could be avoided by adding an additional spatial index to TileDB, but that would have its own set of tradeoffs.
Format 2 (chunks are in arbitrary order):
- chunks stored in any arbitrary order, empty space is also allowed
- chunk index, which consists of `prod(shards) * 2` uint64le values (`16 * prod(shards)` bytes) specifying the starting and ending offset of each chunk. The special start/end pair `(2^64-1, 2^64-1)` indicates a missing chunk.
Format 2 is amenable to quickly and simply writing compressed chunks in parallel.
My perspective is from creating software to write shards at data acquisition. Compressing the data and then writing it may be faster than writing raw uncompressed data to a local disk array. Also in this environment, the cost of creating many files on a local filesystem may be high and add unnecessary latency. Thus, shards are relevant to help mitigate that cost. The actual size of the shard file on disk is a secondary priority to storing the data as quickly to disk as possible. The shard file could be repacked to eliminate unused space at a later time, or at a slower rate than acquisition speeds. Alternatively, repacking may occur while transferring the data off the acquisition system. A format that allows for empty space and supports writing compressed chunks in parallel to a shard as quickly as possible would be useful at data acquisition, since data write speed may be more important than file compactness.
One scheme to compress and write the chunks parallel is as follows. The mechanics of this scheme involve receiving a pointer to contiguous data from an acquisition device (e.g. a camera), chunking that data, and compressing the data in parallel across multiple threads. A large shard file may be quickly created by opening the file, seeking to an upper bound of the potential file size, and writing a small amount of data. To allow parallel writes to a single shard file, the file is memory mapped, and the chunks are copied to predetermined locations in the mapped memory so that the parallel threads do not need to communicate. These predetermined locations are calculated based on the uncompressed size of the data with potential overhead. Thus, there may be empty space between compressed chunks. Memory mapping is useful here since memory page files are often lazily allocated by the operating system and are flushed to disk in an asynchronous manner. Memory mapping is also a useful generic abstraction so that operating system specific APIs for asynchronous unbuffered writes are not necessary. In summary, the above scheme allows for independent parallel writes to a single shard using memory mapping, but requires the potential for empty space in the file.
There are optimization details for the above scheme, but the lack of need of synchronization across threads is a major advantage and allows for simple concurrent programming. Each thread just requires a set of input and output buffers, one or more of which may be mapped to a file. Buffer to buffer APIs are provided by many compression libraries. Additionally this scheme resembles that used to write chunks to separate files in situations where the cost of file creation is low or less of a priority.
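A condensed sketch of such a scheme (assumptions: zlib as the compressor, slot sizes derived from a conservative upper bound on compressed size; none of these names are zarr API):

```python
import mmap
import struct
import zlib
from concurrent.futures import ThreadPoolExecutor

MISSING = 2**64 - 1

def write_shard_parallel(path, raw_chunks):
    # raw_chunks: equally-sized bytes objects (or None for missing chunks),
    # in row-major order of chunk position within the shard.
    n = len(raw_chunks)
    raw = max(len(c) for c in raw_chunks if c is not None)
    slot = raw + raw // 1000 + 64        # conservative zlib expansion bound
    index_offset = n * slot
    with open(path, "wb") as f:
        f.truncate(index_offset + n * 16)  # pages are allocated lazily
    index = [(MISSING, MISSING)] * n
    with open(path, "r+b") as f, mmap.mmap(f.fileno(), 0) as mm:
        def compress_into_slot(i):
            if raw_chunks[i] is None:
                return
            data = zlib.compress(raw_chunks[i])
            mm[i * slot:i * slot + len(data)] = data  # disjoint slots: no locks
            index[i] = (i * slot, len(data))
        with ThreadPoolExecutor() as pool:
            list(pool.map(compress_into_slot, range(n)))
        # Trailing "format 2" index; gaps between chunks simply stay empty.
        mm[index_offset:] = b"".join(struct.pack("<QQ", o, l) for o, l in index)
```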
Being able to write data efficiently to a Zarr compatible format at data acquisition would eliminate the need for intermediate formats.
It would be great if the zarr specification owners weigh in on this. I think a sharded format would be tremendously useful and it would be great to have it ASAP.
I would suggest we go with "format 2", which @mkitti also seems to find satisfactory.
For zarr python we will need to add byte range reading support to the mutable mapping API, perhaps via a new optional method. For mutable mappings that don't support this new method, it can be (inefficiently) emulated by doing a full read and then throwing out the unneeded bytes. Or alternatively it could be handled at a higher level by just reading at full shard granularity.
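Such an optional method with the emulation fallback could look like this sketch (the name `get_partial_values` is hypothetical here, not a settled API):

```python
def get_partial_values(store, key, ranges):
    """Read byte `ranges` (a list of (offset, length) pairs) from `key`.
    Uses the store's native support if present, else emulates it by
    reading the full value and throwing out the unneeded bytes."""
    if hasattr(store, "get_partial_values"):
        return store.get_partial_values(key, ranges)
    value = store[key]  # store is any MutableMapping[str, bytes]
    return [value[offset:offset + length] for offset, length in ranges]
```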
It would be really great if we can reach agreement on the format and try to proceed with the implementation quickly.
I can implement support in Neuroglancer and TensorStore; given the existing support for the neuroglancer precomputed sharded format that will not be too difficult.
I agree to go forward with "format 2". I think it is powerful enough to solve all the current needs for storage, visualization (e.g. neuroglancer and webKnossos) and processing of very large datasets, which were the primary goals of the sharding feature. I think the format is also simple enough to be widely implemented and fits well with the zarr philosophy.
Thanks for the input! Going forward with "Format 2" sounds great! I just added a commit to support this format:
The (possibly compressed) chunks are currently written in random order, followed by an index referencing them. The index holds an `offset, length` pair of ~~big~~ little-endian uint64 per chunk; the chunk-order in the index is row-major (C) order.
For (2, 2) chunks per shard an index would look like:
| chunk (0, 0) | chunk (0, 1) | chunk (1, 0) | chunk (1, 1) |
| offset | length | offset | length | offset | length | offset | length |
| uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 |
For now, I left the `shard_format` key in the `.zarray` specification and called the format `indexed`. For the default order of the actual chunk-content in a shard I'd propose to use Morton order, but this can easily be changed and customized, since all orders can be read. I'd propose to make the write-order easily configurable in the API, but to leave it out of the array spec in `.zarray`, which allows for arbitrary customizations without the need of serializing this setting.
If we can agree on this or a varied format, the following steps would be missing for the zarr-python implementation:
- Use a default write-order (Morton) of chunks in a shard and allow customization
- Support deletion in the `ShardedStore`
- Group chunk-wise operations in `Array` where possible (e.g. in `digest` & `_resize_nosync`)
- Consider locking mechanisms to guard against concurrency issues within a shard
- Allow partial reads and writes when the wrapped store supports them
- Add support for prefixes before the chunk-dimensions in the storage key, e.g. for arrays that are contained in a group
- Add warnings for inefficient reads/writes (might be made configurable)
- Maybe use the new partial read method on the Store also for the current PartialReadBuffer usage (to detect if this is possible and reading via it)
- Tests
- Documentation
I also updated the PR description in the top accordingly.
What would be the process for the next steps besides the implementation?
If we've settled our collective intent to be solely addressing the read-optimized, chunk-localized use case I will definitely table the TileDB discussion in favour of not creating additional noise here. If anyone is interested in looking into it I'd be happy to explore it on another issue.
"Format 2" certainly seems to have some consensus and I'll add my +1. Perhaps the two objects might have zbin
and zidx
suffixes? Is there a preference for a single object? It would also open the door for adding zhash
or zpar
extensions to the format going forward for those who might be interested in integrity hash and/or data recovery redundancy parity functionality.
Of the very large data set use cases already mentioned here I'll add a general one: moving and deleting the many files that make up a Zarr based imaging data set can be a real pain under a lot of circumstances. Users of even not so large imaging data sets already feel this pain with Zarr and they do not feel it with traditional single file, proprietary or not, file formats. I believe @normanrz has already referenced the PR on at least one forum.image.sc thread with concerns surrounding exactly this.
Furthermore, as @mkitti has already outlined, chunk-localization is of high interest to the instrumentation community. That community is incredibly diverse and often operates very close to the metal because it has to in order to achieve the write speeds required to not have acquisition hardware waiting on I/O. They would not be making mention of memory mapped and zero-copy I/O, operating system semantics, or repacking otherwise. I would argue that Zarr's current design is predicated on not having the user be even tangentially concerned about any of these things. This is especially true when object storage is in use and half of them aren't even possible.
Consequently, with respect to the zarr-python API, I think I'd move forward with caution and focus on very high quality documentation and extremely well defined semantics surrounding this functionality. It's going to be very tempting to have leaky abstractions to allow for very high performance, operating system specific or storage subsystem specific use cases and I don't think that is a good idea. If, as @jbms has been focusing on, we get the format right and that format continues in the Zarr spirit of simplicity then I think that gives freedom for those types of users to write to the format as they see fit.
One might even argue that the API additions be read-only, at least initially, as reconciling a Zarr user's current expectation of chunk aligned writes vs. shard aligned writes may be very difficult. I expect we can all agree that if we're not careful, as chunk-localization is very attractive to a lot of people, we will be presented with a significant number of users with write performance horror stories and that will have a real detrimental effect on the community perception of Zarr.
It would be great if the zarr specification owners weigh in on this.
I just read through this long and thoughtful thread. Thanks to everyone participating. I am :+1: on format 2.
One might even argue that the API additions be read-only, at least initially, as reconciling a Zarr user's current expectation of chunk aligned writes vs. shard aligned writes may be very difficult.
Geoscience Use Cases
From the perspective of geospatial data, our use cases tend to be "write once, read many." The data generation process can almost always be tuned to write shards (or what we currently call chunks) contiguously, e.g. one shard per task. The main performance benefits from sharding will be for data users who want to make reads at the sub-shard level--currently they have to read the whole V2 chunk, but this new feature will enable much more granular access. So I would personally be fine with our first releases of this new feature simply punting on supporting parallel writes to a single shard.
Optimizing Reads
To optimize reads, particularly on high-latency stores like S3, it will be key to implement intelligent fusion of contiguous chunks into a single read operation. For a given shard array slice (e.g. `0:20, 100:200`), we would need to:
- Figure out which chunks intersect with the slice (N chunks)
- Look up the byte ranges of those N chunks from the index
- Perform some kind of optimization to fuse the reads into a smaller number (< N) of range requests. Depending on the latency / concurrency / throughput profile of the storage system, it may even be optimal to read more chunks than are strictly needed in order to reduce the number of requests. I assume folks here (e.g. @jbms in Neuroglancer) have solved some version of this problem already.
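One simple fusion heuristic is to merge ranges whose gap is below a threshold, trading a small over-read for fewer requests (an illustrative sketch; the default threshold is made up):

```python
def coalesce_ranges(ranges, max_gap=2**16):
    # ranges: list of (offset, length) byte ranges, possibly unsorted.
    merged = []
    for offset, length in sorted(ranges):
        if merged and offset - (merged[-1][0] + merged[-1][1]) <= max_gap:
            start, old_len = merged[-1]
            merged[-1] = (start, max(old_len, offset + length - start))
        else:
            merged.append((offset, length))
    return merged

# coalesce_ranges([(0, 500), (600, 400)]) == [(0, 1000)]  # 100-byte gap fused
```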
Locking
Once we are ready to fully support concurrent writes to the sharded format, we may need to get more serious about locking. In the Pangeo Forge project, we have an internal class called a ChunkGrid which figures out the granular locking needed to write a source array with a particular chunk scheme to a target array with a different chunk scheme. That could be pulled out and turned into a standalone thing that would be useful here.
Great to see that consensus is building around "format 2".
Regarding the description by @jstriebel of the format in the pull request, I would suggest a couple minor changes:
- Use little endian rather than big endian for the index. Endian conversion of the index should be an insignificant cost in any case, but the vast majority of processors these days operate in little endian mode, and I expect big endian to only become rarer in the future, so it seems like the better choice.
- Clarify that the shard index is always full size, i.e. contains `prod(shards) * 16` bytes, regardless of how much of the shard may be out-of-bounds of the `shape` specified in the metadata.
- Clarify that missing chunks are indicated by the special offset/length pair `2^64-1, 2^64-1`.
Regarding the comment by @chris-allan as far as whether to use separate files for the shard data and index, i.e. `.zbin` and `.zidx`, I think it is advantageous to use a single file for both, with the index at the end:
- Many key-value storage systems support atomic operations on a single key but not on multiple keys. By using single-file atomic operations, you can ensure that the format remains consistent. Otherwise, you will have to update either the data or index file first, and if the update is interrupted the zarr array will end up in an inconsistent state.
- For example, local filesystems allow for atomic rename to replace a single file, and you can also use filesystem locking to safely manage multiple concurrent writers.
- Google Cloud Storage and Azure Blobstore support conditional reads and conditional writes, which allow for atomic read-modify-write of individual files but not multiple files.
- S3 supports conditional reads but not conditional writes.
- When reading, you will first need to read the shard index and then read the relevant portion of the shard. By making the second read of the data conditional of the generation seen when reading the shard index, you can ensure that you read consistent data.
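As a sketch of that conditional two-read flow with the google-cloud-storage client (bucket/key names and the 4-chunk index size are assumptions):

```python
import struct

from google.cloud import storage

def read_chunk_consistent(bucket_name, key, chunk_idx, index_size=64):
    blob = storage.Client().bucket(bucket_name).get_blob(key)  # has .generation
    index = blob.download_as_bytes(
        start=blob.size - index_size, if_generation_match=blob.generation)
    offset, length = struct.unpack_from("<QQ", index, chunk_idx * 16)
    # Conditional second read: fails if the shard changed since the index read.
    return blob.download_as_bytes(
        start=offset, end=offset + length - 1,
        if_generation_match=blob.generation)
```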
For neuroglancer precomputed there was an initial sharded format that used separate index and data files, but I later changed it to use a single file. There are other formats that would allow the data and index to be in separate file while retaining the ability to safely perform atomic read-modify-write operations of shards using only single-file atomic operations, but they would add additional complexity.
Regarding the comment by @rabernat as far as fusing multiple byte range reads:
The array class itself could coalesce ranges that are actually adjacent, but if there are gaps then some heuristics will be needed, and I think that might best be handled by having the mutable mapping API allow multiple byte ranges to be requested at once. Then the underlying store can decide how best to satisfy that request. For example, the HTTP spec allows multiple byte ranges to be requested at once via the `Range` request header, though unfortunately not all web servers support that, and in particular S3 and GCS do not. I did experiment with coalescing of byte range requests in Neuroglancer, but I found that it wasn't particularly effective for the request patterns used by Neuroglancer, and did not actually enable it. For batch processing there is likely to be more opportunity for coalescing reads. There are some papers that describe heuristic algorithms for coalescing reads implemented in HDF5. Note that high latency doesn't necessarily mean that requests must be coalesced for good performance, since you can still issue multiple requests in parallel. In fact my experience with S3 is that the throughput of a single connection may be limited, and that higher throughput is achieved by issuing multiple smaller requests in parallel. Of course that leads to higher operation charges, though.
Parallel writes to a single shard would indeed be a very advanced use case, analogous to parallel writes to a single chunk under the current version of zarr, and I would agree that we should not worry about that initially. Users should choose the shard shape, just as they choose the chunk shape, to allow each worker to write to disjoint shards.
Thanks everyone! I'd also simply use locking for parallel writes to a single shard, as optimizing for actual parallel writes depends on the exact use-case and goes beyond a first support of sharding. If I recall this correctly, parallel writes to a single chunk are also just handled via locking atm.
Regarding coalescing byte ranges: I'd also defer this to the store, which can implement an appropriate strategy, by passing on all necessary ranges in one call, similar to how `getitems` can optimize multiple chunk-reads depending on the store.
@jbms Thanks for the suggestions! I chose big-endian to force developers to specify endianness, but I'll happily change it to little endian. Using `2^64-1, 2^64-1` for missing chunks is way better than my initial `0` for the length, since an empty chunk might actually be valid compressor output ^^. Going to adapt this later.
Thanks, @jbms.
With respect to Google Cloud Storage and Azure Blob Storage "conditional reads and conditional writes", are you referring to the optimistic locking options (such as `ifGenerationMatch`) available to get and rewrite, as well as leases, respectively?
I don't want to turn this thread into a discussion on how to perform advanced read/write techniques using specific storage technologies. Partly to try to keep on topic, but also to be sensitive to the fact that zarr-python currently relies on storage technology abstractions, mostly filesystem-like or map-like interfaces, to function. Facilitating a great number of these more advanced techniques, even in part, would likely require an implementer to commit to developing `Storage` implementations inside or outside the zarr-python core that are deeply aware of these semantics and able to efficiently leverage them. Certainly happy to theorycraft elsewhere however.
All multi-process locking in zarr-python currently happens through separate lock files and not file locks acquired on the chunk files. There are good reasons for this due to the semantics of the POSIX lock APIs in particular but this is probably not the place for that discussion either.
Where I'm coming from with the separate `zbin` and `zidx` objects, and maybe I'm just getting ahead of myself, is in terms of designing for parallel chunk writes to a single shard with low overhead concurrency control. Such a design would likely be predicated on the `zbin` being append only. This is obviously not a new idea, is inspired by RCU and MVCC, and is very traditional filesystem centric, but consistent with zarr-python's aforementioned current lock file approach to locking: your lock file would likely just guard the `zidx`. I accept there are dragons there however.
At least as I see it, for a single object shard to contain both the chunks and the index, where the concurrency expectation is that all operations on that single object are atomic, de facto precludes its use for parallel chunk writing; the performance will just be far too unpredictable. It seems that everyone's fine with that though, especially for this first attempt, and it is no worse conceptually than the current expectations surrounding parallel value writing to a single chunk.
@jstriebel: There is a reasonable locking discussion on #857. I think the summary is that it's fair to say that zarr-python's current synchronization functionality is quite limited and solely chunk based. I expect if you want to offer locking of the shard as part of your proposed implementation a substantial refactor will be required.
@chris-allan Re: conditional reads and writes: Yes, I am referring to the `ifGenerationMatch` and similar options. I was not proposing that zarr-python necessarily make use of optimistic concurrency, merely that the format be designed to be compatible with it, so that zarr-python or other implementations could in the future make use of that.
Your comment regarding supporting concurrent appending of chunks is interesting. It sounds like the idea would be that you have a lock server from which each writer can request byte ranges in the data file, and then after writing either each writer, or alternatively the lock server itself, could atomically update the index file. This would work with local filesystems, and some network filesystems, and would also work with GCS even without a lock server, since GCS supports atomic append via the "compose" operation. The downside is that if you want to replace data in a shard, rather than just appending new chunks, you cannot use atomic single-file operations to ensure a consistent state.
With GCS even the format with a combined data and index file could still support parallel appends, but you would end up with some space wasted on the old copies of the shard index. However, on normal filesystems that don't support atomic append, it wouldn't be possible to append while ensuring the format remains consistent at all times.
To get atomic consistent updates and support (parallel) appends, we could use a more complex format:
The shard index file `<shard-key>` would have size `prod(shards) * 24` bytes, and for each chunk would specify:
- uint64le `data_file_id`
- uint64le `offset`
- uint64le `length`

The chunk data would be located at the specified offset/length range of the file named `<shard-key>.<data_file_id>`.
Writers could either append to an existing data file or create a new one. Data files not referenced by an index file can be deleted.
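Decoding one 24-byte entry of such an index would be straightforward (sketch):

```python
import struct

def parse_index_entry(index_bytes, chunk_idx):
    # Each entry is three little-endian uint64s: data_file_id, offset, length.
    data_file_id, offset, length = struct.unpack_from(
        "<QQQ", index_bytes, chunk_idx * 24)
    # The chunk data lives in the file <shard-key>.<data_file_id>.
    return data_file_id, offset, length
```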
However, it is not clear that it would be wise to add the additional complexity of this format. Instead, it may be reasonable to initially use just a combined data/index file but introduce a new shard format in the future via the "shard_format" metadata field.
Re: conditional reads and writes: Yes, I am referring to the ifGenerationMatch and similar options. I was not proposing that zarr-python necessarily make use of optimistic concurrency, merely that the format be designed to be compatible with it, so that zarr-python or other implementations could in the future make use of that.
👍 Makes complete sense.
Your comment regarding supporting concurrent appending of chunks is interesting. It sounds like the idea would be that you have a lock server from which each writer can request byte ranges in the data file, and then after writing either each writer, or alternatively the lock server itself, could atomically update the index file. This would work with local filesystems, and some network filesystems, and would also work with GCS even without a lock server, since GCS supports atomic append via the "compose" operation. The downside is that if you want to replace data in a shard, rather than just appending new chunks, you cannot use atomic single-file operations to ensure a consistent state.
More or less, yes. There are lots of gotchas to the approach I'm sure and I've spent only a few hours thinking about it so I'm sure there are things I've not considered. It's also only really going to work if you can bake concurrency semantics into the specification; one misbehaving client and you're toast. My impression is that the Zarr project is not ready to start prescribing implementation semantics in the specification but maybe I'm wrong.
Undoubtedly, GCS has the highest number of features that would support such an approach. Unfortunately, certainly in the spheres my organization operates in, S3 is the marketshare leader by quite some margin.
However, it is not clear that it would be wise to add the additional complexity of this format. Instead, it may be reasonable to initially use just a combined data/index file but introduce a new shard format in the future via the "shard_format" metadata field.
Agreed. Once the simple case is done we can consider planning one that addresses some more advanced use cases with the required accompanying complexity in mind.
@chris-allan I think the issue of "misbehaving clients" is not too severe, because at least in the most common parallel writing use cases I would imagine (distributed processing pipelines), all writers are running the same software. Of course there may be some cases where parallel writing using different software may be desired.
A huge thanks to everyone for the contributions to the thread so far! There's likely enough content here that we may need to extract some of it into a dedicated design document. At the very least, I think this clearly shows the interest in having a solution somewhere between the current chunking strategy and "use a ZipFile". I also share @jbms' desire to get something into production (and therefore doubly appreciate his offers for implementing in tensorstore and neuroglancer :100:). But I'd like to suggest taking a brief step back.
Though there does seem to be general support for the v2 format, the foremost goal of this initial effort is to find the right abstraction for the format (or protocol if you prefer). One question that I think we should address is: what does the next implementation look like? Will the .zarray metadata be able to properly handle the adjustment? It might make sense to even start with N>1 implementations to keep the protocol honest, even if we don't roll them out to all implementations in production.
Even more broadly, what I've been quietly pondering while listening to all of you is what it means to reverse the abstraction. Zarr chunks are currently the "binary file unit". What changes in the proposal if we try to keep that definition? Chunks then become shards, and one must specify how they are internally structured into blocks. In this case, does sharding become just a codec?
Playing with this idea just a bit (not for performance but solely for abstraction): does that mean one could use a ZipFile for a single chunk and extend @andrewfulton9's work on partial reads? Trying to take this to the further (possibly absurd) extent: what about using a Zarr array (e.g. with a ZipStore) for a chunk?
It may well be that for each of these statements there's a clear, "here's why not" (and it seems that @jbms has already run into many of them!) But before we plunge ahead, I do think we want to avoid premature optimization, knowing that as @chris-allan has pointed out a few times, this comes with a good deal of burden across at least 13 implementations.
cc: @alimanfoo
Thank you @joshmoore for your comments:
Though there does seem to be general support for the v2 format, the foremost goal of this initial effort is to find the right abstraction for the format (or protocol if you prefer). One question that I think we should address is: what does the next implementation look like? Will the .zarray metadata be able to properly handle the adjustment? It might make sense to even start with N>1 implementations to keep the protocol honest, even if we don't roll them out to all implementations in production.
I'm not sure I fully understand what you are asking, but I think this concern is very much related to what I have been trying to address as far as carefully describing the format before worrying about the implementation. Once we have settled on a format, I can certainly create a test branch of Neuroglancer and a test deployment that can be tested against data created by the zarr-python implementation. As I mentioned, I would suggest that `zarr_format` be incremented since we are adding a new metadata field that must not be ignored.
Even more broadly, what I've been quietly pondering while listening to all of you is what it means to reverse the abstraction. Zarr chunks are currently the "binary file unit". What changes in the proposal if we try to keep that definition? Chunks then become shards, and one must specify how they are internally structured into blocks. In this case, does sharding become just a codec?
From the perspective of the `.zarray` format itself it would kind of work to just consider sharding a special type of `compressor`, e.g.:
"compressor": {"id": "sharded", "compressor": {"id": "blosc", ...}, "chunks": [10, 20, 30], "filters": ...}
Here the inner `chunks` would specify the chunking within a shard, and the inner `compressor` and `filters` would specify the encoding of individual chunks within the shard.
Top-level `filters` must not be used in this case.
For writing this representation works okay, I think, without any changes to the zarr core, other than the minor issue that top-level filters must not be specified.
For reading there would need to be support in the zarr core for partial reads (which would amount to a major change to the compressor and mutable mapping interfaces).
The upside is that partial read support could also be added to other existing codecs, like image codecs that support partial reads. The downside is that both the mutable mapping and compressor interfaces need to change, whereas with the metadata format proposed by @jstriebel we need only change the mutable mapping interface but not the compressor interface, and we don't end up with an unusable top-level `filters` field. I can see the merits of both approaches, and while I am inclined towards the proposal by @jstriebel it doesn't really make that much difference to me.
Most likely when implementing support in Neuroglancer and TensorStore, if we went with specifying the sharding via the `compressor`, I would end up special casing the `sharded` compressor and implementing it exactly the same as under the metadata proposal by @jstriebel.
Playing with this idea just a bit (not for performance but solely for abstraction): does that mean one could use a ZipFile for a single chunk and extend @andrewfulton9's work on partial reads. Trying to take this to the further (possibly absurd) extent: what about using a Zarr array (e.g. with a ZipStore) for a chunk?
Zip format is similar to the proposed "format 2". The upside is that it is a standard existing format that can be read and written by existing software. The downsides are:
- The zip format has a lot of complexity that isn't relevant here --- there is the original zip format plus zip64 extensions. There are also various compression and encryption options, and support for multiple disks.
- Partial reads are not as efficient as "format 2": you have to first locate the "End of central directory record". This is at the end of the zip file but is variable length! Therefore you actually have to read up to 65535+22 bytes at the end of the file and search from the end for a valid end of central directory record. Then you must also read the central directory file header and the zip64 central directory headers. Finally once you have read the regular and zip64 central directory you can read the actual data.
- Due to the complexity of the zip format I think "format 2" would actually be simpler to implement, and less likely to lead to incompatibilities between implementations, despite the fact that there are existing libraries that support the zip format.
In general we could imagine arbitrary nesting of zarr arrays --- but in practice I don't think it is useful to have more than two levels: granularity at which you can write, and granularity at which you can read.
Currently we have the index at the end of the file rather than the beginning of the file. If we had the index at the beginning of the shard, we might be able to use a jammed chunked HDF5 file as a valid shard.
https://portal.hdfgroup.org/display/HDF5/h5jam+-+h5unjam
A jammed HDF5 file is one with user data at the beginning of the file with the actual HDF5 file starting at some power of 2 byte offset greater than or equal to 512 (e.g. 512, 1024, 2048, 4096, etc).
Alternatively, a separate index `zidx` might allow one to use a chunked HDF5 file as a valid shard as-is.
(I'm aware of https://medium.com/pangeo/cloud-performant-reading-of-netcdf4-hdf5-data-using-the-zarr-library-1a95c5c92314 so this is not a high priority.)
In the same vein, I would be interested to see if @aleixalcacer or @FrancescAlted thinks the current format could also be implemented as a Blosc2 metalayer: https://www.blosc.org/posts/blosc-metalayers/ . Perhaps it could even fit as an additional metalayer on top of Caterva.
@mkitti What do you see as the advantage of making the shard file also a valid HDF5 file? That would also require making the zarr compressors and filters available to HDF5, unless you are just going to use uncompressed chunks.
I just did a quick test and it appears that you can append additional bytes to an HDF5 file and they are ignored. Of course they will probably be clobbered if you write using the HDF5 library, but you would anyway need to update the index in that case.
I imagine that you are thinking about reusing, or being compatible with, an existing binary format in the context of creating a separate zarr implementation? Since we are talking about the format of an individual shard, not an entire array, even if the shard format happens to match some existing format like hdf5 or zip, it still would not likely be very useful to access it directly using existing software designed for those formats, because you still need to combine multiple shards to have a view of the entire array, and to apply zarr compressors and filters. Given the simplicity of "format 2", I think re-using an existing format and trying to use existing libraries to read/write it would actually be much more trouble than just implementing "format 2" directly.