
Allow defining owned or providing a factory in SFTPClient's “copy-like” methods

Open uwinx opened this issue 3 years ago • 5 comments

It'd be helpful if the library provided more flexibility on “dstfs” overriding for cases like copying file to s3 bucket.

Would you consider this worth implementing? I could work on it myself, but I can't commit to it or make any promises.

uwinx avatar Jul 20 '22 07:07 uwinx

For the S3 use case in particular, I'm not sure this makes sense. According to https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html:

Amazon S3 never adds partial objects; if you receive a success response, Amazon S3 added the entire object to the bucket. You cannot use PutObject to only update a single piece of metadata for an existing object. You must put the entire object with updated metadata if you want to update some values.

Is there some other S3 API which provides random-access writes? Without that, _SFTPFileCopier can't function. It depends on being able to break the copy down into multiple smaller read/write operations and run those operations in parallel to greatly improve the transfer speed. If you are going to be copying a file to somewhere that only lets you update the whole file at once with sequential data, it would probably be much more efficient to just use the AsyncSSH "open" API in SFTPClient and build an S3 REST API request which you'd then add the file data to incrementally. You might still be able to get some of the speed improvement by initiating multiple reads in parallel, but you'd have to process the results in order, which means buffering the data in memory. The SFTPClientFile class already knows how to break a large read down into smaller chunks, issue them in parallel, and deliver the result sequentially, so there's no need for _SFTPFileCopier in this case.

If you have other use cases in mind, I'd probably need to understand those a bit better, to see if the current _SFTPFileCopier is really the right thing to start from here or not.

ronf avatar Jul 21 '22 01:07 ronf

@ronf, thanks for the heads-up. “Open” can indeed be preferred over copier in this case.

I think it'd still be helpful if the library provided sequential chunked read out of the box.

The SFTPClientFile class already knows how to break a large read down into smaller chunks, issue them in parallel, and deliver the result sequentially, so there's no need for _SFTPFileCopier in this case.

The ability to do so is there in SFTPClientFile. However, the way I see it, it's really _SFTPFileCopier that implements the chunked operations, and there's some kind of layer missing. For example, in the aiohttp world there's AsyncStreamIterator, which could be used in asyncssh's case too. This is optional, however. :)

I'm sorry, I might be missing something. Anyway, thanks for such a well-written library.

P.S. For those seeking an answer to a similar question: S3 has a “multipart” upload, which you might want to use.

uwinx avatar Jul 25 '22 08:07 uwinx

Looking at aiohttp, it looks like the reader mixin class provides a default async iterator which returns data a line at a time, similar to what is provided by SSHReader in AsyncSSH (and StreamReader in asyncio). However, it also provides another function, iter_chunked(self, n: int) -> AsyncStreamIterator[bytes], which returns sequential blocks of data in the form of an async iterator, where you can pass in the block size. Something similar is available in the aioftp package, but in the form of two functions, iter_by_line and iter_by_block, the latter taking an optional block size.

It would be very easy to provide an equivalent to iter_by_block, but if I implemented it in a manner similar to what I see in aioftp right now, it wouldn't provide the parallelism that's offered by SFTPParallelIO, the shared parent class of SFTPFileReader, SFTPFileWriter, and SFTPFileCopier. To support the parallelism needed by those classes, I think I'd need to define an iterator that potentially returned blocks out of order, each marked with its offset.

Here's what a sequential version of a block iterator might look like as an extension to SSHReader:

    async def by_block(self, block_size: int) -> AsyncIterator[AnyStr]:
        """Return blocks of data sequentially as an async iterator"""

        while not self.at_eof():
            yield await self.read(block_size)

Then, instead of doing async for line in reader: to get data a line at a time, you could do something like async for block in reader.by_block(block_size): to get data a block at a time. I can see that syntax being slightly more convenient than a regular read loop.

This would be easy to define yourself outside of SSHReader. Just define your own function which replaces self above with reader. Similarly, a function like this could be defined on the SFTPClientFile class.
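As a minimal sketch of the standalone version described above, here is a by_block function defined outside of SSHReader. The FakeReader class is purely illustrative (it is not part of asyncssh); it just mimics the read()/at_eof() interface that SSHReader and SFTPClientFile provide:

```python
import asyncio
from typing import AsyncIterator

async def by_block(reader, block_size: int) -> AsyncIterator[bytes]:
    """Yield successive blocks of data from any object with read()/at_eof()."""
    while not reader.at_eof():
        yield await reader.read(block_size)

class FakeReader:
    """Illustrative stand-in for SSHReader/SFTPClientFile over in-memory data."""

    def __init__(self, data: bytes):
        self._data = data
        self._pos = 0

    def at_eof(self) -> bool:
        return self._pos >= len(self._data)

    async def read(self, n: int) -> bytes:
        chunk = self._data[self._pos:self._pos + n]
        self._pos += len(chunk)
        return chunk

async def main() -> list:
    return [block async for block in by_block(FakeReader(b'abcdefghij'), 4)]

print(asyncio.run(main()))  # [b'abcd', b'efgh', b'ij']
```

Swapping FakeReader for a real SSHReader or an SFTPClientFile opened via sftp.open() should work the same way, since only read() and at_eof() are used.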

Thanks for the pointer to S3 "multipart" uploads. That does look like it would provide a way to do something similar to the random-access writes needed by _SFTPFileCopier, with the potential to have the "finish" step make the call to S3 that does the final object reassembly. Perhaps it would be possible to provide a random-access version of an async iterator (returning tuples of offset and data) that could be leveraged for a use case like this.
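To illustrate the idea above, here is a sketch of feeding out-of-order (offset, data) tuples into a multipart-style sink whose finish() step reassembles the parts by offset. Everything here is hypothetical scaffolding (fake_read_parallel, MultipartSink, upload_part are not asyncssh or S3 APIs); a real implementation would call the S3 multipart upload operations instead:

```python
import asyncio
from typing import AsyncIterator, Tuple

async def fake_read_parallel() -> AsyncIterator[Tuple[int, bytes]]:
    # Simulated out-of-order completion of three parallel reads
    for offset, data in [(4, b'efgh'), (0, b'abcd'), (8, b'ij')]:
        yield offset, data

class MultipartSink:
    """Hypothetical stand-in for an S3 multipart upload: each block becomes
    its own part, and finish() reassembles the parts by offset."""

    def __init__(self):
        self._parts = {}

    async def upload_part(self, offset: int, data: bytes) -> None:
        self._parts[offset] = data

    async def finish(self) -> bytes:
        return b''.join(self._parts[off] for off in sorted(self._parts))

async def copy() -> bytes:
    sink = MultipartSink()
    async for offset, data in fake_read_parallel():
        await sink.upload_part(offset, data)
    return await sink.finish()

print(asyncio.run(copy()))  # b'abcdefghij'
```

The key point is that the sink never needs random-access writes during the transfer; ordering is deferred entirely to the finish step, which matches how S3 multipart completion works.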

I'm doing some other cleanup in SFTP right now, but I'll see what this might look like once I finish these other changes.

ronf avatar Jul 25 '22 14:07 ronf

I did some experimenting with this today, and was able to use the existing _SFTPParallelIO class to build a new variant of the parallel read function. The existing read() will already break the request up into multiple parallel reads if the total size is large enough, but it waits until it has reassembled the entire result before returning anything. However, the new read_parallel() function returns the data via an async iterator a block at a time as each remote SFTP read call completes. The iterator returns an offset and the block of data read from that offset. The new method looks like this:

    async def read_parallel(self, size: int = -1,
                            offset: Optional[int] = None) -> AsyncIterator[Tuple[int, bytes]]

Here's an example of what it looks like to use it:

    async with sftp.open('file') as f:
        async for offset, data in await f.read_parallel():
            # Do something with data, which was read from the returned offset

For instance, below is the equivalent of what SFTPCopier would do, but you could also replace the AsyncSSH writes here with a write to something else, such as an S3 bucket:

    async with sftp.open('src') as f1:
        async with sftp.open('dst', 'wb') as f2:
            async for offset, data in await f1.read_parallel():
                await f2.write(data, offset)

As with the existing read(), you can specify block_size and max_requests on the open() call if you don't like the defaults (up to 128 parallel requests of 16 KB each). With no arguments, read_parallel() will return the entire file, but you can also specify an offset and size if you want only a part of the file.

With small block sizes, the reads will generally complete in order, but if you try to make the block size bigger than what the server supports, multiple reads may be issued out of order to make sure all the data in the requested range is returned.
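If a consumer needs the blocks strictly in order despite out-of-order completion, one option is to buffer blocks by offset and release them as soon as the next expected offset arrives. This is a sketch under assumed names (out_of_order simulates the iterator; it is not the asyncssh API):

```python
import asyncio
from typing import AsyncIterator, Tuple

async def out_of_order() -> AsyncIterator[Tuple[int, bytes]]:
    # Simulated blocks completing out of order, as can happen when the
    # requested block size exceeds what the server supports
    for item in [(4, b'efgh'), (0, b'abcd'), (8, b'ij')]:
        yield item

async def in_order(blocks: AsyncIterator[Tuple[int, bytes]],
                   start: int = 0) -> AsyncIterator[bytes]:
    """Buffer out-of-order (offset, data) blocks; yield them sequentially."""
    pending = {}
    next_offset = start
    async for offset, data in blocks:
        pending[offset] = data
        # Flush every contiguous block starting at the expected offset
        while next_offset in pending:
            chunk = pending.pop(next_offset)
            next_offset += len(chunk)
            yield chunk

async def main() -> bytes:
    return b''.join([chunk async for chunk in in_order(out_of_order())])

print(asyncio.run(main()))  # b'abcdefghij'
```

The trade-off is memory: in the worst case the buffer holds everything read so far, which is exactly the in-memory buffering mentioned earlier in the thread.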

Is this the sort of thing you were looking for?

ronf avatar Jul 31 '22 06:07 ronf

This change is now available in the "develop" branch as commit 422dce8.

For now, I haven't exposed a write_parallel() function as I'm not sure whether that would be useful or not. However, the SFTPFileWriter class has been changed to have the same kind of internal iter() method as SFTPFileReader, so this would be very easy to do. In that case, it would return tuples of offset & size of the individual writes as they completed.

ronf avatar Aug 03 '22 01:08 ronf

This change is now available in AsyncSSH 2.12.0.

ronf avatar Aug 11 '22 05:08 ronf