
race conditions in fsspec cache?

Open nbren12 opened this issue 4 years ago • 13 comments

I recently had some strange data corruption errors happen when using a cached fsspec filesystem from parallel processes. Seemed like a race condition. Is the cached file system thread/process safe?

According to this Stack Overflow answer, os.rename can ensure that parallel writes do not corrupt a single file.
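As a minimal sketch of the pattern that answer describes (write to a temporary file, then atomically rename into place; all names here are illustrative, not fsspec API):

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write data so readers see either the old file or the new one,
    never a partially written file."""
    # The temp file must live in the same directory as the target so
    # the final rename stays on one device (rename is only atomic then).
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic on POSIX; replaces any existing target
    except BaseException:
        os.unlink(tmp)
        raise
```

If two processes race, each writes its own temp file and the last rename wins with a complete copy; no reader ever observes a half-written file.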

nbren12 avatar May 14 '21 07:05 nbren12

The cached filesystem is largely untested from multiple processes. I would probably expect multi-thread to be OK, though. The current cache implementations might not work well, in general, for large loads. There is an idea elsewhere to try to decouple the logical layer (what gets cached, when) from the storage end (dealing with the disk or other backend). I could use help planning these.

Note that the cache metadata is saved using shutil.move, which is indeed the same as os.rename when the paths are on the same device. The same approach should probably be used for the files themselves - which would perhaps best be accomplished by using LocalFileSystem.open(..., autocommit=False) or transactions.

martindurant avatar May 14 '21 17:05 martindurant

The cached filesystem is largely untested from multiple processes.

Yah, I just started playing around with it. It seems like a potentially powerful feature, but as they say: "there are two hard problems in CS..."

The same approach should probably be used for the files themselves - which would perhaps best be accomplished by using LocalFileSystem.open(..., autocommit=False) or transactions.

Refactors aside, I think this would resolve this issue.

nbren12 avatar May 14 '21 19:05 nbren12

Essentially, the cachers call fs.get(remotepaths, localpaths) in just a couple of places, so it would be simple to use a temporary location until downloading has completed. I don't think there is any foolproof way to ensure that the temporary file is on the same device as the eventual target, except to put them in the same directory (e.g., "{localpath}.temp.UUID").
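A rough sketch of that scheme (the helper name and the stub filesystem are hypothetical; the stub only stands in for a remote filesystem's get so the pattern is self-contained):

```python
import os
import shutil
from uuid import uuid4

class FakeRemote:
    """Illustrative stand-in for a remote fsspec filesystem."""
    def get(self, remotepath, localpath):
        shutil.copy(remotepath, localpath)

def fetch_via_temp(fs, remotepath, localpath):
    """Download to '{localpath}.temp.UUID' next to the target, then
    rename into place; same directory implies same device, so the
    rename is atomic."""
    temppath = f"{localpath}.temp.{uuid4()}"
    try:
        fs.get(remotepath, temppath)
        os.replace(temppath, localpath)
    finally:
        # Clean up if the download or rename failed partway.
        if os.path.exists(temppath):
            os.unlink(temppath)
```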

martindurant avatar May 14 '21 19:05 martindurant

except to put them in the same directory (e.g., "{localpath}.temp.UUID")

This sounds like a good idea, though even this would probably not work reliably on NFS.

nbren12 avatar May 14 '21 19:05 nbren12

Was this ever implemented? I'm looking for a process-safe read cache using fsspec, but it's proving harder than anticipated. In my case, multiple Dask workers might simultaneously try to fetch the same file. It is a bit hard to debug (celery + dask + containers), but it seems like sometimes this causes an error when the 2nd worker tries to open a file that is half downloaded by the 1st.

It seems to be working with simplecache, but then there is no way to verify that the files are up to date, which is occasionally an issue.

Danferno avatar Oct 23 '24 11:10 Danferno

It seems no one has picked up this work. @Danferno , are you in a position to contribute?

martindurant avatar Oct 23 '24 13:10 martindurant

I can try but I'm afraid this sort of core work is somewhat above my skill level (more on the consuming side of packages)

Danferno avatar Oct 23 '24 13:10 Danferno

If I understand the thread above, it means replacing the get(remote, local) with

get(remote, local.temp)
mv(local.temp, local)

I may have time to do this myself, but I don't want to promise.

martindurant avatar Oct 23 '24 13:10 martindurant

Yes. POSIX filesystems have atomic rename operations. You will likely need to catch a FileExistsError.
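A sketch of that publish step (the helper name is hypothetical; on POSIX, os.rename atomically replaces an existing target, while on Windows it raises FileExistsError, in which case another process has already installed a complete copy):

```python
import os

def publish(temppath: str, finalpath: str) -> None:
    """Move a fully downloaded temp file into its final place."""
    try:
        os.rename(temppath, finalpath)  # atomic on POSIX
    except FileExistsError:
        # Windows: the target already exists, meaning another process
        # finished first; its copy is complete, so ours can be dropped.
        os.unlink(temppath)
```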

nbren12 avatar Oct 23 '24 17:10 nbren12

If I understand correctly, this is the relevant part:

    details = [self._check_file(sp) for sp in paths]
    downpath = [p for p, d in zip(paths, details) if not d]
    downfn0 = [
        os.path.join(self.storage[-1], self._mapper(p))
        for p, d in zip(paths, details)
    ]  # keep these path names for opening later
    downfn = [fn for fn, d in zip(downfn0, details) if not d]
    if downpath:
        # skip if all files are already cached and up to date
        self.fs.get(downpath, downfn)

        # update metadata - only happens when downloads are successful
        newdetail = [
            {
                "original": path,
                "fn": self._mapper(path),
                "blocks": True,
                "time": time.time(),
                "uid": self.fs.ukey(path),
            }
            for path in downpath
        ]
        for path, detail in zip(downpath, newdetail):
            self._metadata.update_file(path, detail)
        self.save_cache()

The issue, I think, is not so much that fsspec tries to open an incomplete file, because the metadata is only written after fs.get() completes and only files with metadata are loaded. It's rather a thundering-herd kind of situation where every process tries to download the file, which then causes overwriting issues (although I don't know how this is handled at the OS level).

I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?

Another option would be to check whether the file is already being downloaded. IIRC, fsspec avoids using locks because of serialisation issues. An alternative is to first check if the file exists; if so, check whether its size is still growing (or try to get the expected size). If it is growing, stall; if not, start downloading. If this sounds interesting, I can write a first version of this (I did something like it before switching to fsspec for maintainability).
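A first stab at that size-polling heuristic might look like the following (all names and thresholds are made up, and it is only best-effort: a stalled download is indistinguishable from a finished one):

```python
import os
import time

def wait_for_other_download(path, poll=0.5, stable_polls=3):
    """Return True if an existing file has stopped growing (assume
    some other process finished downloading it), False if the caller
    should download the file itself."""
    if not os.path.exists(path):
        return False
    last = os.path.getsize(path)
    stable = 0
    while stable < stable_polls:
        time.sleep(poll)
        try:
            size = os.path.getsize(path)
        except FileNotFoundError:
            return False  # file vanished mid-poll; download it ourselves
        if size == last:
            stable += 1  # unchanged for another interval
        else:
            last, stable = size, 0  # still growing; reset the counter
    return True
```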

Rename option:

    from uuid import uuid4

    temppath = f"{downfn}.temp.{uuid4()}"
    self.fs.get(downpath, temppath)
    try:
        os.rename(temppath, downfn)  # local rename, not on the remote fs
    except FileExistsError:
        pass
    ...

I'm not entirely sure if downfn is just a string path given the name?

Danferno avatar Oct 31 '24 09:10 Danferno

I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?

It would be nice, but in practice, this file will immediately be read next, so if it's incomplete, that would be a problem.

martindurant avatar Oct 31 '24 13:10 martindurant

I think the temporary name would indeed work. I think if the file exists, it's safer to pass rather than overwrite, as some other process may currently be reading the existing file?

It would be nice, but in practice, this file will immediately be read next, so if it's incomplete, that would be a problem.

I mean after it downloads to the temp name: if the real name (downfn) exists by then (i.e. it was downloaded by another process), then that file should be complete, because it only gets renamed to the real name upon completion.

Danferno avatar Oct 31 '24 13:10 Danferno

OK, that makes sense. I suppose rm and mv are about equivalent in that case, except you might have temporary duplication in the mv case if the target file is already open in some processes.

martindurant avatar Oct 31 '24 13:10 martindurant