filesystem_spec
Missing kwargs in ZipFileSystem leading to `botocore.exceptions.NoCredentialsError: Unable to locate credentials`
Hello,
Summary
I initially created an issue on the xarray repository: https://github.com/pydata/xarray/issues/8944 ; I place it here for reference as it provides more context.
The part concerning fsspec is the following: when trying to access a Zip file on an s3 bucket requiring authentication, the kwargs are not passed to the `fsspec.open` call made in `ZipFileSystem.__init__`, leading to an authentication error. The problematic line is: https://github.com/fsspec/filesystem_spec/blob/37c1bc63b9c5a5b2b9a0d5161e89b4233f888b29/fsspec/implementations/zip.py#L57
Version: fsspec 2023.10.0 (also true for the current version linked above): kwargs are not passed to the `fsspec.open` call in `ZipFileSystem.__init__`.
Suggested bugfix
Current:
fo = fsspec.open(
    fo, mode=mode + "b", protocol=target_protocol, **(target_options or {})
)
Proposed bugfix (passing the kwargs):
fo = fsspec.open(
    fo, mode=mode + "b", protocol=target_protocol, **(target_options or {}), **kwargs
)
When testing locally, adding the kwargs leads to a successful opening of the Zip file. Unfortunately, I don't have a Minimal Complete Verifiable Example, since it requires a Zip file on an s3 bucket with authentication.
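A local stand-in can still illustrate the mechanics, though: `target_options` *is* already forwarded to `fsspec.open`, so credentials routed through it do arrive, while bare kwargs do not. A minimal sketch using a throwaway local archive in place of the authenticated bucket (the `file` protocol stands in for `s3`):

```python
import os
import tempfile
import zipfile

from fsspec.implementations.zip import ZipFileSystem

# Build a throwaway archive standing in for dataset.zarr.zip on s3.
tmpdir = tempfile.mkdtemp()
archive = os.path.join(tmpdir, "dataset.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("data.txt", "hello")

# target_options IS forwarded to fsspec.open by ZipFileSystem.__init__,
# so for an s3 target the credentials could presumably be routed through
# it instead, e.g.:
#   ZipFileSystem("s3://bucket/dataset.zarr.zip", target_protocol="s3",
#                 target_options={"key": "...", "secret": "..."})
fs = ZipFileSystem(archive, target_protocol="file", target_options={})
print(fs.cat("data.txt"))  # b'hello'
```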
Do you think this would be a good solution, or might something more subtle be needed?
Thanks!
Can you show how you were calling xarray that led to this problem, please?
Hello,
Thanks for your reply! You can find below a code example and the produced stacktrace. I unfortunately cannot provide an MCVE (Minimal Complete Verifiable Example), as the following snippet requires access to an s3 bucket with authentication.
Code example
import xarray as xr

credentials_key = "key"
credentials_secret = "secret"
credentials_endpoint_url = "endpoint_url"
credentials_region_name = "region"

storage_options = dict(
    key=credentials_key,
    secret=credentials_secret,
    client_kwargs=dict(
        endpoint_url=credentials_endpoint_url,
        region_name=credentials_region_name,
    ),
)

zip_s3_zarr_path = "zip::s3://path/to/my/dataset.zarr.zip"
xds = xr.open_dataset(
    zip_s3_zarr_path,
    backend_kwargs={"storage_options": storage_options},
    engine="zarr",
    group="/",
    consolidated=True,
)
Stack trace
NoCredentialsError Traceback (most recent call last)
Cell In[4], line 1
----> 1 xds = xr.open_dataset(
2 zip_s3_zarr_path,
3 backend_kwargs={"storage_options": storage_options},
4 engine="zarr",
5 group="/",
6 consolidated=True,
7 )
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/api.py:573, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
561 decoders = _resolve_decoders_kwargs(
562 decode_cf,
563 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
569 decode_coords=decode_coords,
570 )
572 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 573 backend_ds = backend.open_dataset(
574 filename_or_obj,
575 drop_variables=drop_variables,
576 **decoders,
577 **kwargs,
578 )
579 ds = _dataset_from_backend_dataset(
580 backend_ds,
581 filename_or_obj,
(...)
591 **kwargs,
592 )
593 return ds
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/zarr.py:967, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel, zarr_version)
946 def open_dataset( # type: ignore[override] # allow LSP violation, not supporting **kwargs
947 self,
948 filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
(...)
964 zarr_version=None,
965 ) -> Dataset:
966 filename_or_obj = _normalize_path(filename_or_obj)
--> 967 store = ZarrStore.open_group(
968 filename_or_obj,
969 group=group,
970 mode=mode,
971 synchronizer=synchronizer,
972 consolidated=consolidated,
973 consolidate_on_close=False,
974 chunk_store=chunk_store,
975 storage_options=storage_options,
976 stacklevel=stacklevel + 1,
977 zarr_version=zarr_version,
978 )
980 store_entrypoint = StoreBackendEntrypoint()
981 with close_on_error(store):
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/xarray/backends/zarr.py:454, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version, write_empty)
451 raise FileNotFoundError(f"No such file or directory: '{store}'")
452 elif consolidated:
453 # TODO: an option to pass the metadata_key keyword
--> 454 zarr_group = zarr.open_consolidated(store, **open_kwargs)
455 else:
456 zarr_group = zarr.open_group(store, **open_kwargs)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/convenience.py:1334, in open_consolidated(store, metadata_key, mode, **kwargs)
1332 # normalize parameters
1333 zarr_version = kwargs.get("zarr_version")
-> 1334 store = normalize_store_arg(
1335 store, storage_options=kwargs.get("storage_options"), mode=mode, zarr_version=zarr_version
1336 )
1337 if mode not in {"r", "r+"}:
1338 raise ValueError("invalid mode, expected either 'r' or 'r+'; found {!r}".format(mode))
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:197, in normalize_store_arg(store, storage_options, mode, zarr_version)
195 else:
196 raise ValueError("zarr_version must be either 2 or 3")
--> 197 return normalize_store(store, storage_options, mode)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:167, in _normalize_store_arg_v2(store, storage_options, mode)
165 if isinstance(store, str):
166 if "://" in store or "::" in store:
--> 167 return FSStore(store, mode=mode, **(storage_options or {}))
168 elif storage_options:
169 raise ValueError("storage_options passed with non-fsspec path")
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/zarr/storage.py:1377, in FSStore.__init__(self, url, normalize_keys, key_separator, mode, exceptions, dimension_separator, fs, check, create, missing_exceptions, **storage_options)
1375 if protocol in (None, "file") and not storage_options.get("auto_mkdir"):
1376 storage_options["auto_mkdir"] = True
-> 1377 self.map = fsspec.get_mapper(url, **{**mapper_options, **storage_options})
1378 self.fs = self.map.fs # for direct operations
1379 self.path = self.fs._strip_protocol(url)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/mapping.py:245, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
214 """Create key-value interface for given URL and options
215
216 The URL will be of the form "protocol://location" and point to the root
(...)
242 ``FSMap`` instance, the dict-like key-value store.
243 """
244 # Removing protocol here - could defer to each open() on the backend
--> 245 fs, urlpath = url_to_fs(url, **kwargs)
246 root = alternate_root if alternate_root is not None else urlpath
247 return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/core.py:388, in url_to_fs(url, **kwargs)
386 inkwargs["fo"] = urls
387 urlpath, protocol, _ = chain[0]
--> 388 fs = filesystem(protocol, **inkwargs)
389 return fs, urlpath
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/registry.py:290, in filesystem(protocol, **storage_options)
283 warnings.warn(
284 "The 'arrow_hdfs' protocol has been deprecated and will be "
285 "removed in the future. Specify it as 'hdfs'.",
286 DeprecationWarning,
287 )
289 cls = get_filesystem_class(protocol)
--> 290 return cls(**storage_options)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:79, in _Cached.__call__(cls, *args, **kwargs)
77 return cls._cache[token]
78 else:
---> 79 obj = super().__call__(*args, **kwargs)
80 # Setting _fs_token here causes some static linters to complain.
81 obj._fs_token_ = token
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/implementations/zip.py:56, in ZipFileSystem.__init__(self, fo, mode, target_protocol, target_options, compression, allowZip64, compresslevel, **kwargs)
52 fo = fsspec.open(
53 fo, mode=mode + "b", protocol=target_protocol, **(target_options or {}), # **kwargs
54 )
55 self.of = fo
---> 56 self.fo = fo.__enter__() # the whole instance is a context
57 self.zip = zipfile.ZipFile(
58 self.fo,
59 mode=mode,
(...)
62 compresslevel=compresslevel,
63 )
64 self.dir_cache = None
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/core.py:100, in OpenFile.__enter__(self)
97 def __enter__(self):
98 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 100 f = self.fs.open(self.path, mode=mode)
102 self.fobjects = [f]
104 if self.compression is not None:
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1307, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
1305 else:
1306 ac = kwargs.pop("autocommit", not self._intrans)
-> 1307 f = self._open(
1308 path,
1309 mode=mode,
1310 block_size=block_size,
1311 autocommit=ac,
1312 cache_options=cache_options,
1313 **kwargs,
1314 )
1315 if compression is not None:
1316 from fsspec.compression import compr
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:671, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, size, requester_pays, cache_options, **kwargs)
668 if cache_type is None:
669 cache_type = self.default_cache_type
--> 671 return S3File(
672 self,
673 path,
674 mode,
675 block_size=block_size,
676 acl=acl,
677 version_id=version_id,
678 fill_cache=fill_cache,
679 s3_additional_kwargs=kw,
680 cache_type=cache_type,
681 autocommit=autocommit,
682 requester_pays=requester_pays,
683 cache_options=cache_options,
684 size=size,
685 )
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:2099, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options, size)
2097 self.details = s3.info(path)
2098 self.version_id = self.details.get("VersionId")
-> 2099 super().__init__(
2100 s3,
2101 path,
2102 mode,
2103 block_size,
2104 autocommit=autocommit,
2105 cache_type=cache_type,
2106 cache_options=cache_options,
2107 size=size,
2108 )
2109 self.s3 = self.fs # compatibility
2111 # when not using autocommit we want to have transactional state to manage
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1663, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
1661 self.size = size
1662 else:
-> 1663 self.size = self.details["size"]
1664 self.cache = caches[cache_type](
1665 self.blocksize, self._fetch_range, self.size, **cache_options
1666 )
1667 else:
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/spec.py:1676, in AbstractBufferedFile.details(self)
1673 @property
1674 def details(self):
1675 if self._details is None:
-> 1676 self._details = self.fs.info(self.path)
1677 return self._details
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
115 @functools.wraps(func)
116 def wrapper(*args, **kwargs):
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
101 raise FSTimeoutError from return_result
102 elif isinstance(return_result, BaseException):
--> 103 raise return_result
104 else:
105 return return_result
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
54 coro = asyncio.wait_for(coro, timeout=timeout)
55 try:
---> 56 result[0] = await coro
57 except Exception as ex:
58 result[0] = ex
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:1302, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
1300 if key:
1301 try:
-> 1302 out = await self._call_s3(
1303 "head_object",
1304 self.kwargs,
1305 Bucket=bucket,
1306 Key=key,
1307 **version_id_kw(version_id),
1308 **self.req_kw,
1309 )
1310 return {
1311 "ETag": out.get("ETag", ""),
1312 "LastModified": out["LastModified"],
(...)
1318 "ContentType": out.get("ContentType"),
1319 }
1320 except FileNotFoundError:
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:348, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
346 logger.debug("CALL: %s - %s - %s", method.__name__, akwarglist, kw2)
347 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 348 return await _error_wrapper(
349 method, kwargs=additional_kwargs, retries=self.retries
350 )
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:140, in _error_wrapper(func, args, kwargs, retries)
138 err = e
139 err = translate_boto_error(err)
--> 140 raise err
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/s3fs/core.py:113, in _error_wrapper(func, args, kwargs, retries)
111 for i in range(retries):
112 try:
--> 113 return await func(*args, **kwargs)
114 except S3_RETRYABLE_ERRORS as e:
115 err = e
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/client.py:366, in AioBaseClient._make_api_call(self, operation_name, api_params)
362 maybe_compress_request(
363 self.meta.config, request_dict, operation_model
364 )
365 apply_request_checksum(request_dict)
--> 366 http, parsed_response = await self._make_request(
367 operation_model, request_dict, request_context
368 )
370 await self.meta.events.emit(
371 'after-call.{service_id}.{operation_name}'.format(
372 service_id=service_id, operation_name=operation_name
(...)
377 context=request_context,
378 )
380 if http.status_code >= 300:
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/client.py:391, in AioBaseClient._make_request(self, operation_model, request_dict, request_context)
387 async def _make_request(
388 self, operation_model, request_dict, request_context
389 ):
390 try:
--> 391 return await self._endpoint.make_request(
392 operation_model, request_dict
393 )
394 except Exception as e:
395 await self.meta.events.emit(
396 'after-call-error.{service_id}.{operation_name}'.format(
397 service_id=self._service_model.service_id.hyphenize(),
(...)
401 context=request_context,
402 )
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/endpoint.py:96, in AioEndpoint._send_request(self, request_dict, operation_model)
94 context = request_dict['context']
95 self._update_retries_context(context, attempts)
---> 96 request = await self.create_request(request_dict, operation_model)
97 success_response, exception = await self._get_response(
98 request, operation_model, context
99 )
100 while await self._needs_retry(
101 attempts,
102 operation_model,
(...)
105 exception,
106 ):
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/endpoint.py:84, in AioEndpoint.create_request(self, params, operation_model)
80 service_id = operation_model.service_model.service_id.hyphenize()
81 event_name = 'request-created.{service_id}.{op_name}'.format(
82 service_id=service_id, op_name=operation_model.name
83 )
---> 84 await self._event_emitter.emit(
85 event_name,
86 request=request,
87 operation_name=operation_model.name,
88 )
89 prepared_request = self.prepare_request(request)
90 return prepared_request
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/hooks.py:66, in AioHierarchicalEmitter._emit(self, event_name, kwargs, stop_on_response)
63 logger.debug('Event %s: calling handler %s', event_name, handler)
65 # Await the handler if its a coroutine.
---> 66 response = await resolve_awaitable(handler(**kwargs))
67 responses.append((handler, response))
68 if stop_on_response and response is not None:
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/_helpers.py:15, in resolve_awaitable(obj)
13 async def resolve_awaitable(obj):
14 if inspect.isawaitable(obj):
---> 15 return await obj
17 return obj
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/signers.py:24, in AioRequestSigner.handler(self, operation_name, request, **kwargs)
19 async def handler(self, operation_name=None, request=None, **kwargs):
20 # This is typically hooked up to the "request-created" event
21 # from a client's event emitter. When a new request is created
22 # this method is invoked to sign the request.
23 # Don't call this method directly.
---> 24 return await self.sign(operation_name, request)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/aiobotocore/signers.py:82, in AioRequestSigner.sign(self, operation_name, request, region_name, signing_type, expires_in, signing_name)
79 else:
80 raise e
---> 82 auth.add_auth(request)
File ~/.pyenv/versions/3.11.6/envs/work-env/lib/python3.11/site-packages/botocore/auth.py:418, in SigV4Auth.add_auth(self, request)
416 def add_auth(self, request):
417 if self.credentials is None:
--> 418 raise NoCredentialsError()
419 datetime_now = datetime.datetime.utcnow()
420 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
NoCredentialsError: Unable to locate credentials
OK, so the problem is that the URL contains two components, and fsspec needs to know which class to pass the arguments to. In the default case, which you are hitting here, they all go to the "outer" class, Zip.
The following should work:
storage_options = dict(
    key=credentials_key,
    secret=credentials_secret,
    client_kwargs=dict(
        endpoint_url=credentials_endpoint_url,
        region_name=credentials_region_name,
    ),
)

zip_s3_zarr_path = "zip::s3://path/to/my/dataset.zarr.zip"
xds = xr.open_dataset(
    zip_s3_zarr_path,
    backend_kwargs={"storage_options": {"s3": storage_options}},
    engine="zarr",
    group="/",
    consolidated=True,
)
(The possible nested depth of storage arguments is one of the reasons I think a cataloging library like Intake should be used for storing this kind of invocation, since it's so easy to get wrong)
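The routing can be checked without any S3 access: in a chained URL, options keyed by a protocol name are handed to that component's filesystem, which is what the nested `{"s3": storage_options}` dict achieves. A local sketch, with the `file` protocol playing the role of `s3`:

```python
import os
import tempfile
import zipfile

import fsspec

# A local archive standing in for the remote dataset.zarr.zip.
tmpdir = tempfile.mkdtemp()
archive = os.path.join(tmpdir, "archive.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("inner.txt", "payload")

# For the real case, options for the inner s3 component would be passed
# keyed by protocol, e.g. fsspec.open(url, s3=storage_options); here the
# inner protocol is plain "file", which needs no options.
url = f"zip://inner.txt::file://{archive}"
with fsspec.open(url, mode="rb") as f:
    print(f.read())  # b'payload'
```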
Thanks a lot, it solves the issue!
Do you know if this nesting behaviour is mentioned in the documentation? I had a quick look at the fsspec.open method; here is the kwargs description:
Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.
Maybe it would be worth documenting that for a "compound" URL (e.g. zip::s3://), each level of the chain should be reflected in the nesting of the passed kwargs dict? Or maybe it is already mentioned somewhere else (I only had a quick look).
Some details are here: https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining
Perhaps it is not obvious to users to look in that section.
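For reference, the chain resolution described in that docs section can also be exercised directly with `fsspec.core.url_to_fs` (the same function the traceback above passes through): it returns the outer filesystem, with per-protocol option dicts routed to each component during construction. A small local sketch:

```python
import os
import tempfile
import zipfile

from fsspec.core import url_to_fs

# Local archive standing in for the remote one.
tmpdir = tempfile.mkdtemp()
archive = os.path.join(tmpdir, "chained.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("inner.txt", "chained")

# url_to_fs resolves the chain; kwargs keyed by protocol name
# (e.g. s3={...}) would be routed to the matching component.
fs, path = url_to_fs(f"zip://inner.txt::file://{archive}")
print(fs.protocol)       # zip  (the outer filesystem is the zip one)
print(fs.cat(path))      # b'chained'
```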