filesystem_spec icon indicating copy to clipboard operation
filesystem_spec copied to clipboard

fs.put() behaves inconsistently depending on whether the rpath already exists

Open tgaddair opened this issue 3 years ago • 8 comments

When calling fs.put() to upload a local directory to a remote filesystem, if the remote directory does not exist then only the local directory contents will be uploaded. However, if the remote directory does exist already, then the entire directory will be uploaded to the rpath. This makes certain sync patterns (like periodically calling put() to sync local files to the remote) difficult to implement.

Repro:

import os
import tempfile
from pathlib import Path
import fsspec

fs = fsspec.filesystem("file")

# at the beginning, /tmp/test does not exist
rpath = "/tmp/test"
with tempfile.TemporaryDirectory() as tmpdir:
    Path(os.path.join(tmpdir, "foo.txt")).touch()

   # in this call the file is uploaded as /tmp/test/foo.txt
    fs.put(tmpdir, rpath, recursive=True)

    # in this call the file is uploaded as /tmp/test/tmp045wchhz/foo.txt
    fs.put(tmpdir, rpath, recursive=True)

I've verified this behavior also happens when using s3fs. I assume it is true for all filesystem backends.

The expected behavior is that the file in the above should be written as /tmp/test/foo.txt for both calls. This is, for example, what happens when we wrap the fsspec filesystem with pyarrow:

pyarrow.fs.copy_files(
    tmpdir, rpath, destination_filesystem=pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(fs))
)

Using fsspec==2022.8.2, but also happened in 2022.7.1.

tgaddair avatar Oct 05 '22 02:10 tgaddair

The behaviour you describe as suggested is not that provided by typical CLI shells, like posix cp -r: if a directory exists, the whole source tree is put inside it, but if the target directory doesn't exist, the root directory is copied directly to the target path. This may depend on whether the terminating "/" is entered as part of the target path.

martindurant avatar Oct 06 '22 14:10 martindurant

I guess it's a separate question as to what should be done when requesting a recursive copy on a single file...

martindurant avatar Oct 06 '22 14:10 martindurant

Thanks for the response! I did try with the trailing /, but that didn't change the result.

I think even if the goal is be consistent with cp, there could be a way to achieve a conventional "sync" operation with fsspec. Using cp, I can make the behavior consistent using, for example:

mkdir test1
touch test1/foo.txt
cp -r test1/ test2/
# test2/
#    foo.txt

touch test1/bar.txt
cp -r test1/ test2/
# test2/
#    foo.txt
#    bar.txt

But from my tests it didn't look like this worked with put().

I don't think a solution needs to mirror cp like this, but ideally I would like there to be some simple API for syncing files in this way, even if it's a separate method from put().

tgaddair avatar Oct 07 '22 04:10 tgaddair

Perhaps you wanted put_file() ?

martindurant avatar Oct 07 '22 13:10 martindurant

In any case, you are quite right that we should at the very least be really explicit in the docs, so no one is surprised. But in general, we need to write out a set of tests of expected behaviour (single file, recursive on/off, remote dir exists or not) and tested against multiple remote filesystems with differing ideas about directories.

martindurant avatar Oct 13 '22 14:10 martindurant

There are a number of other issues related to this: #820, #882, s3fs https://github.com/fsspec/s3fs/issues/659. Here are the results of my investigation.

This code compares what happens for command-line cp, rsync and scp, and fsspec cp and put, with and without a trailing slash, running each twice to see if the output is the same or not, to copy a source directory containing a file into a target directory that doesn't initially exist:

import fsspec
import os
import shutil
import subprocess

def setup():
    source = '/tmp/source'
    target = '/tmp/target'

    # Clear test directories.
    shutil.rmtree(source, ignore_errors=True)
    shutil.rmtree(target, ignore_errors=True)

    # Create test directory and file.
    fs.mkdir(source)
    with open(os.path.join(source, 'a'), 'w') as f:
        f.write("Here is some text")

    # Target directory does not yet exist.
    assert not fs.isdir(target)

    return source, target

def run(msg, callback):
    source, target = setup()

    print(msg)
    for loop in range(2):
        callback(source, target)
        print(f"  loop {loop}: {fs.find(target, recursive=True, withdirs=True)}")

fs = fsspec.filesystem('file')

run("cp", lambda source, target: subprocess.run(['cp', '-r', source, target]))
run("cp, slash", lambda source, target: subprocess.run(['cp', '-r', source+'/', target]))

run("rsync", lambda source, target: subprocess.run(['rsync', '-r', source, target]))
run("rsync, slash", lambda source, target: subprocess.run(['rsync', '-r', source+'/', target]))

run("scp", lambda source, target: subprocess.run(['scp', '-r', source, target]))
run("scp, slash", lambda source, target: subprocess.run(['scp', '-r', source+'/', target]))

run("fs.cp", lambda source, target: fs.cp(source, target, recursive=True)) 
run("fs.cp, slash", lambda source, target: fs.cp(source+'/', target, recursive=True))

run("fs.put", lambda source, target: fs.put(source, target, recursive=True))
run("fs.put, slash", lambda source, target: fs.put(source+'/', target, recursive=True))

Output is (using fsspec commit 88e5f29f on macOS):

cp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
cp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
rsync
  loop 0: ['/tmp/target/source', '/tmp/target/source/a']
  loop 1: ['/tmp/target/source', '/tmp/target/source/a']
rsync, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
scp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
scp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.cp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.cp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.put
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
fs.put, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']

Ignoring rsync which is uniquely special, both command-line cp and scp are consistent in that they are repeatable if a trailing slash is used and if not then the behaviour depends on if the target directory exists.

fsspec cp and put have the same behaviour with and without a trailing slash, and the two functions behave differently to each other. For this LocalFileSystem I believe they should produce identical results.

I propose that the way forward here is to restate the assumption that fsspec.cp will do the same as command-line cp, and that LocalFileSystem.put will also do the same, and then fix the code to make it so. We'll need to fix the local case first before moving on to the remote ones. The above example can be the start of the comprehensive cp/put tests required for this.

The wider issue of users wanting some sort of sync functionality can be provided by the rsync functionality proposed in issue #1074 which I have started to think about. Then users who come across differing behaviour depending on a target directory existing or not can be directed to that rsync functionality.

ianthomas23 avatar Dec 06 '22 14:12 ianthomas23

Thanks for looking into it.

I agree with everything here. Local put() is a little special, though, since it probably doesn't get used much. We should probably run at least against "memory", which is similar in many ways to the cloud backends. Also, should include get() in this, which has much of the same logic again.

martindurant avatar Dec 06 '22 17:12 martindurant

For completeness here is the demo code with MemoryFileSystem and get() included:

import fsspec
import os
import subprocess

def setup(remote_source, remote_target):
    # Source
    if remote_source:
        source = 'remote_source'
        afs = mem
    else:
        source = '/tmp/source'
        afs = fs

    # Clear directory.
    if afs.exists(source):
        afs.rm(source, recursive=True)
    assert not afs.isdir(source)

    afs.mkdir(source)
    with afs.open(os.path.join(source, 'a'), 'w') as f:
        f.write("Here is some text")

    # Target
    if remote_target:
        target = 'remote_target'
        afs = mem
    else:
        target = '/tmp/target'
        afs = fs

    # Clear directory.
    if afs.exists(target):
        afs.rm(target, recursive=True)

    # Confirm target directory does not yet exist.
    assert not afs.isdir(target)

    return source, target

def run(msg, callback, remote_source=False, remote_target=False):
    source, target = setup(remote_source=remote_source, remote_target=remote_target)

    afs = mem if remote_target else fs
    print(msg)
    for loop in range(2):
        callback(source, target)
        print(f"  loop {loop}: {afs.find(target, recursive=True, withdirs=True)}")

fs = fsspec.filesystem('file')
mem = fsspec.filesystem('memory')

run("cp", lambda source, target: subprocess.run(['cp', '-r', source, target]))
run("cp, slash", lambda source, target: subprocess.run(['cp', '-r', source+'/', target]))

run("rsync", lambda source, target: subprocess.run(['rsync', '-r', source, target]))
run("rsync, slash", lambda source, target: subprocess.run(['rsync', '-r', source+'/', target]))

run("scp", lambda source, target: subprocess.run(['scp', '-r', source, target]))
run("scp, slash", lambda source, target: subprocess.run(['scp', '-r', source+'/', target]))

run("fs.cp", lambda source, target: fs.cp(source, target, recursive=True))
run("fs.cp, slash", lambda source, target: fs.cp(source+'/', target, recursive=True))

run("fs.get", lambda source, target: fs.get(source, target, recursive=True))
run("fs.get, slash", lambda source, target: fs.get(source+'/', target, recursive=True))

run("fs.put", lambda source, target: fs.put(source, target, recursive=True))
run("fs.put, slash", lambda source, target: fs.put(source+'/', target, recursive=True))

run("mem.cp", lambda source, target: mem.cp(source, target, recursive=True), remote_source=True, remote_target=True)
run("mem.cp, slash", lambda source, target: mem.cp(source+'/', target, recursive=True), remote_source=True, remote_target=True)

run("mem.get", lambda source, target: mem.get(source, target, recursive=True), remote_source=True)
run("mem.get, slash", lambda source, target: mem.get(source+'/', target, recursive=True), remote_source=True)

run("mem.put", lambda source, target: mem.put(source, target, recursive=True), remote_target=True)
run("mem.put, slash", lambda source, target: mem.put(source+'/', target, recursive=True), remote_target=True)

output:

cp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
cp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
rsync
  loop 0: ['/tmp/target/source', '/tmp/target/source/a']
  loop 1: ['/tmp/target/source', '/tmp/target/source/a']
rsync, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
scp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
scp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.cp
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.cp, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.get
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.get, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
fs.put
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
fs.put, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a', '/tmp/target/source', '/tmp/target/source/a']
mem.cp
  loop 0: ['/remote_target/a']
  loop 1: ['/remote_target/a']
mem.cp, slash
  loop 0: ['/remote_target/a']
  loop 1: ['/remote_target/a']
mem.get
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
mem.get, slash
  loop 0: ['/tmp/target/a']
  loop 1: ['/tmp/target/a']
mem.put
  loop 0: ['/remote_target/a']
  loop 1: ['/remote_target/a', '/remote_target/source', '/remote_target/source/a']
mem.put, slash
  loop 0: ['/remote_target/a']
  loop 1: ['/remote_target/a', '/remote_target/source', '/remote_target/source/a']

ianthomas23 avatar Dec 13 '22 11:12 ianthomas23

Thanks @ianthomas23 . More of the same, then! Mostly reasonable behaviour, meaning that we agree with cp/scp at least some of the time :)

martindurant avatar Dec 14 '22 16:12 martindurant

@tgaddair Returning to this, now that we have merged #1148 we have consistent behaviour for cp, get and put. The behaviour you observed in this issue still occurs, but it is by design now which is consistent with command-line cp. To get the behaviour you would like to see you should append a slash to your source directory, again consistent with command-line cp.

Using the latest commit (9932e212) of master branch and your (slightly modified) reproducer:

import os
import tempfile
from pathlib import Path
import subprocess
import fsspec

fs = fsspec.filesystem("file")

rpath = "/tmp/test"
if fs.exists(rpath):
    fs.rm(rpath, recursive=True)

with tempfile.TemporaryDirectory() as tmpdir:
    Path(os.path.join(tmpdir, "foo.txt")).touch()
    source = tmpdir
    #source = tmpdir + "/"

    for loop in range(2):
        fs.put(source, rpath, recursive=True)
        #subprocess.run(['cp', '-r', source, rpath])
        print(f"loop {loop}:", fs.find(rpath, recursive=True))

the output is

loop 0: ['/tmp/test/foo.txt']
loop 1: ['/tmp/test/foo.txt', '/tmp/test/tmpnah_v0po/foo.txt']

as you originally observed. Replacing source = tmpdir with source = tmpdir + "/" gives

loop 0: ['/tmp/test/foo.txt']
loop 1: ['/tmp/test/foo.txt']

If you comment out the fs.put(...) line and uncomment the subprocess one you will see the same behaviour using command-line cp. It is also the same using scp.

I am going to close this issue as I don't think any other action is necessary. Feel free to reopen it if there is more that you would like to discuss.

ianthomas23 avatar Feb 01 '23 12:02 ianthomas23