
Missing kwargs in ZipFileSystem leading to `botocore.exceptions.NoCredentialsError: Unable to locate credentials`

Open eschalkargans opened this issue 1 year ago • 5 comments

Hello,

Summary

I initially created an issue on the xarray repository: https://github.com/pydata/xarray/issues/8944; I link it here for reference, as it provides more context.

The part concerning fsspec is the following: when trying to access a Zip file on an S3 bucket requiring authentication, the kwargs are not passed to the `fsspec.open` call made in `ZipFileSystem.__init__`, leading to an authentication error. The problematic line is: https://github.com/fsspec/filesystem_spec/blob/37c1bc63b9c5a5b2b9a0d5161e89b4233f888b29/fsspec/implementations/zip.py#L57

Version: fsspec 2023.10.0 (also true for the current version linked above): kwargs are not passed to the `open` call in `ZipFileSystem.__init__`.

Suggested bugfix

Current:

        fo = fsspec.open(
            fo, mode=mode + "b", protocol=target_protocol, **(target_options or {})
        )

Proposed bugfix (passing the kwargs):

        fo = fsspec.open(
            fo, mode=mode + "b", protocol=target_protocol, **(target_options or {}), **kwargs
        )

When testing locally, adding the kwargs leads to a successful opening of the Zip file. Unfortunately, I don't have a Minimal Complete Verifiable Example, since it requires a Zip file on an S3 bucket with authentication.
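For completeness, the existing `target_options` parameter already provides a routing channel to the inner filesystem when constructing `ZipFileSystem` directly; the issue is only that `**kwargs` is not forwarded the same way. Below is a minimal sketch of that channel using the in-memory filesystem, so it runs without S3; the S3 option names mentioned in the comment are hypothetical placeholders:

```python
import io
import zipfile

import fsspec
from fsspec.implementations.zip import ZipFileSystem

# Build a small zip archive and store it in the in-memory filesystem,
# standing in for the zip file on an authenticated S3 bucket.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.txt", "hello")
fsspec.filesystem("memory").pipe_file("/archive.zip", buf.getvalue())

# target_options is forwarded to fsspec.open for the inner filesystem;
# with S3 you would put key/secret/client_kwargs here instead of {}.
zfs = ZipFileSystem(
    "memory://archive.zip",
    target_protocol="memory",
    target_options={},  # inner-filesystem options (S3 credentials would go here)
)
print(zfs.cat("a.txt"))  # b"hello"
```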

Do you think this would be a good solution, or might something more subtle be needed?

Thanks!

eschalkargans avatar Apr 15 '24 10:04 eschalkargans

Can you show how you were calling xarray that led to this problem, please?

martindurant avatar Apr 15 '24 12:04 martindurant

Hello,

Thanks for your reply! You can find below a code example and the produced stack trace. I cannot unfortunately provide an MCVE (Minimal Complete Verifiable Example), as the following snippet requires access to an S3 bucket with authentication.

Code example

import xarray as xr


credentials_key = "key"
credentials_secret = "secret"
credentials_endpoint_url = "endpoint_url"
credentials_region_name = "region"

storage_options = dict(
    key=credentials_key,
    secret=credentials_secret,
    client_kwargs=dict(
        endpoint_url=credentials_endpoint_url,
        region_name=credentials_region_name,
    ),
)

zip_s3_zarr_path = "zip::s3://path/to/my/dataset.zarr.zip"

xds = xr.open_dataset(
    zip_s3_zarr_path,
    backend_kwargs={"storage_options": storage_options},
    engine="zarr",
    group="/",
    consolidated=True,
)

Stack trace


NoCredentialsError                        Traceback (most recent call last)

Cell In[4], line 1
----> 1 xds = xr.open_dataset(
      2     zip_s3_zarr_path,
      3     backend_kwargs={"storage_options": storage_options},
      4     engine="zarr",
      5     group="/",
      6     consolidated=True,
      7 )


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/api.py:573, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    561 decoders = _resolve_decoders_kwargs(
    562     decode_cf,
    563     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    569     decode_coords=decode_coords,
    570 )
    572 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 573 backend_ds = backend.open_dataset(
    574     filename_or_obj,
    575     drop_variables=drop_variables,
    576     **decoders,
    577     **kwargs,
    578 )
    579 ds = _dataset_from_backend_dataset(
    580     backend_ds,
    581     filename_or_obj,
   (...)
    591     **kwargs,
    592 )
    593 return ds


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/zarr.py:967, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel, zarr_version)
    946 def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
    947     self,
    948     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
   (...)
    964     zarr_version=None,
    965 ) -> Dataset:
    966     filename_or_obj = _normalize_path(filename_or_obj)
--> 967     store = ZarrStore.open_group(
    968         filename_or_obj,
    969         group=group,
    970         mode=mode,
    971         synchronizer=synchronizer,
    972         consolidated=consolidated,
    973         consolidate_on_close=False,
    974         chunk_store=chunk_store,
    975         storage_options=storage_options,
    976         stacklevel=stacklevel + 1,
    977         zarr_version=zarr_version,
    978     )
    980     store_entrypoint = StoreBackendEntrypoint()
    981     with close_on_error(store):


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/zarr.py:454, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version, write_empty)
    451             raise FileNotFoundError(f"No such file or directory: '{store}'")
    452 elif consolidated:
    453     # TODO: an option to pass the metadata_key keyword
--> 454     zarr_group = zarr.open_consolidated(store, **open_kwargs)
    455 else:
    456     zarr_group = zarr.open_group(store, **open_kwargs)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/convenience.py:1334, in open_consolidated(store, metadata_key, mode, **kwargs)
   1332 # normalize parameters
   1333 zarr_version = kwargs.get("zarr_version")
-> 1334 store = normalize_store_arg(
   1335     store, storage_options=kwargs.get("storage_options"), mode=mode, zarr_version=zarr_version
   1336 )
   1337 if mode not in {"r", "r+"}:
   1338     raise ValueError("invalid mode, expected either 'r' or 'r+'; found {!r}".format(mode))


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:197, in normalize_store_arg(store, storage_options, mode, zarr_version)
    195 else:
    196     raise ValueError("zarr_version must be either 2 or 3")
--> 197 return normalize_store(store, storage_options, mode)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:167, in _normalize_store_arg_v2(store, storage_options, mode)
    165 if isinstance(store, str):
    166     if "://" in store or "::" in store:
--> 167         return FSStore(store, mode=mode, **(storage_options or {}))
    168     elif storage_options:
    169         raise ValueError("storage_options passed with non-fsspec path")


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:1377, in FSStore.__init__(self, url, normalize_keys, key_separator, mode, exceptions, dimension_separator, fs, check, create, missing_exceptions, **storage_options)
   1375 if protocol in (None, "file") and not storage_options.get("auto_mkdir"):
   1376     storage_options["auto_mkdir"] = True
-> 1377 self.map = fsspec.get_mapper(url, **{**mapper_options, **storage_options})
   1378 self.fs = self.map.fs  # for direct operations
   1379 self.path = self.fs._strip_protocol(url)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/mapping.py:245, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
    214 """Create key-value interface for given URL and options
    215
    216 The URL will be of the form "protocol://location" and point to the root
   (...)
    242 ``FSMap`` instance, the dict-like key-value store.
    243 """
    244 # Removing protocol here - could defer to each open() on the backend
--> 245 fs, urlpath = url_to_fs(url, **kwargs)
    246 root = alternate_root if alternate_root is not None else urlpath
    247 return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/core.py:388, in url_to_fs(url, **kwargs)
    386     inkwargs["fo"] = urls
    387 urlpath, protocol, _ = chain[0]
--> 388 fs = filesystem(protocol, **inkwargs)
    389 return fs, urlpath


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/registry.py:290, in filesystem(protocol, **storage_options)
    283     warnings.warn(
    284         "The 'arrow_hdfs' protocol has been deprecated and will be "
    285         "removed in the future. Specify it as 'hdfs'.",
    286         DeprecationWarning,
    287     )
    289 cls = get_filesystem_class(protocol)
--> 290 return cls(**storage_options)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:79, in _Cached.__call__(cls, *args, **kwargs)
     77     return cls._cache[token]
     78 else:
---> 79     obj = super().__call__(*args, **kwargs)
     80     # Setting _fs_token here causes some static linters to complain.
     81     obj._fs_token_ = token


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/implementations/zip.py:56, in ZipFileSystem.__init__(self, fo, mode, target_protocol, target_options, compression, allowZip64, compresslevel, **kwargs)
     52     fo = fsspec.open(
     53         fo, mode=mode + "b", protocol=target_protocol, **(target_options or {}), # **kwargs
     54     )
     55 self.of = fo
---> 56 self.fo = fo.__enter__()  # the whole instance is a context
     57 self.zip = zipfile.ZipFile(
     58     self.fo,
     59     mode=mode,
   (...)
     62     compresslevel=compresslevel,
     63 )
     64 self.dir_cache = None


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/core.py:100, in OpenFile.__enter__(self)
     97 def __enter__(self):
     98     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 100     f = self.fs.open(self.path, mode=mode)
    102     self.fobjects = [f]
    104     if self.compression is not None:


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1307, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1305 else:
   1306     ac = kwargs.pop("autocommit", not self._intrans)
-> 1307     f = self._open(
   1308         path,
   1309         mode=mode,
   1310         block_size=block_size,
   1311         autocommit=ac,
   1312         cache_options=cache_options,
   1313         **kwargs,
   1314     )
   1315     if compression is not None:
   1316         from fsspec.compression import compr


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:671, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, size, requester_pays, cache_options, **kwargs)
    668 if cache_type is None:
    669     cache_type = self.default_cache_type
--> 671 return S3File(
    672     self,
    673     path,
    674     mode,
    675     block_size=block_size,
    676     acl=acl,
    677     version_id=version_id,
    678     fill_cache=fill_cache,
    679     s3_additional_kwargs=kw,
    680     cache_type=cache_type,
    681     autocommit=autocommit,
    682     requester_pays=requester_pays,
    683     cache_options=cache_options,
    684     size=size,
    685 )


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:2099, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options, size)
   2097         self.details = s3.info(path)
   2098         self.version_id = self.details.get("VersionId")
-> 2099 super().__init__(
   2100     s3,
   2101     path,
   2102     mode,
   2103     block_size,
   2104     autocommit=autocommit,
   2105     cache_type=cache_type,
   2106     cache_options=cache_options,
   2107     size=size,
   2108 )
   2109 self.s3 = self.fs  # compatibility
   2111 # when not using autocommit we want to have transactional state to manage


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1663, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
   1661         self.size = size
   1662     else:
-> 1663         self.size = self.details["size"]
   1664     self.cache = caches[cache_type](
   1665         self.blocksize, self._fetch_range, self.size, **cache_options
   1666     )
   1667 else:


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1676, in AbstractBufferedFile.details(self)
   1673 @property
   1674 def details(self):
   1675     if self._details is None:
-> 1676         self._details = self.fs.info(self.path)
   1677     return self._details


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:1302, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1300 if key:
   1301     try:
-> 1302         out = await self._call_s3(
   1303             "head_object",
   1304             self.kwargs,
   1305             Bucket=bucket,
   1306             Key=key,
   1307             **version_id_kw(version_id),
   1308             **self.req_kw,
   1309         )
   1310         return {
   1311             "ETag": out.get("ETag", ""),
   1312             "LastModified": out["LastModified"],
   (...)
   1318             "ContentType": out.get("ContentType"),
   1319         }
   1320     except FileNotFoundError:


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:348, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    346 logger.debug("CALL: %s - %s - %s", method.__name__, akwarglist, kw2)
    347 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 348 return await _error_wrapper(
    349     method, kwargs=additional_kwargs, retries=self.retries
    350 )


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:140, in _error_wrapper(func, args, kwargs, retries)
    138         err = e
    139 err = translate_boto_error(err)
--> 140 raise err


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:113, in _error_wrapper(func, args, kwargs, retries)
    111 for i in range(retries):
    112     try:
--> 113         return await func(*args, **kwargs)
    114     except S3_RETRYABLE_ERRORS as e:
    115         err = e


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/client.py:366, in AioBaseClient._make_api_call(self, operation_name, api_params)
    362     maybe_compress_request(
    363         self.meta.config, request_dict, operation_model
    364     )
    365     apply_request_checksum(request_dict)
--> 366     http, parsed_response = await self._make_request(
    367         operation_model, request_dict, request_context
    368     )
    370 await self.meta.events.emit(
    371     'after-call.{service_id}.{operation_name}'.format(
    372         service_id=service_id, operation_name=operation_name
   (...)
    377     context=request_context,
    378 )
    380 if http.status_code >= 300:


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/client.py:391, in AioBaseClient._make_request(self, operation_model, request_dict, request_context)
    387 async def _make_request(
    388     self, operation_model, request_dict, request_context
    389 ):
    390     try:
--> 391         return await self._endpoint.make_request(
    392             operation_model, request_dict
    393         )
    394     except Exception as e:
    395         await self.meta.events.emit(
    396             'after-call-error.{service_id}.{operation_name}'.format(
    397                 service_id=self._service_model.service_id.hyphenize(),
   (...)
    401             context=request_context,
    402         )


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/endpoint.py:96, in AioEndpoint._send_request(self, request_dict, operation_model)
     94 context = request_dict['context']
     95 self._update_retries_context(context, attempts)
---> 96 request = await self.create_request(request_dict, operation_model)
     97 success_response, exception = await self._get_response(
     98     request, operation_model, context
     99 )
    100 while await self._needs_retry(
    101     attempts,
    102     operation_model,
   (...)
    105     exception,
    106 ):


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/endpoint.py:84, in AioEndpoint.create_request(self, params, operation_model)
     80     service_id = operation_model.service_model.service_id.hyphenize()
     81     event_name = 'request-created.{service_id}.{op_name}'.format(
     82         service_id=service_id, op_name=operation_model.name
     83     )
---> 84     await self._event_emitter.emit(
     85         event_name,
     86         request=request,
     87         operation_name=operation_model.name,
     88     )
     89 prepared_request = self.prepare_request(request)
     90 return prepared_request


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/hooks.py:66, in AioHierarchicalEmitter._emit(self, event_name, kwargs, stop_on_response)
     63 logger.debug('Event %s: calling handler %s', event_name, handler)
     65 # Await the handler if its a coroutine.
---> 66 response = await resolve_awaitable(handler(**kwargs))
     67 responses.append((handler, response))
     68 if stop_on_response and response is not None:


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/_helpers.py:15, in resolve_awaitable(obj)
     13 async def resolve_awaitable(obj):
     14     if inspect.isawaitable(obj):
---> 15         return await obj
     17     return obj


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/signers.py:24, in AioRequestSigner.handler(self, operation_name, request, **kwargs)
     19 async def handler(self, operation_name=None, request=None, **kwargs):
     20     # This is typically hooked up to the "request-created" event
     21     # from a client's event emitter.  When a new request is created
     22     # this method is invoked to sign the request.
     23     # Don't call this method directly.
---> 24     return await self.sign(operation_name, request)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/signers.py:82, in AioRequestSigner.sign(self, operation_name, request, region_name, signing_type, expires_in, signing_name)
     79     else:
     80         raise e
---> 82 auth.add_auth(request)


File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/botocore/auth.py:418, in SigV4Auth.add_auth(self, request)
    416 def add_auth(self, request):
    417     if self.credentials is None:
--> 418         raise NoCredentialsError()
    419     datetime_now = datetime.datetime.utcnow()
    420     request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)


NoCredentialsError: Unable to locate credentials

eschalkargans avatar Apr 15 '24 13:04 eschalkargans

OK, so the problem is that the URL contains two components, and fsspec needs to know which class to pass the arguments to. In the default case, which you are hitting here, they all go to the "outer" class, Zip.

The following should work:

storage_options = dict(
    key=credentials_key,
    secret=credentials_secret,
    client_kwargs=dict(
        endpoint_url=credentials_endpoint_url,
        region_name=credentials_region_name,
    ),
)

zip_s3_zarr_path = "zip::s3://path/to/my/dataset.zarr.zip"

xds = xr.open_dataset(
    zip_s3_zarr_path,
    backend_kwargs={"storage_options": {"s3": storage_options}},
    engine="zarr",
    group="/",
    consolidated=True,
)

(The possible nesting depth of storage arguments is one of the reasons I think a cataloging library like Intake should be used for storing this kind of invocation, since it's so easy to get wrong.)
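The routing described above can be observed without S3 at all: in a chained URL, kwargs nested under a protocol name are delivered to that layer's filesystem class. A minimal sketch using the in-memory filesystem in place of S3 (the options passed under each key are illustrative; in the real case, the credentials would sit under `"s3"`):

```python
import io
import zipfile

import fsspec

# Build a small zip archive in the in-memory filesystem.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("data.txt", "payload")
fsspec.filesystem("memory").pipe_file("/archive.zip", buf.getvalue())

# Chained URL: options nested under "zip" go to the outer ZipFileSystem,
# options nested under "memory" (or "s3" in the real case) go to the
# inner layer that actually holds the archive.
with fsspec.open(
    "zip://data.txt::memory://archive.zip",
    "rb",
    zip={"allowZip64": True},  # routed to the outer ZipFileSystem
    memory={},                 # inner-layer options (S3 credentials in the real case)
) as f:
    print(f.read())  # b"payload"
```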

martindurant avatar Apr 15 '24 13:04 martindurant

Thanks a lot, it solves the issue!

Do you know if this nesting behaviour is mentioned in the documentation? I had a quick look at the `fsspec.open` method; here is its kwargs description:

Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Maybe it would be worth documenting that, for a "compound" URL (e.g. `zip::s3://`), each level of the compound should be reflected in the nesting of the passed kwargs dict? Or maybe it is already mentioned somewhere else (I only had a quick look).

eschalkargans avatar Apr 15 '24 14:04 eschalkargans

Some details are here: https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining

Perhaps it is not obvious to users to look in that section.

martindurant avatar Apr 15 '24 14:04 martindurant