cloudpathlib
Support Azure Data Lake Storage Gen2
Not sure if this problem also affects regular Blob storage accounts, but I have an Azure storage account using Data Lake Gen2 storage, and I observe the following issues:
- All paths are considered to be files (`is_file()` is true if the path exists) and not directories
- `is_dir()` never returns True
- `iterdir()` always returns paths. If the input is a file, it just returns itself if it exists.
- `rmdir()` always throws a `CloudPathNotADirectoryError`
- `unlink()` does appear to remove empty directories, but is expected to throw `CloudPathIsADirectoryError`
- `rmtree()` throws a `CloudPathNotADirectoryError`
- `rename()` (if it works) copies contents instead of doing an actual rename, which is supported in Gen2 (#162)
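For reference, the semantics the list above expects are the ordinary `pathlib` ones. A throwaway local directory shows the behavior a CloudPath should mirror (this is just an illustration against the standard library, not cloudpathlib code):

```python
# Reference check: the pathlib semantics a CloudPath is expected to mirror,
# demonstrated on a throwaway local directory.
import pathlib
import tempfile


def expected_semantics() -> bool:
    """Return True if the local filesystem shows the expected behavior."""
    with tempfile.TemporaryDirectory() as tmp:
        sub = pathlib.Path(tmp) / "subdir"
        sub.mkdir()
        f = sub / "file.txt"
        f.write_text("hello")

        assert sub.is_dir() and not sub.is_file()  # a directory is not a file
        assert f.is_file() and not f.is_dir()
        assert list(sub.iterdir()) == [f]          # iterdir lists children, not the path itself
        f.unlink()                                 # unlink removes files, never directories
        sub.rmdir()                                # rmdir removes an empty directory
        return True
```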
There may be other issues.
FYI, I see that if I use `AzureBlobClient._get_metadata`, there is a metadata entry indicating whether the path is a folder:
>>> somedir.client._get_metadata(somedir)['metadata']
{'hdi_isfolder': 'true'}
>>> somefile.client._get_metadata(somefile)['metadata']
{}
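That metadata entry can be wrapped in a tiny helper. This is only a sketch based on the REPL output above; the `hdi_isfolder` key is what ADLS Gen2 uses to tag directory marker blobs, and the helper name is mine:

```python
from typing import Mapping


def is_adls_folder(metadata: Mapping[str, str]) -> bool:
    """True if a blob's metadata marks it as an ADLS Gen2 folder.

    ADLS Gen2 accounts tag directory marker blobs with 'hdi_isfolder': 'true';
    plain file blobs have no such entry.
    """
    return metadata.get("hdi_isfolder", "").lower() == "true"
```

Usage would look like `is_adls_folder(somedir.client._get_metadata(somedir)['metadata'])`, matching the REPL session above.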
@analog-cbarber I'm not familiar with Data Lake Gen2. Should we expect the same client library code that we use (azure.storage.blob.BlobServiceClient) to work in the same way as with a normal blob on a storage account?
I don't actually know, since I have never tried directly using the azure sdk code, but I can read/write files, list directories using iterdir and create directories using mkdir, so it is probably fairly similar.
At least part of the problem appears to be that you are assuming that directory entries do not exist (which may be true for regular blob storage):
def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]:
    # short-circuit the root-level container
    if not cloud_path.blob:
        return "dir"
    try:
        self._get_metadata(cloud_path)
        # FIXME - this could be a directory
        return "file"
    except ResourceNotFoundError:
        prefix = cloud_path.blob
        if prefix and not prefix.endswith("/"):
            prefix += "/"
        # not a file, see if it is a directory
        container_client = self.service_client.get_container_client(cloud_path.container)
        try:
            next(container_client.list_blobs(name_starts_with=prefix))
            return "dir"
        except StopIteration:
            return None
I think you're right that the file/dir detection is probably the root cause of most of the issues you list. I think we need to do some research on the API/SDK and how the different Azure services handle it to determine if there's a better way of doing this that works across normal blob storage (which at least seems to work reliably in our test suite, which we run against real backends) and other blob storage configurations.
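One way to frame the needed change is to factor the classification into a pure decision function: given the blob's metadata (or `None` when no blob exists at the path) and whether any blobs exist under the trailing-slash prefix, decide file/dir/missing. This is only a sketch of the decision logic, not cloudpathlib's API; the `hdi_isfolder` key comes from the metadata output shown earlier in the thread:

```python
from typing import Mapping, Optional


def classify(metadata: Optional[Mapping[str, str]], has_children: bool) -> Optional[str]:
    """Classify a path as 'file', 'dir', or None (does not exist).

    metadata: the blob's metadata dict, or None if no blob exists at the path.
    has_children: whether any blobs exist under the path's trailing-slash prefix.
    """
    if metadata is not None:
        # A blob exists: on ADLS Gen2 it may be a directory marker blob.
        return "dir" if metadata.get("hdi_isfolder") == "true" else "file"
    # No blob: on flat blob storage a "directory" exists only implicitly,
    # as a common prefix shared by other blobs.
    return "dir" if has_children else None
```

The original code above corresponds to always taking the `"file"` branch whenever a blob exists, which is exactly what breaks on Gen2 directory markers.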
I think that data lake gen 2 storage can be used as either file or blob storage. I have only tried using the blob storage aspect myself, so I don't know if it matters. But I can access my blobs using URLs of the form: https://<account>.blob.core.windows.net/ just like regular blob storage.
It appears that they have mostly kept the REST API the same.
If you can find the relevant parts of the documentation for the service and the SDK, that would be helpful for when someone can look at implementing a change here.
Ok. For now, I am just going to try to hack my Azure subclasses to get this working for my case.
I did some light reading about ADLS Gen2. In particular, this blog post was pretty helpful at explaining the differences between it and regular Blob Storage. It appears that ADLS Gen2 is essentially an extension built on top of regular Blob Storage, which explains why our cloudpathlib code mostly works fine. But the functionality that ADLS Gen2 adds on top of regular Blob Storage is a hierarchical namespace (vs. a flat namespace), so directories behaving differently is precisely the point of using it.
So I think the status here is that we just don't support ADLS Gen2 right now. I'm going to change the issue title to reflect that. It probably makes sense for there to be a subclass that's specifically for ADLS Gen2.
Ideally you should support both regular and Gen2 storage in the same subclass, because there is no way to tell from the URL alone which one it is. A URL of the form https://<account>.blob.core.windows.net/... could refer to either, and users may not actually be aware of which scheme is being used. It will be a usability issue for them if you do not support this in a single class.
I don't know that much about Azure Storage. Is it not the case that Blob Storage is .blob. and ADLS Gen2 is .dfs.? Is there some other way to determine if a blob is using regular Blob Storage or ADLS Gen2?
No, that is not the case. Gen2 storage can be accessed using .blob. paths via the blob API. I don't know whether the .dfs. URL is just an alias or presents a different REST API, but either way that is probably a detail you should hide from the user if both refer to the same storage location.
So you cannot tell from the URL alone whether a .blob. URL refers to gen2 or not.
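The point can be made concrete with a small parser: both endpoint styles resolve to the same account name, so the URL tells you which API endpoint is being addressed, not whether the account has a hierarchical namespace enabled. This is a standalone sketch (function and pattern names are mine; the URL shapes are the ones discussed above):

```python
import re
from typing import Optional, Tuple

# Matches https://<account>.blob.core.windows.net or the .dfs. variant.
_AZURE_URL = re.compile(
    r"https://(?P<account>[a-z0-9]+)\.(?P<endpoint>blob|dfs)\.core\.windows\.net",
    re.IGNORECASE,
)


def parse_azure_url(url: str) -> Optional[Tuple[str, str]]:
    """Extract (account, endpoint) from an Azure storage URL, or None if it doesn't match."""
    m = _AZURE_URL.match(url)
    return (m.group("account"), m.group("endpoint").lower()) if m else None
```

Both `https://myacct.blob.core.windows.net/c/f` and `https://myacct.dfs.core.windows.net/c/f` yield the account `myacct`; nothing in either URL reveals whether `myacct` is an ADLS Gen2 account.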
FYI: here is my subclass that implements the azure://<storage-account>/<container>/... URL scheme and supports gen2 storage. I haven't tested it exhaustively but it seems to work for my use cases.
import re
from typing import Dict, Iterable, List, Pattern, Union, Optional
from urllib.parse import urlparse

from cloudpathlib import AzureBlobClient, AzureBlobPath, CloudPath
from cloudpathlib.client import register_client_class
from cloudpathlib.cloudpath import register_path_class, implementation_registry
from cloudpathlib.exceptions import InvalidPrefixError, CloudPathFileExistsError

try:
    import azure.storage.blob  # pylint: disable=unused-import
except ImportError:
    implementation_registry['azure'].dependencies_loaded = False

__all__ = [
    'AzureClient',
    'AzurePath',
]
@register_client_class('azure2')
class AzureClient(AzureBlobClient):
    """Extends AzureBlobClient to fix some issues."""

    def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]:
        # short-circuit the root-level container
        if not cloud_path.blob:
            return "dir"
        try:
            if self._get_metadata(cloud_path).get('metadata', {}).get('hdi_isfolder', False):
                return "dir"
            else:
                return "file"
        except Exception as ex:  # pylint: disable=broad-except
            if type(ex).__name__ != 'ResourceNotFoundError':
                raise
            prefix = cloud_path.blob
            if prefix and not prefix.endswith("/"):
                prefix += "/"
            # not a file, see if it is a directory
            container_client = self.service_client.get_container_client(cloud_path.container)
            try:
                next(container_client.list_blobs(name_starts_with=prefix))
                return "dir"
            except StopIteration:
                return None

    def _list_dir(
        self, cloud_path: AzureBlobPath, recursive: bool = False
    ) -> Iterable[AzureBlobPath]:
        try:
            from azure.storage.blob import BlobPrefix
        except ImportError:
            return
        container = cloud_path.container
        container_client = self.service_client.get_container_client(container)

        def yield_entries(prefix):
            for entry in container_client.walk_blobs(name_starts_with=prefix):
                subpath = f'{AzurePath.cloud_prefix}{self.service_client.account_name}/{container}/{entry.name}'
                yield AzurePath(subpath, client=self)
                if recursive and isinstance(entry, BlobPrefix):
                    yield from yield_entries(entry.name)

        yield from yield_entries(cloud_path.blob.rstrip('/') + '/')

    def _remove(self, cloud_path: AzureBlobPath) -> None:
        ftype = self._is_file_or_dir(cloud_path)
        if ftype == "dir":
            dir_blobs: List[str] = [cloud_path.blob]
            file_blobs: List[str] = []
            for subpath in self._list_dir(cloud_path, recursive=True):
                if subpath.is_dir():
                    dir_blobs.append(subpath.blob)
                else:
                    file_blobs.append(subpath.blob)
            container_client = self.service_client.get_container_client(cloud_path.container)
            if file_blobs:  # skip the batch call when the directory holds no files
                container_client.delete_blobs(*file_blobs)
            # delete the deepest directory markers first
            for dir_blob in reversed(dir_blobs):
                container_client.delete_blob(dir_blob.rstrip('/'))
        if ftype == 'file':
            blob = self.service_client.get_blob_client(
                container=cloud_path.container, blob=cloud_path.blob
            )
            blob.delete_blob()
@register_path_class('azure2')
class AzurePath(AzureBlobPath):
    """CloudPath for accessing Azure blob/dfs storage using the azure:// URL scheme."""

    client: 'AzureClient'
    cloud_prefix: str = "azure://"

    def __init__(self, cloud_path: Union[str, CloudPath],
                 client: Optional[AzureBlobClient] = None,
                 token: Optional[str] = None
                 ):
        """Constructs new AzurePath instance

        Arguments:
            cloud_path: the resource path. May either be an existing AzurePath,
                a string of the form "azure://<account>/<container>..." or
                a URL of the form "https://<account>.blob.core.windows.net/<container>...".
            client: the client to use with this path. The client will be ignored if a
                token is provided or the input path has a query string.
            token: the SAS token to use to access this path. This will override any
                token in the path's query string.
        """
        if isinstance(cloud_path, str):
            if client is not None and cloud_path.startswith('az://'):
                cloud_path = f"{self.cloud_prefix}{client.service_client.account_name}/{cloud_path[5:]}"
            parsed = urlparse(cloud_path)
            m = re.match(r'(?P<account>[a-z0-9]+)(\.(?P<type>blob|dfs)(\.core\.windows\.net)?)?',
                         parsed.netloc,
                         flags=re.IGNORECASE)
            if m is None:
                raise ValueError(f"Bad azure path '{cloud_path}'")
            account = m.group('account')
            fstype = m.group('type') or 'blob'
            account_url = f'https://{account}.{fstype}.core.windows.net/'
            optional_type = '' if fstype == 'blob' else '.' + fstype
            cloud_path = f"azure://{account}{optional_type}/{parsed.path.lstrip('/')}"
            if client is None or parsed.query or token or client.service_client.account_name != account:
                if token is not None:
                    token = '?' + token.lstrip('?')
                elif parsed.query:
                    token = '?' + parsed.query
                client = AzureClient(account_url, token)
        super().__init__(cloud_path, client=client)

    @classmethod
    def is_valid_cloudpath(cls, path: Union[str, CloudPath], raise_on_error=False) -> bool:
        """True if this looks like an Azure blob/dfs path.

        Specifically, it either starts with azure:// or is an https:// URL of the form
        https://<account>.(blob|dfs).core.windows.net/...
        """
        valid = bool(re.match(r'(azure://|https://[a-z0-9]+\.(blob|dfs)\.core\.windows\.net)', str(path).lower()))
        if raise_on_error and not valid and not str(path).startswith('az://'):
            raise InvalidPrefixError(
                f"'{path}' is not a valid path since it does not start with '{cls.cloud_prefix}' "
                "or a valid Azure https blob or dfs location."
            )
        return valid

    @classmethod
    def canonical_path(cls, path: str) -> str:
        """Canonicalize path if it is a valid AzurePath

        This canonicalizes paths of the form "https://<storageaccount>.(blob|dfs).core.windows.net/..."
        to "azure://<storageaccount>[.dfs]/...". It leaves other paths alone.
        """
        return re.sub(r'^https://([a-z0-9]+)(?:\.blob|(\.dfs))\.core\.windows\.net/',
                      r'azure://\1\2/',
                      path,
                      flags=re.IGNORECASE)

    @property
    def account(self) -> str:
        """Azure storage account name"""
        return self.client.service_client.account_name

    @property
    def container(self) -> str:
        return self._no_prefix.split('/', 2)[1]

    @property
    def drive(self) -> str:
        return f"{self.account}/{self.container}"

    def mkdir(self, parents=False, exist_ok=False):
        if self.is_dir():
            if not exist_ok:
                raise CloudPathFileExistsError(f"'{self}' already exists")
            return
        if self.is_file():
            raise CloudPathFileExistsError(f"'{self}' is already a file")
        if not parents and not self.parent.is_dir():
            raise FileNotFoundError(f"Parent dir does not exist: '{self.parent}'")
        # HACK: create an empty blob and remove it - works with data lake gen 2,
        # probably useless with a regular blob storage account.
        child = self.joinpath('delete-me')
        child.write_text('')
        child.unlink()
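The `canonical_path` rewrite in the subclass above is a pure string transform, so it can be exercised without touching Azure. This standalone copy of the rewrite rule (the account and container names in the examples are made up) shows the mapping for both endpoint styles; note that Python's `re.sub` substitutes an empty string for the non-participating `(\.dfs)` group on `.blob.` URLs:

```python
import re


def canonicalize(path: str) -> str:
    """Rewrite https://<account>.(blob|dfs).core.windows.net/... to
    azure://<account>[.dfs]/...; other strings pass through unchanged."""
    return re.sub(
        r"^https://([a-z0-9]+)(?:\.blob|(\.dfs))\.core\.windows\.net/",
        r"azure://\1\2/",
        path,
        flags=re.IGNORECASE,
    )
```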