
Support Azure Data Lake Storage Gen2

Open · analog-cbarber opened this issue 4 years ago · 12 comments

Not sure if this problem also affects regular Blob storage accounts, but I have an Azure storage account using Data Lake Gen2 storage, and I observe the following issues:

  • All paths are considered to be files (is_file() is true if it exists) and not directories. is_dir() never returns True
  • iterdir() always returns paths. If the input is a file, it just returns itself if it exists.
  • rmdir() always throws a CloudPathNotADirectoryError
  • unlink() does appear to remove empty directories, when it is expected to throw CloudPathIsADirectoryError instead
  • rmtree() throws a CloudPathNotADirectoryError
  • rename() (if it works) copies contents instead of doing an actual rename, which is supported in Gen2 (#162)

There may be other issues.

analog-cbarber · Sep 02 '21 15:09

FYI, I see that if I use AzureBlobClient._get_metadata, there is a metadata entry indicating whether the path is a folder:

>>> somedir.client._get_metadata(somedir)['metadata']
{'hdi_isfolder': 'true'}
>>> somefile.client._get_metadata(somefile)['metadata']
{}

analog-cbarber · Sep 02 '21 15:09
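
The same check can be made with the Azure SDK directly, without going through cloudpathlib internals. A minimal sketch; the account URL, container, blob name, and credential are placeholders:

from azure.storage.blob import BlobServiceClient

service = BlobServiceClient(
    account_url="https://<account>.blob.core.windows.net",
    credential="<sas-token-or-key>",
)
blob = service.get_blob_client(container="<container>", blob="some/dir")

# On ADLS Gen2 accounts, directory entries carry the 'hdi_isfolder'
# metadata flag; regular file blobs have no such key at all.
props = blob.get_blob_properties()
is_folder = props.metadata.get("hdi_isfolder") == "true"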

@analog-cbarber I'm not familiar with Data Lake Gen2. Should we expect the same client library code that we use (azure.storage.blob.BlobServiceClient) to work in the same way as with a normal blob on a storage account?

pjbull · Sep 02 '21 15:09

I don't actually know, since I have never tried using the Azure SDK code directly, but I can read/write files, list directories using iterdir, and create directories using mkdir, so it is probably fairly similar.

At least part of the problem appears to be that you are assuming that directory entries do not exist (which may be true for regular blob storage):

    def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]:
        # short-circuit the root-level container
        if not cloud_path.blob:
            return "dir"

        try:
            self._get_metadata(cloud_path)
            # FIXME - this could be a directory
            return "file"
        except ResourceNotFoundError:
            prefix = cloud_path.blob
            if prefix and not prefix.endswith("/"):
                prefix += "/"

            # not a file, see if it is a directory
            container_client = self.service_client.get_container_client(cloud_path.container)

            try:
                next(container_client.list_blobs(name_starts_with=prefix))
                return "dir"
            except StopIteration:
                return None

analog-cbarber · Sep 02 '21 15:09

I think you're right that the file/dir detection is probably the root cause of most of the issues you list. I think we need to do some research on the API/SDK and how the different Azure services handle it to determine if there's a better way of doing this that works across normal blob storage (which at least seems to work reliably in our test suite, which we run against real backends) and other blob storage configurations.

pjbull · Sep 02 '21 15:09

I think that Data Lake Gen2 storage can be used as either file or blob storage. I have only tried the blob storage aspect myself, so I don't know if it matters. But I can access my blobs using URLs of the form https://<account>.blob.core.windows.net/, just like regular blob storage.

It appears that they have mostly kept the REST API the same.

analog-cbarber · Sep 02 '21 15:09
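
Since Gen2 layers a hierarchical namespace over blob storage, the same container can be read through both endpoints. A sketch illustrating this, assuming the azure-storage-file-datalake package is installed; the account, container, and credential are placeholders:

from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient

cred = "<sas-token-or-key>"

# Blob endpoint: the flat blob API view of the container.
blob_svc = BlobServiceClient("https://<account>.blob.core.windows.net", credential=cred)
for b in blob_svc.get_container_client("<container>").list_blobs():
    print("blob api:", b.name)

# dfs endpoint: the hierarchical view, where directories are real entries.
dfs_svc = DataLakeServiceClient("https://<account>.dfs.core.windows.net", credential=cred)
for p in dfs_svc.get_file_system_client("<container>").get_paths():
    print("dfs api: ", p.name, "dir" if p.is_directory else "file")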

If you can find the relevant parts of the documentation for the service and the SDK, that would be helpful for when someone can look at implementing a change here.

pjbull · Sep 02 '21 15:09

Ok. For now, I am just going to try to hack my Azure subclasses to get this working for my case.

analog-cbarber · Sep 02 '21 15:09

I did some light reading about ADLS Gen2. In particular, this blog post was pretty helpful at explaining the differences between it and regular Blob Storage. It appears that ADLS Gen2 is essentially an extension built on top of regular Blob Storage, which explains why our cloudpathlib code mostly works fine. But the additional functionality that ADLS Gen2 adds on top of regular Blob Storage is a hierarchical namespace (vs. a flat namespace), so directories behaving differently is precisely the point of using it.

So I think the status here is that we just don't support ADLS Gen2 right now. I'm going to change the issue title to reflect that. It probably makes sense for there to be a subclass that's specifically for ADLS Gen2.

jayqi · Sep 10 '21 22:09
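
For contrast, on a flat-namespace (non-Gen2) account there is no blob object for a directory at all; walk_blobs() merely synthesizes BlobPrefix entries from the name prefixes. A sketch with placeholder account, container, and credential:

from azure.storage.blob import BlobPrefix, BlobServiceClient

container = BlobServiceClient(
    "https://<account>.blob.core.windows.net", credential="<sas-token-or-key>"
).get_container_client("<container>")

# Each BlobPrefix is a virtual directory implied by blob names; there is
# no object to fetch metadata from for 'some/dir' itself.
for entry in container.walk_blobs(name_starts_with="some/"):
    kind = "virtual dir" if isinstance(entry, BlobPrefix) else "blob"
    print(kind, entry.name)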

Ideally you should support both regular and Gen2 storage in the same subclass, because there is no way to tell from the URL alone which one it is. A URL of the form https://<account>.blob.core.windows.net/... could refer to either, and users may not actually be aware of which scheme is being used. It will be a usability issue for them if you do not support this in a single class.

analog-cbarber · Sep 11 '21 13:09

I don't know that much about Azure Storage. Is it not the case that Blob Storage is .blob. and ADLS Gen2 is .dfs.? Is there some other way to determine if a blob is using regular Blob Storage or ADLS Gen2?

jayqi · Sep 11 '21 20:09

No, that is not the case. Gen2 storage can be accessed using .blob. paths via the blob API. I don't know whether the .dfs. URL is just an alias or presents a different REST API, but either way that is probably a detail you should hide from the user if both refer to the same storage location.

So you cannot tell from the URL alone whether a .blob. URL refers to gen2 or not.

analog-cbarber · Sep 12 '21 15:09
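
If the URL cannot distinguish the two, the account itself can be queried. A sketch, assuming a recent azure-storage-blob release that surfaces the x-ms-is-hns-enabled header via get_account_information(); the account URL and credential are placeholders:

from azure.storage.blob import BlobServiceClient

service = BlobServiceClient(
    "https://<account>.blob.core.windows.net", credential="<sas-token-or-key>"
)
info = service.get_account_information()
# Expected: True for ADLS Gen2 (hierarchical namespace), False for flat
# blob storage -- assuming the SDK exposes the 'is_hns_enabled' key.
print(info.get("is_hns_enabled"))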

FYI: here is my subclass that implements the azure://<storage-account>/<container>/... URL scheme and supports Gen2 storage. I haven't tested it exhaustively, but it seems to work for my use cases.


import re
from typing import Iterable, List, Optional, Union
from urllib.parse import urlparse

from cloudpathlib import AzureBlobClient, AzureBlobPath, CloudPath
from cloudpathlib.client import register_client_class
from cloudpathlib.cloudpath import register_path_class, implementation_registry
from cloudpathlib.exceptions import InvalidPrefixError, CloudPathFileExistsError

try:
    import azure.storage.blob # pylint: disable=unused-import
except ImportError:
    implementation_registry['azure'].dependencies_loaded = False

__all__ = [
    'AzureClient',
    'AzurePath'
]

@register_client_class('azure2')
class AzureClient(AzureBlobClient):
    """Extends AzureBlobClient to fix some issues."""
    def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]:
        # short-circuit the root-level container
        if not cloud_path.blob:
            return "dir"

        try:
            if self._get_metadata(cloud_path).get('metadata',{}).get('hdi_isfolder', False):
                return "dir"
            else:
                return "file"
        except Exception as ex: # pylint: disable=broad-except
            if type(ex).__name__ != 'ResourceNotFoundError':
                raise
            prefix = cloud_path.blob
            if prefix and not prefix.endswith("/"):
                prefix += "/"

            # not a file, see if it is a directory
            container_client = self.service_client.get_container_client(cloud_path.container)

            try:
                next(container_client.list_blobs(name_starts_with=prefix))
                return "dir"
            except StopIteration:
                return None

    def _list_dir(
        self, cloud_path: AzureBlobPath, recursive: bool = False
    ) -> Iterable[AzureBlobPath]:
        try:
            from azure.storage.blob import BlobPrefix
        except ImportError:
            return

        container = cloud_path.container
        container_client = self.service_client.get_container_client(container)

        def yield_entries(prefix):
            for entry in container_client.walk_blobs(name_starts_with=prefix):
                subpath = f'{AzurePath.cloud_prefix}{self.service_client.account_name}/{container}/{entry.name}'
                yield AzurePath(subpath, client=self)
                if recursive and isinstance(entry, BlobPrefix):
                    yield from yield_entries(entry.name)

        # handle the container root (empty blob) to avoid a bare '/' prefix
        prefix = cloud_path.blob.rstrip('/') + '/' if cloud_path.blob else ''
        yield from yield_entries(prefix)


    def _remove(self, cloud_path: AzureBlobPath) -> None:
        ftype = self._is_file_or_dir(cloud_path)
        if ftype == "dir":
            dir_blobs: List[str] = [cloud_path.blob]
            file_blobs: List[str] = []
            for subpath in self._list_dir(cloud_path, recursive=True):
                if subpath.is_dir():
                    dir_blobs.append(subpath.blob)
                else:
                    file_blobs.append(subpath.blob)
            container_client = self.service_client.get_container_client(cloud_path.container)
            if file_blobs:  # guard against submitting an empty batch
                container_client.delete_blobs(*file_blobs)
            for dir_blob in reversed(dir_blobs):
                container_client.delete_blob(dir_blob.rstrip('/'))

        if ftype == 'file':
            blob = self.service_client.get_blob_client(
                container=cloud_path.container, blob=cloud_path.blob
            )
            blob.delete_blob()


@register_path_class('azure2')
class AzurePath(AzureBlobPath):
    """CloudPath for accessing Azure blob/dfs storage using azure:// URL scheme."""

    client: 'AzureClient'
    cloud_prefix: str = "azure://"

    def __init__(self, cloud_path: Union[str, CloudPath],
                 client: Optional[AzureBlobClient] = None,
                 token: Optional[str] = None
                 ):
        """Constructs new AzurePath instance

        Arguments:
            cloud_path: the resource path. May either be an existing AzurePath,
                a string of the form "azure://<account>/<container>..." or
                a URL of the form "https://<account>.blob.core.windows.net/<container>...".
            client: the client to use with this path. The client will be ignored if a
                token is provided or the input path has a query string.
            token: the SAS token to use to access this path. This will override any
                token in the path's query string.
        """
        if isinstance(cloud_path, str):
            if client is not None and cloud_path.startswith('az://'):
                cloud_path = f"{self.cloud_prefix}{client.service_client.account_name}/{cloud_path[5:]}"
            parsed = urlparse(cloud_path)
            m = re.match(r'(?P<account>[a-z0-9]+)(\.(?P<type>blob|dfs)(\.core\.windows\.net)?)?',
                         parsed.netloc,
                         flags=re.IGNORECASE)
            if m is None:
                raise ValueError(f"Bad azure path '{cloud_path}'")
            account = m.group('account')
            fstype = m.group('type') or 'blob'
            account_url = f'https://{account}.{fstype}.core.windows.net/'
            optional_type = '' if fstype == 'blob' else '.' + fstype
            cloud_path = f"azure://{account}{optional_type}/{parsed.path.lstrip('/')}"
            if client is None or parsed.query or token or client.service_client.account_name != account:
                if token is not None:
                    token = '?' + token.lstrip('?')
                elif parsed.query:
                    token = '?' + parsed.query
                client = AzureClient(account_url, token)
        super().__init__(cloud_path, client=client)

    @classmethod
    def is_valid_cloudpath(cls, path: Union[str, CloudPath], raise_on_error=False) -> bool:
        """True either if this looks like an Azure blob/dfs path.

        Specifically, it either starts with azure:// or is an http:// URL of the form
        https://<account>.(blob|dfs).core.windows.net/...
        """
        valid = bool(re.match(r'(azure://|https://[a-z0-9]+\.(blob|dfs)\.core\.windows\.net)', str(path).lower()))

        if raise_on_error and not valid and not str(path).startswith('az://'):
            raise InvalidPrefixError(
                f"'{path}' is not a valid path since it does not start with '{cls.cloud_prefix}' "
                "or valid Azure https blob or dfs location."
            )

        return valid

    @classmethod
    def canonical_path(cls, path: str) -> str:
        """Canonicalize path if it is a valid AzurePath

        This canonicalizes paths of the form "https://<storageaccount>.(blob|dfs).core.windows.net/..."
        to "azure://<storageaccount>[.dfs]/...". It leaves other paths alone.
        """
        return re.sub(r'^https://([a-z0-9]+)(?:\.blob|(\.dfs))\.core\.windows\.net/',
                      r'azure://\1\2/',
                      path,
                      flags=re.IGNORECASE)


    @property
    def account(self) -> str:
        """Azure storage account name"""
        return self.client.service_client.account_name

    @property
    def container(self) -> str:
        return self._no_prefix.split('/',2)[1]

    @property
    def drive(self) -> str:
        return f"{self.account}/{self.container}"

    def mkdir(self, parents=False, exist_ok=False):
        exists = self.is_dir()
        if exists:
            if not exist_ok:
                raise CloudPathFileExistsError(f"'{self}' already exists")
            return
        if self.is_file():
            raise CloudPathFileExistsError(f"'{self}' is already a file")
        if not parents and not self.parent.is_dir():
            raise FileNotFoundError(f"Parent dir does not exist: '{self.parent}'")

        # HACK: create an empty blob and remove it - works with Data Lake Gen2;
        #   probably useless with a regular blob storage account.
        child = self.joinpath('delete-me')
        child.write_text('')
        child.unlink()

analog-cbarber · Sep 12 '21 15:09
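
For reference, a minimal usage sketch of the subclass above; the account, container, path, and SAS token are placeholders, and the behavior noted in the comments assumes a Gen2 account:

# Hypothetical usage; nothing here is tested beyond the author's own cases.
p = AzurePath(
    "https://myaccount.blob.core.windows.net/mycontainer/data/file.csv",
    token="<sas-token>",
)
print(p)                       # azure://myaccount/mycontainer/data/file.csv
print(p.account, p.container)  # myaccount mycontainer

d = p.parent
print(d.is_dir())              # True on Gen2, thanks to the hdi_isfolder check
for child in d.iterdir():
    print(child)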