filesystem_spec icon indicating copy to clipboard operation
filesystem_spec copied to clipboard

[Feature Request] Fork Safety for async filesystems

Open alanhdu opened this issue 3 years ago • 4 comments

Hi there! Right now, async filesystems are explicitly incompatible with fork, presumably because fsspec tries to make sure that all the asyncio operations actually happen on a separate thread. See https://github.com/fsspec/filesystem_spec/blob/9ea19ba47dcee5c6a0c2435c9a776e8b67d7f7ef/fsspec/asyn.py#L305-L307

I was wondering whether there was some way that we could add limited support for forked processes. I understand that fork + threading gets into some tricky technical weeds, but supporting some level of fork would make for some integration with other projects easier.

In particular, I am trying to use an S3-backed zarr array with PyTorch (in order to train neural networks on some very large arrays) -- torch.utils.data.DataLoader does os.fork by default on Linux (see https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading), and the s3fs is an async file system, making this an incompatible combination.

I have been able to hack around this with the following code:

fs = ...   # S3 file system, but could be an other async filesystem
try:
    do_something(fs)
except RuntimeError as err:
    if "fork-safe" not in str(err):
        raise
    import asyncio
    from fsspec import asyn

    if fs._loop is not asyn.loop[0]:
        raise
    # fs._loop is the default loop which is running on a thread on the parent process
    # So let's spin up a new thread for this process.
    new_loop = None
    if len(asyn.loop) == 1:
        new_loop = asyncio.new_event_loop()
        asyn.loop.append(new_loop)  # Appends are thread-safe!
    if asyn.loop[1] is new_loop:   # We inserted first
        th = threading.Thread(target=new_loop.run_forever, name="fsspecIO")
        th.daemon = True
        th.start()
        asyn.iothread.append(th)

    fs._loop = asyn.loop[1]
    fs._pid = os.getpid()

but obviously this is reaching fairly deep into the internals of fsspec, which makes me uncomfortable.

alanhdu avatar Nov 22 '21 21:11 alanhdu

FWIW, I'm totally agnostic about how this should be implemented. The simplest thing I can think of is to add two functions: a reset_loop(new_lock: threading.Lock) function to reset the global event loop and a AsyncFilesystem.reset_loop function to reset the event loop of an existing filesystem would be fine -- then I can run the reset_loop() function in the worker_init_fn of the forked process and just call fs.reset_loop() when I catch the fork-safe error.

alanhdu avatar Nov 22 '21 21:11 alanhdu

I'm afraid I simply don't know how to make things fork-safe in the presence of threads and potentially active coroutines and/or sockets.

If you have a foolproof way, that would be fine! From my work with the likes of Dask, fork does not seems to be typical anymore, since workers are long-lived, and we are fast at passing around serialised filesystem instances.

martindurant avatar Nov 22 '21 21:11 martindurant

I'm afraid I simply don't know how to make things fork-safe in the presence of threads and potentially active coroutines and/or sockets.

Yeah, I agree this seems tricky. How about something simpler: would you accept a PR that resets the global asyncio thread? Something like (in asyn.py):

def reset_for_fork():
    nonlocal lock
    lock = threading.Lock()
    if loop[0] is not None:
        loop[0] = None
        iothread[0] = None

This would strand all active coroutines, but at least would allow new filesystems created in the forked process to work correctly (as long as this function is called during the initialization). I'm happy to add the documentation stressing that this should only be called if you know there are no in-progress coroutines (since this leaves them stranded).

From my work with the likes of Dask, fork does not seems to be typical anymore

That's fair -- I only need this because fork is the default for PyTorch's data loading (which also lets you specify a worker_init_fn where I can call reset_for_fork() without worrying about any concurrency issues).

alanhdu avatar Nov 30 '21 02:11 alanhdu

That function might be useful, but it is not enough: fsspec caches filesystem instances. So making a new instance that matches the arguments of a previous one will return the original instance, with whatever loop, client attributes it had, referring to dead event loops. If you clear the caches (fs_class.clear_instance_cache()), you will trigger the instances' cleanup methods via weakref.finalize, which may also call on the dead event loop.

martindurant avatar Nov 30 '21 15:11 martindurant