zarr-python
Initializing a group or array is not thread-safe, even with mode='w'
Zarr version
2.14.2
Numcodecs version
0.11.0
Python Version
3.10.12
Operating System
Linux
Installation
colab.research.google.com
Description
The problem is that mode='w' ends up calling init_group or init_array with overwrite=True, which tries to delete all pre-existing items in the store:
https://github.com/zarr-developers/zarr-python/blob/4132f360616a4c8bfa3dd4e979a4793c5d84cdfc/zarr/storage.py#L670-L675
These calls to rmdir can fail, because storage APIs for removing a directory (at least with local storage) use shutil.rmtree() without ignore_errors=True.
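For illustration, here is a minimal sketch (independent of Zarr) of why concurrent calls to shutil.rmtree() on the same directory are racy: one caller can unlink entries, or the whole tree, out from under the other, and the loser raises FileNotFoundError. The directory path and file count here are arbitrary.

import os
import shutil
import tempfile
import threading

# Build a directory with many files, then have two threads race to delete it.
tmpdir = os.path.join(tempfile.gettempdir(), 'rmtree-race')
os.makedirs(tmpdir, exist_ok=True)
for i in range(1000):
    with open(os.path.join(tmpdir, f'entry-{i}'), 'w') as f:
        f.write('x')

errors = []

def remove():
    try:
        # like DirectoryStore.rmdir: plain rmtree, no ignore_errors=True
        shutil.rmtree(tmpdir)
    except OSError as exc:
        errors.append(exc)

threads = [threading.Thread(target=remove) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(errors)  # frequently contains a FileNotFoundError from the losing thread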
Parallel execution frameworks like Apache Beam may execute the same code multiple times in parallel. Ideally, there would be a way to guarantee that this is safe with Zarr-Python, so that we get the desired result as long as at least one task finishes successfully. In the Beam model, the framework will start executing duplicate copies of a task if it is taking a long time to complete, and will then cancel the extra copies after the first task finishes.
Writing individual metadata files is atomic (at least in most storage backends), so my suggestion would be to achieve this by allowing for some way not to delete existing files in a storage directory. Maybe mode='w+' would be a good way to indicate "create (ignore other data in the store if it exists)"?
Steps to reproduce
Example using multiple threads:
import time
import numpy as np
import zarr
from concurrent import futures

def create_array():
    zarr.open_array('/tmp/my-array.zarr', mode='w', shape=(), dtype=np.float64)

executor = futures.ThreadPoolExecutor(max_workers=100)
results = [executor.submit(create_array) for _ in range(100)]
for future in results:
    future.result()
This typically fails with something like FileNotFoundError: [Errno 2] No such file or directory: '.zarray':
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
<ipython-input-24-f747bf813b53> in <cell line: 10>()
9 results = [executor.submit(create_array) for _ in range(100)]
10 for future in results:
---> 11 future.result()
11 frames
/usr/lib/python3.10/concurrent/futures/_base.py in result(self, timeout)
449 raise CancelledError()
450 elif self._state == FINISHED:
--> 451 return self.__get_result()
452
453 self._condition.wait(timeout)
/usr/lib/python3.10/concurrent/futures/_base.py in __get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
/usr/lib/python3.10/concurrent/futures/thread.py in run(self)
56
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
<ipython-input-24-f747bf813b53> in create_array()
4
5 def create_array():
----> 6 zarr.open_array('/tmp/my-array.zarr', mode='w', shape=(), dtype=np.float64)
7
8 executor = futures.ThreadPoolExecutor(max_workers=100)
/usr/local/lib/python3.10/dist-packages/zarr/creation.py in open_array(store, mode, shape, chunks, dtype, compressor, fill_value, order, synchronizer, filters, cache_metadata, cache_attrs, path, object_codec, chunk_store, storage_options, partial_decompress, write_empty_chunks, zarr_version, dimension_separator, **kwargs)
574
575 elif mode == 'w':
--> 576 init_array(store, shape=shape, chunks=chunks, dtype=dtype,
577 compressor=compressor, fill_value=fill_value,
578 order=order, filters=filters, overwrite=True, path=path,
/usr/local/lib/python3.10/dist-packages/zarr/storage.py in init_array(store, shape, chunks, dtype, compressor, fill_value, order, overwrite, path, chunk_store, filters, object_codec, dimension_separator, storage_transformers)
435 # compatibility with legacy tests using compressor=[]
436 compressor = None
--> 437 _init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
438 compressor=compressor, fill_value=fill_value,
439 order=order, overwrite=overwrite, path=path,
/usr/local/lib/python3.10/dist-packages/zarr/storage.py in _init_array_metadata(store, shape, chunks, dtype, compressor, fill_value, order, overwrite, path, chunk_store, filters, object_codec, dimension_separator, storage_transformers)
469 if store_version == 2:
470 # attempt to delete any pre-existing array in store
--> 471 rmdir(store, path)
472 if chunk_store is not None:
473 rmdir(chunk_store, path)
/usr/local/lib/python3.10/dist-packages/zarr/storage.py in rmdir(store, path)
190 if hasattr(store, "rmdir") and store.is_erasable(): # type: ignore
191 # pass through
--> 192 store.rmdir(path) # type: ignore
193 else:
194 # slow version, delete one key at a time
/usr/local/lib/python3.10/dist-packages/zarr/storage.py in rmdir(self, path)
1231 dir_path = os.path.join(dir_path, store_path)
1232 if os.path.isdir(dir_path):
-> 1233 shutil.rmtree(dir_path)
1234
1235 def getsize(self, path=None):
/usr/lib/python3.10/shutil.py in rmtree(path, ignore_errors, onerror)
723 try:
724 if os.path.samestat(orig_st, os.fstat(fd)):
--> 725 _rmtree_safe_fd(fd, path, onerror)
726 try:
727 os.close(fd)
/usr/lib/python3.10/shutil.py in _rmtree_safe_fd(topfd, path, onerror)
679 os.unlink(entry.name, dir_fd=topfd)
680 except OSError:
--> 681 onerror(os.unlink, fullname, sys.exc_info())
682
683 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
/usr/lib/python3.10/shutil.py in _rmtree_safe_fd(topfd, path, onerror)
677 else:
678 try:
--> 679 os.unlink(entry.name, dir_fd=topfd)
680 except OSError:
681 onerror(os.unlink, fullname, sys.exc_info())
FileNotFoundError: [Errno 2] No such file or directory: '.zarray'
Additional output
No response
cc: @d-v-b who was working on a more holistic form of hierarchy creation (though I don't know if it is likely to make the existing methods thread-safe)
To be clear, the stuff I'm doing over in pydantic-zarr is just for representing hierarchies, in memory or in JSON. I'm not touching the routines for creating them.
@shoyer I'm guessing mode=a does not work for your use case, because you want to create a brand new array, potentially with a new shape, dtype, compressor, etc? If this is what you want, then there is a complication: old chunks may not be readable under the new array parameters, and there will be exceptions when reading / writing if an old chunk is hit. But we could handle that behavior as well, provided we are deliberate in our choice of exceptions, and if we are happy adding something like ignore_mangled_chunks: bool = True to the Array constructor.
Correct, I want a brand new array, and don't want to do any error checking or cleanup of the existing store.
I am not concerned about mangled chunks from an old Zarr schema, both because I'm typically writing to a new location and because later stages of my job will write the full contents of array if/when they succeed.
I understand that for your use case, old chunks won't be an issue, but I think if we add w+ we should have a smooth path for people who use it on stores with existing chunks. If we don't, then people could create corrupted arrays that zarr cannot read and write, and they would be forced to clean these up manually. Thoughts?
alternatively, we could stipulate that using w+ on a store with chunks is an error until we figure out how it should work
My preference would be that using w+ on an array store with chunks offers no guarantees of correctness, similar to how concurrent writes to overlapping chunks of a Zarr array do not guarantee correctness.
The use cases are pretty similar. We need a low-level method that is highly efficient and concurrency-safe, even at the cost of error checking, so it can be used by distributed applications (which can and should do their own error checking).
Ideally, my example function above could be extended to any arbitrary use of Zarr's API to create a group of arrays, with optional metadata and any number of array values filled in (this is what happens when you fill in a Zarr template with Xarray by calling .to_zarr(..., compute=False)). It should be safe to run duplicate copies of the same group/array creation/setting code, and as long as at least one task finishes successfully, we should get the same result.
Basically, similar to how filling a complete chunk of an array works, we should support a way of creating groups/arrays where each individual operation is idempotent and may be repeated any number of times.
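To make this concrete, here is a rough sketch of the kind of idempotent creation step I have in mind, written against zarr v2's store interface. The helper name and the hand-built metadata dict are just for illustration (they skip all of the validation that init_array normally does); the point is that it writes a single metadata key and never deletes anything, so repeating it any number of times, even concurrently, leaves the store in the same state.

import json
import zarr

def ensure_array_metadata(store, shape, chunks, dtype):
    # Hypothetical helper: write the v2 .zarray key directly, without the
    # rmdir step that init_array(overwrite=True) performs. A single key write
    # is effectively atomic in most stores, so it is safe to repeat.
    meta = {
        'zarr_format': 2,
        'shape': list(shape),
        'chunks': list(chunks),
        'dtype': dtype,
        'compressor': None,
        'fill_value': 0.0,
        'order': 'C',
        'filters': None,
    }
    store['.zarray'] = json.dumps(meta).encode()

store = zarr.DirectoryStore('/tmp/my-array.zarr')
ensure_array_metadata(store, shape=(10,), chunks=(5,), dtype='<f8')
arr = zarr.open_array(store, mode='r+')  # opens the existing metadata, deletes nothing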
My preference would be that using w+ on an array store with chunks offers no guarantees of correctness, similar to how concurrent writes to overlapping chunks of a Zarr array do not guarantee correctness.
I think my concern is that, with the way things currently work in zarr, using w+ on an existing array has the potential to break the array API in ways that will be confusing to users. To illustrate:
# create an array at foo
x = create_array(store, path='foo', mode='w', shape=(10,), chunks=(2,))
# initialize all chunks to 1
x[:] = 1
# access foo with w+ mode, changing the array metadata
y = create_array(store, path='foo', mode='w+', shape=(10,), chunks=(3,))
# all __getitem__ calls will break, because the chunks were written when the array had chunks=(2,),
# but now the chunks are (3,) according to array metadata
_ = y[:2]
# this __setitem__ call will work, because the slice matches the current chunking exactly
y[:3] = 2
# this __setitem__ call will fail, because the slice does not match the current chunking,
# which triggers a __getitem__ call, which will hit an outdated chunk
y[3] = 2
# this _should_ work, and result in all chunks having the correct shape afterwards
y[:] = 2
# this will work now
y[3] = 2
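For what it's worth, this failure mode can already be reproduced today by rewriting only the metadata key by hand, which is roughly what a non-deleting w+ would amount to (the path here is a placeholder):

import json
import zarr

store = zarr.DirectoryStore('/tmp/mangled.zarr')
x = zarr.open_array(store, mode='w', shape=(10,), chunks=(2,), dtype='<f8')
x[:] = 1  # chunks on disk are written with chunk shape (2,)

# rewrite only the metadata, leaving the old chunks behind
meta = json.loads(store['.zarray'])
meta['chunks'] = [3]
store['.zarray'] = json.dumps(meta).encode()

y = zarr.open_array(store, mode='r+')
try:
    y[:2]
except Exception as exc:
    # the old (2,)-shaped chunks no longer decode under the new chunk shape
    print(type(exc).__name__, exc)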
Am I missing something here and / or do other people think this kind of behavior is fine?
Basically, similar to how filling a complete chunk of an array works, we should support a way of creating groups/arrays where each individual operation is idempotent and may be repeated any number of times.
I totally agree. I will keep thinking about this to see if there's a way around my concerns.
I think "w+" may not be the best name for this option, since "w+" as an fopen option means to truncate the existing data.
In tensorstore we have an option called assume_metadata where it will open the array assuming whatever metadata was specified as part of the open call, without reading or writing any metadata on disk. I think with zarr-python you could already accomplish that by specifying a separate in-memory store for the metadata only.
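For example, here is a rough sketch of that workaround using zarr v2's chunk_store argument (paths and array parameters are placeholders). The metadata lives only in a throwaway in-memory store, so nothing on disk is deleted or rewritten when the array is opened; note that the metadata is never persisted, so anything that later reads the on-disk store directly would need the metadata written some other way.

import numpy as np
import zarr

# metadata goes to a private in-memory store; chunk data goes to the real store
meta_store = zarr.MemoryStore()
chunk_store = zarr.DirectoryStore('/tmp/my-array.zarr')

# mode='a' creates the array in the empty in-memory store without the
# overwrite/rmdir step, so the on-disk store is only touched by the
# chunk writes themselves
arr = zarr.open_array(
    meta_store,
    mode='a',
    shape=(10,),
    chunks=(5,),
    dtype=np.float64,
    chunk_store=chunk_store,
)
arr[:] = 1.0  # chunk files land in /tmp/my-array.zarr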
I think "w+" may not be the best name for this option, since "w+" as an fopen option means to truncate the existing data.
It seems that the difference between w and w+ in fopen is whether a file is open only for writing, or for writing & reading. So indeed, using w+ to denote "don't validate" would almost be backwards from the fopen meaning, where it means "read as well as write".
OK, so here are two alternative API ideas:
- Add a new keyword-only argument like validate_store=False that could be used in open_array/open_group to indicate skipping validation checks at array/group creation time. It would only be valid to pass validate_store=False if mode='w'.
- Pick a new value of mode to indicate "write without validation". Maybe mode='w_unsafe'? This would let us reuse the mode argument, and avoid the need to handle invalid combinations of mode and validate_store=False.
Any preferences? I think I would be happy with either.
This is coming up repeatedly for Xarray-Beam users, so I think I'm going to try to tackle this.
Thinking about this a little more, I think it would make sense to use mode='w_unclean' to indicate "unclean" writes.
This appears to be fixed in Zarr v3, or at least my original reproducer no longer triggers an error.
Thanks for checking! I've tagged this as v2, and we can keep this open in case someone wants to try and fix it on our v2 support branch.
The basic problem you observed might persist -- I think it's still the case in v3 that creating an array with overwrite=True will trigger an attempt to delete everything under the prefix for that array prior to writing metadata.
Given that the operation you want to achieve is basically "ensure that some array metadata exists at this path", we should expose a routine that makes this easy. create_array and similar routines take overwrite: bool as a keyword argument. We could consider widening the type of this parameter to express more things, e.g. overwrite = {'chunks': False, 'metadata': True} to denote "leave existing chunks alone, but overwrite the metadata".
Over in #2665, I've added a low-level function (create_nodes) that just creates array / group metadata and doesn't do anything else, i.e. it's the responsibility of the caller to ensure that a pre-existing array is cleaned up. This function, or one similar to it, might be useful for ensuring that node creation and old node cleanup are logically separated.