
async versions of shutil

Open graingert opened this issue 5 years ago • 12 comments

shutil.copyfile and shutil.copyfileobj

graingert avatar Mar 18 '19 10:03 graingert

I came here to find out if this is already implemented. What does it require?

thedrow avatar Apr 17 '20 06:04 thedrow

Does this work?

import asyncio
from functools import partial, wraps
import shutil


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run

copyfile = wrap(shutil.copyfile)
copyfileobj = wrap(shutil.copyfileobj)


async def main():
    await copyfile('a', 'b')

asyncio.run(main())

pwwang avatar Oct 17 '20 04:10 pwwang

https://docs.python.org/3.9/library/asyncio-task.html#asyncio.to_thread

Of course there's

await asyncio.to_thread(shutil.copyfile, "a", "b")

graingert avatar Oct 17 '20 07:10 graingert

@graingert Right, but this would spawn a new thread for each call, right? (So if you are copying a lot of small files it would be inefficient)

@pwwang From what I understand, if you ran this hundreds of times, it would only create some fixed number of threads in the thread pool the executor creates. If so, this sounds like the correct solution.

MatthewScholefield avatar Feb 04 '21 09:02 MatthewScholefield

asyncio.to_thread uses the loop's default executor, which is a bounded pool of worker threads by default.

graingert avatar Feb 04 '21 09:02 graingert

If you are using a version of Python earlier than 3.9 (as I was), you can use aiofiles.os.wrap; the implementation is identical to what @pwwang mentioned in their comment. Otherwise I would agree with using asyncio.to_thread, as @graingert suggested.

import shutil

from aiofiles.os import wrap

copyfile = wrap(shutil.copyfile)
copyfileobj = wrap(shutil.copyfileobj)

Then they can be used as coroutines:

await copyfile(src, dst)

xyloguy avatar Mar 09 '21 16:03 xyloguy

I don't think the implementations above (based on loop.run_in_executor() and asyncio.to_thread()) will promptly handle ^C interruptions.

For example, suppose you accidentally shutil.copyfile or shutil.rmtree the wrong path. You'd expect to be able to interrupt it midway through with ^C. But the shutil function is running in its own worker thread, which your main thread has no way of cancelling. If you spam ^C multiple times, you can probably get the process to exit faster, but the stack trace will show an inelegant interruption of asyncio internals, and I don't think resource cleanup will be orderly.

This is a problem for any function that you run in loop.run_in_executor()/asyncio.to_thread(), but it might be especially surprising here because we usually expect async I/O to be cancellable.

SyntaxColoring avatar Oct 18 '22 19:10 SyntaxColoring

I think you're correct, but that's an inherent limitation of the approach we're using. Any suggestions?

Tinche avatar Oct 19 '22 10:10 Tinche

It seems like you'd need a rewrite of the shutil functions designed to support explicit cancellation, using a cancel token or similar flag that's checked every time a chunk is copied or the iteration moves on to a new file.

graingert avatar Oct 19 '22 10:10 graingert

already exists here I believe https://pypi.org/project/aioshutil/

fgoudreault avatar Jan 19 '23 01:01 fgoudreault

https://pypi.org/project/aioshutil/

aioshutil v1.3 - the latest version at the time of writing - implements most functions, including copyfileobj and copyfile, using loop.run_in_executor(), meaning it still just runs the original shutil functions inside a thread pool rather than providing a true async implementation.

davidfstr avatar May 11 '23 19:05 davidfstr

Here's my own async implementation of shutil.copyfileobj():

_DEFAULT_CHUNK_SIZE = 32768  # bytes; arbitrary

async def aioshutil_copyfileobj(async_fsrc, async_fdst, *, chunksize: int=_DEFAULT_CHUNK_SIZE) -> None:
    while (chunk := await async_fsrc.read(chunksize)) != b'':
        await async_fdst.write(chunk)

davidfstr avatar May 11 '23 19:05 davidfstr