s3fs
s3fs copied to clipboard
Working example of using Async/Await
Hello, I’m working with s3fs and haven’t found any examples of how to work with asynchrony; after looking through your checklog I saw that you’ve already added functionality. Could you please tell me how to use asynchronous methods for reading/writing and listing files the same question was asked in 2020 then this functionality was not ready and only a link to a small piece was provided https://s3fs.readthedocs.io/en/latest/#async
Thanks in advance for your attention!)
What kind of workflow would you like? Perhaps I can sketch out what you need to do, and you help update the docs?
Thank you, I would like to learn more, see examples of how to use the reading (s3.open), writing and listing (s3.ls) methods in asynchronous code
- I need the ability to asynchronously download files from a bucket (as far as I understand, this is the open_async method)
- I would also like to see an example of asynchronously writing files to a bucket
- and regarding the listing of files, I don’t quite understand whether there is an analogue when initializing s3 = S3FileSystem(asynchronous=True) since to work with the listing (s3.ls(path)) you have to use synchronous initialization s3 = S3FileSystem()
all I need is an example of how to read files from the bucket, write files to the bucket and look at the listing of files on the bucket
at the same time, so that it is all asynchronous As far as I understand from your changelog, you have already implemented this, but I haven’t found any examples
2023.4.0 Add streaming async read file (#722)
or for now link only to the botocore library https://aiobotocore.readthedocs.io/en/latest/examples/s3/basic_usage.html
Sorry to be slow, thanks for the ping. Here is an example:
import asyncio
async def f():
import fsspec
mybucket = "mybucket" # <- change this
fs = fsspec.filesystem("s3", asynchronous=True, anon=False)
print(await fs._ls(mybucket))
await fs._pipe_file(f"{mybucket}/afile", b"hello world")
print(await fs._cat_file(f"{mybucket}/afile"))
asyncio.run(f())
You should note that the file-like interface (via fs.open(...)) is not async, because the upstream python file-like object (io.IOBase) is not async. This is why open_async exists, but this "streaming" API is incomplete.
do not quite understand😅 As far as I understand, asynchronous reading is designed through s3fs.open_async() in your example you indicate something like this:
async def async_read():
s3 = S3FileSystem(..., asynchronous=True)
fs = await s3.open_async(path, "rb")
result = await fs.read()
this is what this asynchronous reading looks like + - I figured it out after digging through your git😁
is there any example when writing is used?
async def async_write(some_content):
s3 = S3FileSystem(..., asynchronous=True)
fs = await s3.open_async(path, "w")
await fs.write(some_content)
since the only thing I found was also an issue https://github.com/fsspec/s3fs/issues/853
No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.
Hello, I tried to add the logic of asynchronous writing, I hope this will somehow help in development! https://github.com/fsspec/s3fs/pull/885
@martindurant
No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.
I recently encountered this too and spent a long time figuring out what the problem was. There was just an error that the file was already closed. It turns out that the write method is just not implemented in the class S3AsyncStreamedFile. A method of the base class is called in which the closed flag turns out to be True.
You could just override write method and raise NotImplementedError exception, it would save time because the documentation doesn't say a word about all this.
I think the following is the right way to do it. The API for async files for writing shouldn't change, but this will ensure an early not-implemented error:
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -1092,7 +1092,7 @@ class AbstractAsyncStreamedFile(AbstractBufferedFile):
raise NotImplementedError
async def _initiate_upload(self):
- pass
+ raise NotImplementedError
What is the right way to use open_async (specifically for "wb")? Will it ever be a context manager?
Would highly appreciate if anyone added a working example for Jupyterlab. RN this fails with RuntimeError: no running event loop:
async def foo():
fs = s3fs.S3FileSystem(profile=profile, asynchronous=True)
async with await fs.open(filepath, mode="rb") as f:
return pd.read_csv(f)
result = await foo()
# This doesn't help either
import asyncio
import nest_asyncio
nest_asyncio.apply()
fs = s3fs.S3FileSystem(profile=profile, asynchronous=True, loop=asyncio.get_event_loop())
All while simple async calls have been supported in Jupyter for quite a while now. There's something with the internal library workings, but I don't have enough time to dig into it.