race conditions in fsspec cache?
I recently had some strange data corruption errors happen when using a cached fsspec filesystem from parallel processes. Seemed like a race condition. Is the cached file system thread/process safe?
According to this stackoverflow answer, os.rename can ensure that parallel writes do not corrupt a single file.
The cached filesystem is largely untested from multiple processes. I would probably expect multi-thread to be OK, though. The current cache implementations might not work well, in general, for large loads. There is an idea elsewhere to try to decouple the logical layer (what gets cached, when) from the storage end (dealing with the disk or other backend). I could use help planning these.
Note that the cache metadata is saved using shutil.move, which indeed is the same as os.rename where the paths are on the same device. This should probably be used for the files themselves - which would perhaps best be accomplished by using LocalFileSystem.open(..., autocommit=False) or transactions.
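For illustration, the write-to-temp-then-rename pattern described above looks roughly like this (a sketch only; `save_metadata_atomically` is a hypothetical helper, not fsspec's actual code):

```python
import json
import os
import tempfile


def save_metadata_atomically(metadata: dict, path: str) -> None:
    # Write to a temporary file in the same directory as the target,
    # so both are guaranteed to be on the same device.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(metadata, f)
    # os.replace is an atomic rename on POSIX: concurrent readers see
    # either the old file or the new one, never a partial write.
    os.replace(tmp, path)
```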
The cached filesystem is largely untested from multiple processes.
Yah, I just started playing around with it. It seems like a potentially powerful feature, but as they say: "there are two hard problems in CS..."
This should probably be used for the files themselves - which would perhaps best be accomplished by using LocalFileSystem.open(, autocommit=False) or transactions.
Refactors aside, I think this would resolve this issue.
Essentially, the cachers call fs.get(remotepaths, localpaths) in just a couple of places, so it would be simple to use a temporary location until downloading has completed. I don't think there is any foolproof way to ensure that the temporary file is on the same device as the eventual target, except to put them in the same directory (e.g., "{localpath}.temp.UUID").
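A minimal sketch of that same-directory temp-file idea (the function name and the bare `fs.get` call are illustrative assumptions, not the actual cacher code):

```python
import os
from uuid import uuid4


def get_via_temp(fs, remotepath, localpath):
    # Download into a uniquely named temp file in the *same directory*
    # as the target, guaranteeing both are on the same device.
    temppath = f"{localpath}.temp.{uuid4()}"
    fs.get(remotepath, temppath)
    # Atomic rename on POSIX: other processes see either no file or a
    # complete file, never a half-downloaded one.
    os.replace(temppath, localpath)
```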
except to put them in the same directory (e.g., "{localpath}.temp.UUID")
This sounds like a good idea. Note, though, that rename atomicity would probably not be reliable on NFS.
Was this ever implemented? I'm looking for a process-safe read cache using fsspec, but it's proving harder than anticipated. In my case, multiple Dask workers might simultaneously try to fetch the same file. It is a bit hard to debug (celery + dask + containers), but it seems like sometimes this causes an error when the 2nd worker tries to open a file that is half downloaded by the 1st.
It seems to be working with simplecache, but then there is no way to verify that the files are up to date, which is occasionally an issue.
It seems no one has picked up this work. @Danferno , are you in a position to contribute?
I can try but I'm afraid this sort of core work is somewhat above my skill level (more on the consuming side of packages)
If I understand the thread above, it means replacing the get(remote, local) with
get(remote, local.temp)
mv(local.temp, local)
I may have time to do this myself, but I don't want to promise.
Yes. POSIX filesystems have atomic rename operations. You will likely need to catch a FileExistsError.
If I understand correctly, this is the relevant part:
details = [self._check_file(sp) for sp in paths]
downpath = [p for p, d in zip(paths, details) if not d]
downfn0 = [
    os.path.join(self.storage[-1], self._mapper(p))
    for p, d in zip(paths, details)
]  # keep these path names for opening later
downfn = [fn for fn, d in zip(downfn0, details) if not d]
if downpath:
    # skip if all files are already cached and up to date
    self.fs.get(downpath, downfn)

    # update metadata - only happens when downloads are successful
    newdetail = [
        {
            "original": path,
            "fn": self._mapper(path),
            "blocks": True,
            "time": time.time(),
            "uid": self.fs.ukey(path),
        }
        for path in downpath
    ]
    for path, detail in zip(downpath, newdetail):
        self._metadata.update_file(path, detail)
    self.save_cache()
The issue, I think, is not so much that fsspec tries to open an incomplete file, because the metadata is only written after fs.get() completes and it only loads files with metadata. It's rather a thundering-herd situation where each process tries to download the file, which then causes overwriting issues (although I don't know how this is handled at the OS level).
I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?
Another option would be to include a check to see if the file is already being downloaded. IIRC, fsspec avoids using locks because of serialisation issues. An alternative is to first check whether the file exists; if so, check whether its file size is growing (or try to get the expected file size). If it is growing, stall; if not, start downloading. If this sounds interesting, I can write a first version (I did this before switching to fsspec for maintainability).
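A rough sketch of that wait-while-growing heuristic (the function name and polling parameters are made up for illustration, and it is only a heuristic: a stalled download looks the same as a finished one):

```python
import os
import time


def wait_if_downloading(path, interval=0.5, retries=10):
    """Return True if an apparently complete file exists at `path`.

    If the file exists but its size is still growing, another process
    is probably downloading it, so stall and re-check; if the size is
    stable, assume the download has finished.
    """
    for _ in range(retries):
        try:
            size = os.path.getsize(path)
        except OSError:
            return False  # no file yet: caller should download it
        time.sleep(interval)
        try:
            if os.path.getsize(path) == size:
                return True  # size stable: assume complete
        except OSError:
            return False  # file vanished: caller should download
    return True
```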
Rename option:
from uuid import uuid4
self.fs.get(downpath, temppath := f"{downfn}.temp.{uuid4()}")
try:
    self.fs.rename(temppath, downfn)
except FileExistsError:
    pass
...
I'm not entirely sure if downfn is just a string path given the name?
I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?
It would be nice, but in practice, this file will immediately be read next, so if it's incomplete, that would be a problem.
I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?
It would be nice, but in practice, this file will immediately be read next, so if it's incomplete, that would be a problem.
I mean after it downloads to the tempname: if the real name (downfn) exists by then (i.e. downloaded by other process) then that file should be complete (because it only gets renamed to the real name upon completion)
OK, that makes sense. I suppose rm and mv are about equivalent in that case, except you might have temporary duplication in the mv case if the target file is already open in some processes.