zarr-python icon indicating copy to clipboard operation
zarr-python copied to clipboard

Improve async handling for s3fs

Open gbip opened this issue 9 months ago • 5 comments

Zarr version

v3.X

Numcodecs version

v0.15.1

Python Version

3.12

Operating System

Linux

Installation

pip into venv

Description

When opening a zarr using s3fs, some errors related to aiohttp objects are raised when leaving the Python interpreter. From what I understand, it seems that cleaning asynchronous objects should be performed by the filestorage implementation, as in https://github.com/zarr-developers/zarr-python/issues/2674#issuecomment-2581263695.

However it is unclear to me if this should be performed by zarr_python or the s3fs implementation, as discussed here : https://github.com/fsspec/s3fs/issues/943.

What do you think ?

Steps to reproduce

import xarray
import fsspec
import zarr

s3_endpoint = "http://localhost:19900"

fs = fsspec.filesystem(
    "s3",
    key="ACCESS",
    secret="S3CRET",
    endpoint_url=s3_endpoint,
    asynchronous=True,
)
store = zarr.storage.FsspecStore(fs, path="/mybucket/myzarr.zarr")
dataset = xarray.open_zarr(store, zarr_format=3)

Should raise the following error :

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fe2071f6c60>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7fe2071ca6f0>, 24766.009516954), (<aiohttp.client_proto.ResponseHandler object at 0x7fe2071cac30>, 24766.015895832)])']
connector: <aiohttp.connector.TCPConnector object at 0x7fe2071ab800>

Additional output

No response

gbip avatar Mar 10 '25 11:03 gbip

What happens if your application code closes the filesystem (does s3fs offer an async with context? I don't remember)?

async with fs:
    store = zarr.storage.FsspecStore(fs, path="/mybucket/myzarr.zarr")
    dataset = xarray.open_zarr(store, zarr_format=3)

The tricky bit here is that zarr-python doesn't really know who owns the filesystem passed into FsspecStore. We don't necessarily want to close it, since the user might be using it outside of zarr.

Maybe we could make FsspecStore (and the related stores) async context managers and have then close the store upon __aexit__?

TomAugspurger avatar Mar 10 '25 20:03 TomAugspurger

s3fs actually has a custom finalizer that should close these sessions in some cases

https://github.com/fsspec/s3fs/blob/01b9c4b838b81375093ae1d78562edf6bdc616ea/s3fs/core.py#L575-L578

Although it looks like that is not used for asynchronous filesystems.

rabernat avatar Mar 10 '25 21:03 rabernat

I'm guessing this is the same error I'm seeing, even though I'm using s3fs.S3FileSystem? E.g. -

import zarr
import s3fs

s3 = s3fs.S3FileSystem(profile="PROFILE", asynchronous=True)
store = zarr.storage.FsspecStore(s3, path=f"/mybucket/myzarr.zarr")
z = zarr.open(store, mode="r")

which produces:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x11bd89940>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x11bdd0ad0>, 30591.100072875), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd02f0>, 30591.103170083), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd0890>, 30591.104944541), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd1610>, 30591.295504583)])']
connector: <aiohttp.connector.TCPConnector object at 0x11bd89010>

christine-e-smit avatar Apr 23 '25 21:04 christine-e-smit

I'm guessing this is the same error I'm seeing, even though I'm using s3fs.S3FileSystem? E.g. -

import zarr import s3fs

s3 = s3fs.S3FileSystem(profile="PROFILE", asynchronous=True) store = zarr.storage.FsspecStore(s3, path=f"/mybucket/myzarr.zarr") z = zarr.open(store, mode="r")

which produces:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x11bd89940>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x11bdd0ad0>, 30591.100072875), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd02f0>, 30591.103170083), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd0890>, 30591.104944541), (<aiohttp.client_proto.ResponseHandler object at 0x11bdd1610>, 30591.295504583)])']
connector: <aiohttp.connector.TCPConnector object at 0x11bd89010>

Yep to me it's the same error.

gbip avatar Apr 24 '25 08:04 gbip

I have a similar issue, but mine only seems to pop up, when I use a role to authenticate, when my credentials are saved in ~/.aws/credentials I don't see the same error.

All I need to do to reproduce the errors is:

group = zarr.open_group("s3://path-to-zarr")

mark-boer avatar May 15 '25 15:05 mark-boer