cloudpathlib
RequestPayer option for S3
Hi,
Thank you for the efforts on this package - I appreciate your work! I've got a feature proposal, because I stumbled over this today and don't think it is implemented yet.
AWS sometimes offers a "Requester Pays" option, e.g. for access to public datasets. You have to pass credentials and, additionally, the keyword argument RequestPayer='requester' in boto3.
Documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
As far as I can see, this option is not yet included in the cloudpathlib codebase. It would need to be passed to boto3's bucket.objects.filter() calls, e.g.
bucket.objects.filter(Prefix=prefix, RequestPayer='requester')
What do you think?
Best regards, Johannes
Have you tested this? I haven't used it, but it looks like RequestPayer='requester' is the default for the methods that support it:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html?highlight=requestpayer
I think that's the most common use case, so this will be low priority if it just works for requester pays buckets.
Hi @pjbull,
yes, I tested the use case with the current cloudpathlib version. I tested access to a public requester pays bucket (aws s3 ls s3://sentinel-s2-l1c/ --request-payer requester) with cloudpathlib (and auth, of course), and it returned a 403 Access Denied error.
I'm not very deep in boto3's codebase; I only tested read access to a public requester pays bucket. It seems that RequestPayer='requester' is not the default for filter(), yet filter() seems to be the critical method for listing the contents of a bucket.
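For reference, the difference can be reproduced with boto3 alone. This is just a sketch (valid AWS credentials are assumed, and the tiles/ prefix is an arbitrary example):

import boto3

bucket = boto3.resource("s3").Bucket("sentinel-s2-l1c")

# Without RequestPayer, listing a requester pays bucket fails with
# a ClientError (403 Access Denied):
# list(bucket.objects.filter(Prefix="tiles/").limit(1))

# With RequestPayer="requester", the listing succeeds (the caller is billed):
for obj in bucket.objects.filter(Prefix="tiles/", RequestPayer="requester").limit(5):
    print(obj.key)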
When I patched the cloudpathlib code with RequestPayer='requester' on the filter() calls, everything worked fine and I was able to access the bucket.
Here is the content of s3client.py with the patch applied:
import os
from pathlib import Path, PurePosixPath
from typing import Any, Dict, Iterable, Optional, Union

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from .s3path import S3Path

try:
    from boto3.session import Session
    from boto3.s3.transfer import TransferConfig
    from botocore.config import Config
    from botocore.exceptions import ClientError
    import botocore.session
except ModuleNotFoundError:
    implementation_registry["s3"].dependencies_loaded = False


@register_client_class("s3")
class S3Client(Client):
    """Client class for AWS S3 which handles authentication with AWS for [`S3Path`](../s3path/)
    instances. See documentation for the [`__init__` method][cloudpathlib.s3.s3client.S3Client.__init__]
    for detailed authentication options."""

    def __init__(
        self,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        no_sign_request: Optional[bool] = False,
        botocore_session: Optional["botocore.session.Session"] = None,
        profile_name: Optional[str] = None,
        boto3_session: Optional["Session"] = None,
        local_cache_dir: Optional[Union[str, os.PathLike]] = None,
        endpoint_url: Optional[str] = None,
        boto3_transfer_config: Optional["TransferConfig"] = None,
        requester_pays: Optional[bool] = False,
    ):
        """Class constructor. Sets up a boto3 [`Session`](
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html).
        Directly supports the same authentication interface, as well as the same environment
        variables supported by boto3. See [boto3 Session documentation](
        https://boto3.amazonaws.com/v1/documentation/api/latest/guide/session.html).

        If no authentication arguments or environment variables are provided, then the client will
        be instantiated as anonymous, which will only have access to public buckets.

        Args:
            aws_access_key_id (Optional[str]): AWS access key ID.
            aws_secret_access_key (Optional[str]): AWS secret access key.
            aws_session_token (Optional[str]): Session key for your AWS account. This is only
                needed when you are using temporary credentials.
            no_sign_request (Optional[bool]): If `True`, credentials are not looked for and we use unsigned
                requests to fetch resources. This will only allow access to public resources. This is equivalent
                to `--no-sign-request` in the AWS CLI (https://docs.aws.amazon.com/cli/latest/reference/).
            botocore_session (Optional[botocore.session.Session]): An already instantiated botocore
                Session.
            profile_name (Optional[str]): Profile name of a profile in a shared credentials file.
            boto3_session (Optional[Session]): An already instantiated boto3 Session.
            local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
                for downloaded files. If None, will use a temporary directory.
            endpoint_url (Optional[str]): S3 server endpoint URL to use for the constructed boto3 S3 resource and client.
                Parameterize it to access a custom-deployed S3-compatible object store such as MinIO or Ceph.
            boto3_transfer_config (Optional[TransferConfig]): Instantiated TransferConfig for managing s3 transfers.
                (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig)
            requester_pays (Optional[bool]): If `True`, the authenticated requester gets billed for the request. This is equivalent
                to `--request-payer requester` in the AWS CLI (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html).
        """
        if boto3_session is not None:
            self.sess = boto3_session
        else:
            self.sess = Session(
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                aws_session_token=aws_session_token,
                botocore_session=botocore_session,
                profile_name=profile_name,
            )

        if no_sign_request:
            self.s3 = self.sess.resource(
                "s3",
                endpoint_url=endpoint_url,
                config=Config(signature_version=botocore.session.UNSIGNED),
            )
            self.client = self.sess.client(
                "s3",
                endpoint_url=endpoint_url,
                config=Config(signature_version=botocore.session.UNSIGNED),
            )
        else:
            self.s3 = self.sess.resource("s3", endpoint_url=endpoint_url)
            self.client = self.sess.client("s3", endpoint_url=endpoint_url)

        self.boto3_transfer_config = boto3_transfer_config
        self.requester_pays = requester_pays
        super().__init__(local_cache_dir=local_cache_dir)

    def _get_metadata(self, cloud_path: S3Path) -> Dict[str, Any]:
        data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get()
        return {
            "last_modified": data["LastModified"],
            "size": data["ContentLength"],
            "etag": data["ETag"],
            "mime": data["ContentType"],
            "extra": data["Metadata"],
        }

    def _download_file(self, cloud_path: S3Path, local_path: Union[str, os.PathLike]) -> Path:
        local_path = Path(local_path)
        obj = self.s3.Object(cloud_path.bucket, cloud_path.key)
        obj.download_file(str(local_path), Config=self.boto3_transfer_config)
        return local_path

    def _is_file_or_dir(self, cloud_path: S3Path) -> Optional[str]:
        # short-circuit the root-level bucket
        if not cloud_path.key:
            return "dir"

        # get first item by listing at least one key
        s3_obj = self._s3_file_query(cloud_path)

        if s3_obj is None:
            return None

        # since S3 only returns files when filtering objects:
        # if the first item key is equal to the path key, this is a file; else it is a dir
        return "file" if s3_obj.key == cloud_path.key else "dir"

    def _exists(self, cloud_path: S3Path) -> bool:
        return self._s3_file_query(cloud_path) is not None

    def _s3_file_query(self, cloud_path: S3Path):
        """Boto3 query used for quick checks of existence and if path is file/dir"""
        # first check if this is an object that we can access directly
        try:
            obj = self.s3.Object(cloud_path.bucket, cloud_path.key)
            obj.load()
            return obj

        # else, confirm it is a dir by filtering to the first item under the prefix
        except ClientError:
            return next(
                (
                    obj
                    for obj in (
                        self.s3.Bucket(cloud_path.bucket)
                        .objects.filter(Prefix=cloud_path.key)
                        .limit(1)
                    )
                ),
                None,
            )

    def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[S3Path]:
        bucket = self.s3.Bucket(cloud_path.bucket)

        prefix = cloud_path.key
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        yielded_dirs = set()

        # pass RequestPayer only when requester_pays is enabled
        extra_kwargs = {"RequestPayer": "requester"} if self.requester_pays else {}

        if recursive:
            for o in bucket.objects.filter(Prefix=prefix, **extra_kwargs):
                # get directory from this path
                for parent in PurePosixPath(o.key[len(prefix) :]).parents:
                    # if we haven't surfaced this directory already
                    if parent not in yielded_dirs and str(parent) != ".":
                        yield self.CloudPath(f"s3://{cloud_path.bucket}/{prefix}{parent}")
                        yielded_dirs.add(parent)

                yield self.CloudPath(f"s3://{o.bucket_name}/{o.key}")
        else:
            # non-recursive is best done with the old client API rather than the resource
            paginator = self.client.get_paginator("list_objects")

            for result in paginator.paginate(
                Bucket=cloud_path.bucket, Prefix=prefix, Delimiter="/", **extra_kwargs
            ):
                # sub directory names
                for result_prefix in result.get("CommonPrefixes", []):
                    yield self.CloudPath(f"s3://{cloud_path.bucket}/{result_prefix.get('Prefix')}")

                # files in the directory
                for result_key in result.get("Contents", []):
                    yield self.CloudPath(f"s3://{cloud_path.bucket}/{result_key.get('Key')}")

    def _move_file(self, src: S3Path, dst: S3Path, remove_src: bool = True) -> S3Path:
        # just a touch, so "REPLACE" metadata
        if src == dst:
            o = self.s3.Object(src.bucket, src.key)
            o.copy_from(
                CopySource={"Bucket": src.bucket, "Key": src.key},
                Metadata=self._get_metadata(src).get("extra", {}),
                MetadataDirective="REPLACE",
            )
        else:
            target = self.s3.Object(dst.bucket, dst.key)
            target.copy({"Bucket": src.bucket, "Key": src.key})

            if remove_src:
                self._remove(src)
        return dst

    def _remove(self, cloud_path: S3Path) -> None:
        try:
            obj = self.s3.Object(cloud_path.bucket, cloud_path.key)

            # will throw if not a file
            obj.load()

            resp = obj.delete()
            assert resp.get("ResponseMetadata").get("HTTPStatusCode") == 204

        except ClientError:
            # try to delete as a directory instead
            bucket = self.s3.Bucket(cloud_path.bucket)

            prefix = cloud_path.key
            if prefix and not prefix.endswith("/"):
                prefix += "/"

            resp = bucket.objects.filter(Prefix=prefix).delete()

            # ensure directory deleted; if cloud_path did not exist at all
            # resp will be [], so no need to check success
            if resp:
                assert resp[0].get("ResponseMetadata").get("HTTPStatusCode") == 200

    def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: S3Path) -> S3Path:
        obj = self.s3.Object(cloud_path.bucket, cloud_path.key)
        obj.upload_file(str(local_path), Config=self.boto3_transfer_config)
        return cloud_path


S3Client.S3Path = S3Client.CloudPath  # type: ignore
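For completeness, a usage sketch for the patched client above (note that requester_pays is the keyword introduced by this patch, not an argument of any released cloudpathlib version, and the profile name is just an example):

from cloudpathlib import S3Client

# requester_pays is the keyword added by the patch above
client = S3Client(profile_name="default", requester_pays=True)

# listing the requester pays bucket now works
for p in client.CloudPath("s3://sentinel-s2-l1c/").iterdir():
    print(p)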
This is still an issue. See for yourself:
from cloudpathlib import CloudPath
tars = CloudPath("s3://arxiv/src/").glob("arXiv_src_20*_001.tar")
print(list(tars))
ClientError: An error occurred (AccessDenied) when calling the ListObjects operation: Access Denied
This is the public bucket for arXiv - you can download anything you want, including document source when available. This glob should select the first tarball of each month of 2020.
Paying as the requester (--request-payer requester) is never the default; people need to explicitly state that they want to pay to access one of these buckets.
@grofte @gc-sommer We've added the ability to pass extra args to the client in #307.
See the updates to the documentation for how to pass the args to the client that will let you access Requester Pays buckets: https://cloudpathlib.drivendata.org/stable/authentication/#requester-pays-buckets-on-s3
You can test it in version 0.12.0, which is on PyPI now.
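Per the linked documentation, usage then looks roughly like this (a sketch based on the docs for 0.12.0, using the arXiv example from above):

from cloudpathlib import S3Client

# extra_args is passed through to the underlying boto3 calls
client = S3Client(extra_args={"RequestPayer": "requester"})

tars = client.CloudPath("s3://arxiv/src/").glob("arXiv_src_20*_001.tar")
print(list(tars))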