
Support for Azure Blob Storage and Azure Data Lake

Open • upendrarv opened this issue 5 years ago • 5 comments

Hi,

Any plans to add support for Azure Blob Storage and/or Azure Data Lake?

Thanks for the great work!

upendrarv • Jul 20 '20 15:07

Hi there @upendrarv, thanks for opening this issue. Can you clarify specifically what you're asking?

jsgoller1 • Jul 20 '20 17:07

This is not something we were planning to work on at the moment.

For this to work, we would need proper Azure Blob storage support from pyarrow. I am not sure what Arrow's progress is on supporting these Azure filesystems. The only thing I see in their documentation is this. It might be possible to use that technique in Petastorm, but it could get hairy because we rely on pyarrow's ParquetDataset class.
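For reference, the technique in that documentation boils down to constructing a filesystem object yourself and handing it to ParquetDataset. A minimal sketch, assuming an fsspec-compatible Azure filesystem such as adlfs can be passed through unchanged (untested; credentials and the container path below are placeholders):

import pyarrow.parquet as pq
from adlfs import AzureBlobFileSystem

# Sketch only: mirrors the s3fs example from the pyarrow docs with an Azure
# filesystem swapped in; credentials and the container path are placeholders.
azure_fs = AzureBlobFileSystem(account_name="...", account_key="...")
dataset = pq.ParquetDataset("container/path/to/dataset", filesystem=azure_fs)
table = dataset.read()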

selitvin • Jul 20 '20 18:07

@jsgoller1 : Currently I am looking to read Petastorm datasets stored in Azure Data Lake Storage and Azure Blob Storage. There is already support for S3 and GCS (Case 5 and Case 6 in [fs_utils.py](https://github.com/uber/petastorm/blob/c370bac366e86ca07c051cbd0daffacf866ecde1/petastorm/fs_utils.py)). I am looking for similar support for the Azure Blob File System.
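For comparison, the existing cases let a reader be opened straight from the dataset URL; the ask is for an Azure URL to work the same way. A short illustration (the abfs:// scheme is hypothetical, and the bucket/container names are placeholders):

from petastorm import make_reader

# Works today, resolved by the S3/GCS cases in fs_utils.py:
with make_reader('s3://my-bucket/some/petastorm/dataset') as reader:
    for sample in reader:
        pass

# Desired, but not supported yet (hypothetical scheme):
# with make_reader('abfs://my-container/some/petastorm/dataset') as reader:
#     ...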

@selitvin : Thanks for the reply. In Python, the adlfs package is available; we can use it to access the Azure Blob File System instead of taking a dependency on pyarrow. I have written prototype code to access Petastorm datasets stored in the Azure Blob File System. With the code below, I am able to read the dataset and train my ML model.
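For context, adlfs exposes Azure Blob Storage through the standard fsspec interface, so listing and opening blobs works like any other fsspec filesystem (credentials and paths below are placeholders):

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(account_name="xxxxxxxxxxxxxxxxxx", account_key="xxxxxxxxxxxxxxxxx")
print(fs.ls("my-container/petastorm/parquet"))  # list blobs under a prefix
with fs.open("my-container/petastorm/parquet/part-00000.parquet", "rb") as f:
    print(f.read(4))  # Parquet files begin with the magic bytes b'PAR1'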

Below is the sample code:

import os
import posixpath

from adlfs import AzureBlobFileSystem
from petastorm.reader import Reader
from pyarrow.filesystem import FileSystem, DaskFileSystem
from pyarrow.util import implements, _stringify_path

class ABFSWrapper(DaskFileSystem):
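    """Adapts an adlfs AzureBlobFileSystem to the pyarrow FileSystem interface expected by petastorm's Reader."""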
    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _sanitize_azure_path(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
            if len(contents) == 1 and contents[0] == path:
                return False
            else:
                if not any(ftype in path for ftype in
                           ['parquet', 'parq', 'metadata']):
                    raise ValueError('Directory is not a partition of'
                                     ' *.parquet. Try passing a globstring.')
                return True
        except OSError:
            return False

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _sanitize_azure_path(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
            return len(contents) == 1 and contents[0] == path
        except OSError:
            return False

    def walk(self, path, invalidate_cache=True, refresh=False):
        """
        Directory tree generator, like os.walk
        Generator version of what is in ABFSClient, which yields a flattened list of files
        """
        path = _sanitize_azure_path(_stringify_path(path))
        directories = set()
        files = set()

        for key in list(self.fs.ls(path, invalidate_cache=invalidate_cache)):
            # Keep `path` untouched: it is yielded below as the directory being walked
            if self.isdir(key):
                directories.add(key)
            elif self.isfile(key):
                files.add(key)

        # Entry names relative to `path`, like os.walk
        rel_files = sorted([posixpath.split(f)[1] for f in files
                            if f not in directories])
        rel_directories = sorted([posixpath.split(x)[1]
                                  for x in directories])

        yield path, rel_directories, rel_files

        # Recurse into subdirectories using their full paths
        for directory in directories:
            for tup in self.walk(directory, refresh=refresh):
                yield tup


def _sanitize_azure_path(path):
    if path.startswith('adl://'):
        return path.replace('adl://', '')
    else:
        return path

abfs = AzureBlobFileSystem(account_name="xxxxxxxxxxxxxxxxxx", account_key="xxxxxxxxxxxxxxxxx")
wrapped_fs = ABFSWrapper(abfs)

dataset_url_without_protocol_prefix = "/petastorm/parquet"
with Reader(dataset_path=dataset_url_without_protocol_prefix, pyarrow_filesystem=wrapped_fs) as reader:
    ...  # iterate over the reader here (e.g. feed samples into the training loop)

The same piece of code could be added to the Petastorm library, similar to the gcsfs_helper folder (a rough sketch of how the URL-resolution side could look is below). Let me know your thoughts.
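A rough sketch of that resolution step, mirroring how gcsfs_helper plugs into fs_utils.FilesystemResolver (the module name, function name, and abfs:// scheme here are all hypothetical, not existing Petastorm code):

from urllib.parse import urlparse

from adlfs import AzureBlobFileSystem


def resolve_abfs_url(dataset_url, account_name, account_key):
    """Hypothetical helper: turn an abfs:// URL into a (filesystem, path) pair."""
    parsed = urlparse(dataset_url)
    if parsed.scheme.lower() not in ('abfs', 'abfss', 'adl'):
        raise ValueError('Expected an Azure URL, got: %s' % dataset_url)
    fs = AzureBlobFileSystem(account_name=account_name, account_key=account_key)
    # Reuse the ABFSWrapper prototype above to expose the pyarrow FileSystem interface
    return ABFSWrapper(fs), parsed.netloc + parsed.path


# Usage with the prototype above (URL and credentials are placeholders):
# wrapped_fs, path = resolve_abfs_url('abfs://container/petastorm/parquet', 'name', 'key')
# with Reader(dataset_path=path, pyarrow_filesystem=wrapped_fs) as reader:
#     ...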

upendrarv • Jul 21 '20 07:07

This is nice! Would you like to take a shot at a PR that adds this to Petastorm?

selitvin • Jul 21 '20 16:07

@upendrarv Do it! I am facing this right now, trying to instantiate a batch reader from a Parquet table in Azure Blob Storage.

sam-h-bean • Dec 29 '21 05:12