
Working example of using Async/Await

Open Sarafudinov opened this issue 1 year ago • 11 comments

Hello, I’m working with s3fs and haven’t found any examples of how to use it asynchronously. Looking through your changelog, I saw that the functionality has already been added. Could you please explain how to use the asynchronous methods for reading, writing, and listing files? The same question was asked in 2020, but at that time the functionality wasn’t ready and only a link to a small section of the docs was given: https://s3fs.readthedocs.io/en/latest/#async

Thanks in advance for your attention!

Sarafudinov avatar Apr 26 '24 11:04 Sarafudinov

What kind of workflow would you like? Perhaps I can sketch out what you need to do, and you help update the docs?

martindurant avatar Apr 26 '24 13:04 martindurant

Thank you. I would like to learn more and see examples of how to use the reading (s3.open), writing, and listing (s3.ls) methods in asynchronous code:

  1. I need the ability to asynchronously download files from a bucket (as far as I understand, this is the open_async method).
  2. I would also like to see an example of asynchronously writing files to a bucket.
  3. Regarding listing: I don’t quite understand whether there is an asynchronous analogue when initializing with s3 = S3FileSystem(asynchronous=True), since to use the listing call (s3.ls(path)) you seem to have to fall back to the synchronous initialization s3 = S3FileSystem().

Sarafudinov avatar Apr 26 '24 14:04 Sarafudinov

All I need is an example of how to read files from a bucket, write files to a bucket, and list the files in a bucket, with all of it done asynchronously. As far as I understand from your changelog, you have already implemented this, but I haven’t found any examples:

2023.4.0 Add streaming async read file (#722)

Or is the only reference, for now, the aiobotocore example? https://aiobotocore.readthedocs.io/en/latest/examples/s3/basic_usage.html

Sarafudinov avatar Apr 30 '24 13:04 Sarafudinov

Sorry to be slow, thanks for the ping. Here is an example:

import asyncio

async def f():
    import fsspec
    mybucket = "mybucket"   # <- change this
    fs = fsspec.filesystem("s3", asynchronous=True, anon=False)
    print(await fs._ls(mybucket))                             # list bucket contents (async counterpart of fs.ls)
    await fs._pipe_file(f"{mybucket}/afile", b"hello world")  # upload a whole object from bytes
    print(await fs._cat_file(f"{mybucket}/afile"))            # read the whole object back

asyncio.run(f())

You should note that the file-like interface (via fs.open(...)) is not async, because the upstream python file-like object (io.IOBase) is not async. This is why open_async exists, but this "streaming" API is incomplete.
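For reading, that streaming API can be used roughly like this (a minimal sketch, assuming the "mybucket/afile" object written in the example above exists):

import asyncio

import s3fs

async def read_streaming():
    fs = s3fs.S3FileSystem(asynchronous=True, anon=False)
    # open_async gives an async file-like object for reading ("rb");
    # streamed writing through it is not implemented (see below)
    f = await fs.open_async("mybucket/afile", "rb")
    return await f.read()

print(asyncio.run(read_streaming()))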

martindurant avatar Apr 30 '24 14:04 martindurant

I don’t quite understand 😅 As far as I understand, asynchronous reading is supposed to go through s3fs.open_async(); something like this:

async def async_read():
    s3 = S3FileSystem(..., asynchronous=True)
    fs = await s3.open_async(path, "rb")
    result = await fs.read()

This is more or less what asynchronous reading looks like; I figured it out after digging through your git 😁

Is there any example of how writing is done? Something like:

async def async_write(some_content):
    s3 = S3FileSystem(..., asynchronous=True)
    fs = await s3.open_async(path, "w")
    await fs.write(some_content)

The only thing I found on this was another issue: https://github.com/fsspec/s3fs/issues/853

Sarafudinov avatar Apr 30 '24 14:04 Sarafudinov

No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.
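For whole-object writes, though, the plain coroutine methods from the example above do work; a minimal sketch, assuming a writable bucket named "mybucket":

import asyncio

import s3fs

async def write_whole_object(content: bytes):
    fs = s3fs.S3FileSystem(asynchronous=True, anon=False)
    # not streaming: the entire payload is supplied up front
    await fs._pipe_file("mybucket/afile", content)

asyncio.run(write_whole_object(b"hello world"))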

martindurant avatar Apr 30 '24 14:04 martindurant

Hello, I tried to add the logic for asynchronous writing; I hope this helps with development! https://github.com/fsspec/s3fs/pull/885

Sarafudinov avatar Jun 17 '24 12:06 Sarafudinov

@martindurant

No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.

I recently ran into this too and spent a long time figuring out what the problem was. All I got was an error saying the file was already closed. It turns out the write method simply isn’t implemented in the S3AsyncStreamedFile class: a base-class method gets called, in which the closed flag turns out to be True.

You could at least override the write method and raise a NotImplementedError; it would save people time, because the documentation doesn’t say a word about any of this.
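Something like this, as a sketch of that suggestion (hypothetical, not what s3fs currently ships):

from fsspec.asyn import AbstractAsyncStreamedFile

class S3AsyncStreamedFile(AbstractAsyncStreamedFile):
    async def write(self, data):
        # fail loudly instead of deferring to the base class, which ends up
        # reporting the file as already closed
        raise NotImplementedError(
            "async streamed writing is not implemented in s3fs; "
            "use _pipe_file/_put_file for whole-object uploads"
        )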

espdev avatar Dec 06 '24 09:12 espdev

I think the following is the right way to do it. The API for async files for writing shouldn't change, but this will ensure an early not-implemented error:

--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -1092,7 +1092,7 @@ class AbstractAsyncStreamedFile(AbstractBufferedFile):
         raise NotImplementedError

     async def _initiate_upload(self):
-        pass
+        raise NotImplementedError

martindurant avatar Dec 12 '24 16:12 martindurant

What is the right way to use open_async (specifically for "wb")? Will it ever be a context manager?

wild-endeavor avatar Dec 12 '24 16:12 wild-endeavor

I would highly appreciate it if anyone added a working example for JupyterLab. Right now this fails with RuntimeError: no running event loop:

import pandas as pd
import s3fs

async def foo():
    # profile and filepath are placeholders
    fs = s3fs.S3FileSystem(profile=profile, asynchronous=True)

    async with await fs.open(filepath, mode="rb") as f:
        return pd.read_csv(f)


result = await foo()
# This doesn't help either

import asyncio
import nest_asyncio

nest_asyncio.apply()

fs = s3fs.S3FileSystem(profile=profile, asynchronous=True, loop=asyncio.get_event_loop())

All this while plain async calls have been supported in Jupyter for quite a while now. There’s something in the library’s internal workings, but I don’t have enough time to dig into it.
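A sketch that avoids the synchronous fs.open() call altogether, based on the coroutine methods shown earlier in the thread (the function name and example path here are placeholders):

import io

import pandas as pd
import s3fs

async def read_csv_async(filepath: str, profile: str):
    # create the filesystem inside the notebook's running event loop
    fs = s3fs.S3FileSystem(profile=profile, asynchronous=True)
    # fs.open() is synchronous; with asynchronous=True use the coroutine
    # methods instead -- here the whole object is fetched into memory
    data = await fs._cat_file(filepath)
    return pd.read_csv(io.BytesIO(data))

# in a Jupyter cell (top-level await is allowed there):
# df = await read_csv_async("mybucket/data.csv", profile="default")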

n-splv avatar Dec 19 '24 08:12 n-splv