Improve async handling for s3fs
Zarr version
v3.X
Numcodecs version
v0.15.1
Python Version
3.12
Operating System
Linux
Installation
pip into venv
Description
When opening a zarr store via s3fs, errors about unclosed aiohttp objects are raised when the Python interpreter exits. From what I understand, cleanup of the asynchronous objects should be performed by the file storage implementation, as in https://github.com/zarr-developers/zarr-python/issues/2674#issuecomment-2581263695.
However, it is unclear to me whether this should be done by zarr-python or by the s3fs implementation, as discussed here: https://github.com/fsspec/s3fs/issues/943.
What do you think?
Steps to reproduce
import xarray
import fsspec
import zarr

s3_endpoint = "http://localhost:19900"
fs = fsspec.filesystem(
    "s3",
    key="ACCESS",
    secret="S3CRET",
    endpoint_url=s3_endpoint,
    asynchronous=True,
)
store = zarr.storage.FsspecStore(fs, path="/mybucket/myzarr.zarr")
dataset = xarray.open_zarr(store, zarr_format=3)
Exiting the interpreter should then raise the following error:
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fe2071f6c60>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7fe2071ca6f0>, 24766.009516954), (<aiohttp.client_proto.ResponseHandler object at 0x7fe2071cac30>, 24766.015895832)])']
connector: <aiohttp.connector.TCPConnector object at 0x7fe2071ab800>
Additional output
No response
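For reference, fsspec's async documentation shows an explicit create/close lifecycle for the session when asynchronous=True. Below is a minimal sketch of that pattern, reusing the placeholder endpoint and credentials from above; the exact teardown call may vary by backend.

import asyncio
import fsspec

async def main():
    fs = fsspec.filesystem(
        "s3",
        key="ACCESS",
        secret="S3CRET",
        endpoint_url="http://localhost:19900",
        asynchronous=True,
    )
    # set_session() creates the aiohttp client on the running loop.
    session = await fs.set_session()
    # Async operations use the underscore-prefixed coroutine methods.
    print(await fs._ls("mybucket"))
    # Closing the session before the loop exits avoids the aiohttp
    # warnings; per fsspec's async docs this is session.close(), though
    # some backends may need the session's async context exit instead.
    await session.close()

asyncio.run(main())

Note that when zarr drives the filesystem itself, the session is created lazily on zarr's internal event loop, which is part of why ownership of the cleanup is murky here.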
What happens if your application code closes the filesystem (does s3fs offer an async with context? I don't remember)?
async with fs:
    store = zarr.storage.FsspecStore(fs, path="/mybucket/myzarr.zarr")
    dataset = xarray.open_zarr(store, zarr_format=3)
The tricky bit here is that zarr-python doesn't really know who owns the filesystem passed into FsspecStore. We don't necessarily want to close it, since the user might be using it outside of zarr.
Maybe we could make FsspecStore (and the related stores) async context managers and have them close the store upon __aexit__?
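A rough sketch of that idea (hypothetical, not the current zarr-python API; it assumes the wrapped filesystem exposes fsspec's set_session hook):

class FsspecStoreSketch:
    """Hypothetical store that acts as an async context manager and
    tears down the filesystem's session on exit. Not zarr-python code."""

    def __init__(self, fs, path):
        self.fs = fs
        self.path = path
        self._session = None

    async def __aenter__(self):
        # set_session() is fsspec's documented hook for creating the
        # client of an asynchronous filesystem.
        self._session = await self.fs.set_session()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # The ownership caveat above applies: this is only safe if the
        # store owns the filesystem; a user-supplied fs may still be in
        # use elsewhere.
        if self._session is not None:
            await self._session.close()
            self._session = None

Whether __aexit__ may close a user-supplied filesystem is exactly the ownership question raised above.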
s3fs actually has a custom finalizer that should close these sessions in some cases
https://github.com/fsspec/s3fs/blob/01b9c4b838b81375093ae1d78562edf6bdc616ea/s3fs/core.py#L575-L578
Although it looks like that is not used for asynchronous filesystems.
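For illustration, that snippet follows the standard weakref.finalize idiom, roughly like the toy version below (a stand-in, not the s3fs code). Note that an asynchronous filesystem's teardown is a coroutine, which a plain finalizer cannot await, possibly why it is skipped there.

import weakref

class DummyClient:
    """Toy stand-in for an aiohttp/aiobotocore client."""
    def close(self):
        print("client closed")

class ToyFileSystem:
    """Toy filesystem holding a network client. The finalizer closes the
    client once the instance is garbage-collected or at interpreter exit."""
    def __init__(self):
        self._client = DummyClient()
        # The callback must not reference self, or the object would never
        # be collected; passing only the client avoids that.
        weakref.finalize(self, DummyClient.close, self._client)

fs = ToyFileSystem()
del fs  # prints "client closed" once the instance is collected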
I'm guessing this is the same error I'm seeing, even though I'm using s3fs.S3FileSystem? E.g.:
import zarr
import s3fs

s3 = s3fs.S3FileSystem(profile="PROFILE", asynchronous=True)
store = zarr.storage.FsspecStore(s3, path="/mybucket/myzarr.zarr")
z = zarr.open(store, mode="r")
which produces:
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x11bd89940>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x11bdd0ad0>, 30591.100072875), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd02f0>, 30591.103170083), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd0890>, 30591.104944541), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd1610>, 30591.295504583)])']
connector: <aiohttp.connector.TCPConnector object at 0x11bd89010>
Yep, to me it's the same error.
I have a similar issue, but mine only seems to pop up when I use a role to authenticate; when my credentials are saved in ~/.aws/credentials, I don't see the same error.
All I need to do to reproduce the errors is:
group = zarr.open_group("s3://path-to-zarr")
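For what it's worth, a URL like s3://… means zarr builds the filesystem itself via fsspec, and storage_options is the way to influence how it authenticates. A sketch; the profile name is a hypothetical placeholder for the role-based setup described above:

import zarr

# "s3://" URLs are resolved through fsspec, and storage_options are
# forwarded to the underlying S3FileSystem. "my-role-profile" is a
# made-up name for a role profile configured in ~/.aws/config.
group = zarr.open_group(
    "s3://path-to-zarr",
    mode="r",
    storage_options={"profile": "my-role-profile"},
)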