GenericFileSystem Buffered Copy

Open ryaminal opened this issue 1 year ago • 6 comments

Hi all. Love fsspec.

I'm trying to use GenericFileSystem like this:

import fsspec
import fsspec.generic

# Instantiate the SFTP filesystem first; with default_method="current",
# the generic filesystem reuses the most recently created instance.
fs = fsspec.url_to_fs("sftp://username@host")[0]

fsspec.generic.rsync(
    # Only the protocol and path are used here; the username and host are discarded.
    "sftp:///stuff",
    # No trailing slash, unless you want double slashes in your path!
    "gs://bucket/dir1/dir2",
    inst_kwargs={"default_method": "current"},
)

This currently goes through the fsspec.generic.GenericFileSystem._copy method, which by default creates a temp file on disk, e.g. sftp -> local -> gs. This is undesirable for my use case.

Assumption 1

Looking at GenericFileSystem, there is a buffering implementation in fsspec.generic.GenericFileSystem._cp_file. However, I don't think that method will ever be called, because _copy has been implemented (unless a call to it is added to _copy).

Force _cp_file to be used

If I remove _copy (rename it to __copy), then _cp_file is in fact called, but there is a problem: open_async is not implemented by either of the filesystems in my example (sshfs.spec.SSHFileSystem and gcsfs.core.GCSFileSystem).

Assumption 2

This is very confusing to me, because SSHFileSystem and GCSFileSystem both extend fsspec.asyn.AsyncFileSystem but neither implements open_async. So when GenericFileSystem._cp_file does its if hasattr(fs, "open_async") checks, they return True because the attribute/method technically exists, but it is not implemented.
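The hasattr pitfall described above can be reproduced without any real filesystem. This is a minimal sketch, assuming a base class that declares open_async as a stub raising NotImplementedError; BaseAsyncFS, StubFS, StreamingFS and overrides_open_async are hypothetical names made up for illustration, not fsspec APIs.

```python
import asyncio


class BaseAsyncFS:
    """Hypothetical stand-in for a base class that declares open_async as a stub."""

    async def open_async(self, path, mode="rb"):
        raise NotImplementedError


class StubFS(BaseAsyncFS):
    """Inherits the stub without overriding it."""


class StreamingFS(BaseAsyncFS):
    """Overrides the stub with a dummy implementation."""

    async def open_async(self, path, mode="rb"):
        return f"handle:{path}"


def overrides_open_async(fs):
    # The class attribute is the very same function object as the base stub
    # unless a subclass has overridden it.
    return type(fs).open_async is not BaseAsyncFS.open_async


def raises_not_implemented(fs):
    # Actually call the method to see whether it is only a stub.
    try:
        asyncio.run(fs.open_async("some/path"))
    except NotImplementedError:
        return True
    return False


stub, streaming = StubFS(), StreamingFS()
hasattr_results = (hasattr(stub, "open_async"), hasattr(streaming, "open_async"))
override_results = (overrides_open_async(stub), overrides_open_async(streaming))
stub_raises = raises_not_implemented(stub)
```

In this sketch hasattr cannot tell the two classes apart, while comparing the method against the base stub can. Whether a check like that would be appropriate inside GenericFileSystem is a separate question.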

Force sync open

If a sync open is forced in GenericFileSystem._cp_file, then another error is raised:

NotImplementedError: Calling sync() from within a running loop

This happens because we are in an async context but trying to call a sync method.

Question

What to do here?

  1. Implement my own rsync that doesn't try to be generic? (I kind of already did this, heavily based on GenericFileSystem.)
  2. Figure out how to do non-async in an async context, for now, and incur the performance "penalty".
  3. Implement open_async in both SSHFileSystem and GCSFileSystem.

ryaminal avatar Apr 17 '24 03:04 ryaminal
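Option 2 in the list above could look roughly like this: a blocking, buffered copy handed off to a worker thread so that the coroutine awaiting it never makes a blocking call itself. This is a minimal stdlib-only sketch. copy_blocking and cp_file_in_thread are hypothetical helpers, in-memory buffers stand in for the remote files, and whether the sync file objects of a given fsspec backend behave well when used from a worker thread is an assumption that is not verified here.

```python
import asyncio
import io


def copy_blocking(src, dst, blocksize=2**20):
    """Copy between two blocking file-like objects in fixed-size chunks."""
    total = 0
    while True:
        chunk = src.read(blocksize)
        if not chunk:
            break
        dst.write(chunk)
        total += len(chunk)
    return total


async def cp_file_in_thread(src, dst, blocksize=2**20):
    # The worker thread has no running event loop, so the blocking calls
    # made there do not stall the loop that awaits this coroutine.
    return await asyncio.to_thread(copy_blocking, src, dst, blocksize)


# In-memory buffers stand in for the remote files in this sketch.
source = io.BytesIO(b"x" * 2500)
target = io.BytesIO()
copied = asyncio.run(cp_file_in_thread(source, target, blocksize=1024))
```

Only one block is held in memory at a time and nothing touches local disk, at the cost of one thread per in-flight file.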

Indeed, it has proven hard to implement open_async even for filesystems that are fundamentally async. Just because each operation is async doesn't necessarily mean you can leave a connection open in the background. Particularly, there seem to be no good ways to write async to a file (more below). The file caching process seemed like a good intermediate place, where we can stream from multiple files and write to multiple files in batches and not be limited by strict streaming.

Details: aiohttp allows for async writes using a pull pattern where you can provide an async generator. However, our situation is push because we are waiting on reads from another async stream, so the typical await f2.write(await f.read()) doesn't work. I don't know how to get around this.

martindurant avatar Apr 17 '24 14:04 martindurant
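For reference, the push/pull mismatch described above can be illustrated with a bounded asyncio.Queue placed between a reader that pushes chunks and a writer that pulls from an async generator. This is a minimal stdlib-only sketch. read_chunks and pull_style_upload are hypothetical stand-ins for an async source and a pull-style uploader, and the sketch does not show that this pattern fits aiohttp or any fsspec file class, nor does it provide a file-like write() method.

```python
import asyncio


async def read_chunks(data, blocksize):
    """Hypothetical async source: yields chunks as they 'arrive'."""
    for start in range(0, len(data), blocksize):
        await asyncio.sleep(0)  # stand-in for waiting on the network
        yield data[start:start + blocksize]


async def pump(source, queue):
    # Producer: push each chunk as it is read; None marks end of stream.
    async for chunk in source:
        await queue.put(chunk)
    await queue.put(None)


async def pull_from(queue):
    # Consumer side: an async generator that a pull-style writer can iterate.
    while True:
        chunk = await queue.get()
        if chunk is None:
            return
        yield chunk


async def pull_style_upload(body):
    """Hypothetical writer that pulls its body from an async generator."""
    received = bytearray()
    async for chunk in body:
        received.extend(chunk)
    return bytes(received)


async def transfer(data, blocksize=4, maxsize=2):
    # A bounded queue makes the reader wait when the writer falls behind.
    queue = asyncio.Queue(maxsize=maxsize)
    producer = asyncio.create_task(pump(read_chunks(data, blocksize), queue))
    result = await pull_style_upload(pull_from(queue))
    await producer
    return result


payload = b"0123456789abcdef!"
uploaded = asyncio.run(transfer(payload))
```

Error handling and cancellation are left out. A real version would have to stop the producer if the upload fails, and the reverse.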

@martindurant, that context is great, thanks. at first glance it seems so trivial to implement open_async, but then reality hits and it's a pretty tricky problem to try and implement "generically".

curious about the 3 options i listed above.

i think the "fastest" win (selfishly for me) would be to figure out how to get GenericFileSystem._cp_file to work with sync even though gcsfs and sshfs both implement AsyncFileSystem. but i'd love others' thoughts.

ryaminal avatar Apr 17 '24 14:04 ryaminal

Can't we just use https://pypi.org/project/aioshutil/ for the aio version of shutil.copyfileobj?

Skylion007 avatar Apr 17 '24 14:04 Skylion007

Oh RIP: https://github.com/kumaraditya303/aioshutil/issues/12#issue-1704396675

Skylion007 avatar Apr 17 '24 14:04 Skylion007

Not too bad to implement on your own apparently: https://github.com/Tinche/aiofiles/issues/61#issuecomment-1544583935

Skylion007 avatar Apr 17 '24 14:04 Skylion007

aioshutil would have had the same pull problem I described above. We don't know how to make a generic async write() method without at least repeating all the current buffering logic in the various async files. A true stream (open connection) appears to be not possible.

> i think the "fastest" win (selfishly for me) would be to figure out how to get GenericFileSystem._cp_file to work with sync even though gcsfs and sshfs both implement AsyncFileSystem.

It would be OK, maybe. We'd be restricted to working on one file at a time, which works for cp_file() but not at all for copy().

martindurant avatar Apr 17 '24 15:04 martindurant