filesystem_spec
filesystem_spec copied to clipboard
[Feature Request] Fork Safety for async filesystems
Hi there! Right now, async filesystems are explicitly incompatible with fork, presumably because fsspec tries to make sure that all the asyncio operations actually happen on a separate thread. See https://github.com/fsspec/filesystem_spec/blob/9ea19ba47dcee5c6a0c2435c9a776e8b67d7f7ef/fsspec/asyn.py#L305-L307
I was wondering whether there was some way that we could add limited support for forked processes. I understand that fork + threading gets into some tricky technical weeds, but supporting some level of fork would make for some integration with other projects easier.
In particular, I am trying to use an S3-backed zarr array with PyTorch (in order to train neural networks on some very large arrays) -- torch.utils.data.DataLoader does os.fork by default on Linux (see https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading), and the s3fs is an async file system, making this an incompatible combination.
I have been able to hack around this with the following code:
fs = ... # S3 file system, but could be an other async filesystem
try:
do_something(fs)
except RuntimeError as err:
if "fork-safe" not in str(err):
raise
import asyncio
from fsspec import asyn
if fs._loop is not asyn.loop[0]:
raise
# fs._loop is the default loop which is running on a thread on the parent process
# So let's spin up a new thread for this process.
new_loop = None
if len(asyn.loop) == 1:
new_loop = asyncio.new_event_loop()
asyn.loop.append(new_loop) # Appends are thread-safe!
if asyn.loop[1] is new_loop: # We inserted first
th = threading.Thread(target=new_loop.run_forever, name="fsspecIO")
th.daemon = True
th.start()
asyn.iothread.append(th)
fs._loop = asyn.loop[1]
fs._pid = os.getpid()
but obviously this is reaching fairly deep into the internals of fsspec, which makes me uncomfortable.
FWIW, I'm totally agnostic about how this should be implemented. The simplest thing I can think of is to add two functions: a reset_loop(new_lock: threading.Lock) function to reset the global event loop and a AsyncFilesystem.reset_loop function to reset the event loop of an existing filesystem would be fine -- then I can run the reset_loop() function in the worker_init_fn of the forked process and just call fs.reset_loop() when I catch the fork-safe error.
I'm afraid I simply don't know how to make things fork-safe in the presence of threads and potentially active coroutines and/or sockets.
If you have a foolproof way, that would be fine! From my work with the likes of Dask, fork does not seems to be typical anymore, since workers are long-lived, and we are fast at passing around serialised filesystem instances.
I'm afraid I simply don't know how to make things fork-safe in the presence of threads and potentially active coroutines and/or sockets.
Yeah, I agree this seems tricky. How about something simpler: would you accept a PR that resets the global asyncio thread? Something like (in asyn.py):
def reset_for_fork():
nonlocal lock
lock = threading.Lock()
if loop[0] is not None:
loop[0] = None
iothread[0] = None
This would strand all active coroutines, but at least would allow new filesystems created in the forked process to work correctly (as long as this function is called during the initialization). I'm happy to add the documentation stressing that this should only be called if you know there are no in-progress coroutines (since this leaves them stranded).
From my work with the likes of Dask, fork does not seems to be typical anymore
That's fair -- I only need this because fork is the default for PyTorch's data loading (which also lets you specify a worker_init_fn where I can call reset_for_fork() without worrying about any concurrency issues).
That function might be useful, but it is not enough: fsspec caches filesystem instances. So making a new instance that matches the arguments of a previous one will return the original instance, with whatever loop, client attributes it had, referring to dead event loops. If you clear the caches (fs_class.clear_instance_cache()), you will trigger the instances' cleanup methods via weakref.finalize, which may also call on the dead event loop.