
[Subtask] Add Python Arrow FileSystem implementation for fileset.

Open coolderli opened this issue 1 year ago • 7 comments

Describe the subtask

Add Python Arrow FileSystem implementation for fileset.

Parent issue

#1241

coolderli avatar Feb 05 '24 02:02 coolderli

@jerryshao I just created this subtask so we can discuss it further here. I'm wondering whether we should support a Python client. In the Python Arrow FileSystem, we have to load the fileset and get its physical storage location.

coolderli avatar Feb 05 '24 02:02 coolderli

Yeah, I think so. We should have a Python client first.

jerryshao avatar Feb 05 '24 02:02 jerryshao

@jerryshao I found that the client module depends on the api and common modules. Do we have to implement them again in Python, or can we use some bridge like Py4j to invoke them? Maybe later we will also need a Go or C client. Can you share your thoughts? Thanks.

I think the Ray engine (https://github.com/datastrato/gravitino/issues/1355) also needs a Python library. Are we already working on this?

coolderli avatar Feb 06 '24 01:02 coolderli

We planned to build the Python library, but we haven't kicked it off yet.

From my wild thinking, since we use the REST protocol to communicate with the server, we don't have to use Py4j to bridge the Java code. One possible way is to write a Python counterpart by hand.

Also, since we are using the REST protocol, maybe we can define a model and use code-generation tools to generate SDKs for different languages.
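
As a rough illustration of the hand-written Python counterpart idea, here is a minimal sketch of calling the server's REST API; the endpoint path and response shape are assumptions based on the Java client, not a committed API:

# Minimal sketch of a hand-written Python REST client. The endpoint path and
# response fields are assumptions, not a committed API.
import requests

class GravitinoClient:
    def __init__(self, uri: str, metalake: str):
        self._base = f"{uri}/api/metalakes/{metalake}"

    def load_fileset(self, catalog: str, schema: str, fileset: str) -> dict:
        # e.g. GET /api/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/filesets/{fileset}
        resp = requests.get(
            f"{self._base}/catalogs/{catalog}/schemas/{schema}/filesets/{fileset}",
            timeout=10)
        resp.raise_for_status()
        # The response would carry the fileset's physical storage location.
        return resp.json()

# client = GravitinoClient("http://localhost:8090", "gravitino_test")
# fileset = client.load_fileset("catalog", "schema", "fileset")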

jerryshao avatar Feb 06 '24 02:02 jerryshao

@jerryshao @xunliu @shaofengshi Hi, I'd like to discuss how to implement the Python gvfs. After talking with @xloya offline, I think we have two options.

  1. Implement a GravitinoFileSystem in Python. The GravitinoFileSystem would inherit from pyarrow.fs.FileSystem and proxy the real FileSystem, such as pyarrow.fs.HadoopFileSystem or pyarrow.fs.S3FileSystem. More FileSystem implementations can be found in this doc. In this approach we have to implement a new FileSystem ourselves, and the real implementations come from PyArrow; if a storage protocol is missing, we would need to implement it on the PyArrow side. (A rough sketch of this proxy approach follows this list.)
  2. Just use pyarrow.fs.HadoopFileSystem to read through gvfs. The gvfs is compatible with the Hadoop protocol, so we can use pyarrow.fs.HadoopFileSystem.from_uri("gvfs://fileset/catalog/schema/fileset") to read the data. This way we also go through the Hadoop protocol to read object storage, which is consistent with the Java usage. However, parameters have to be passed along with the URI, so we need to consider whether this is sufficiently user-friendly.
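
A rough, abridged sketch of the solution-1 proxy idea, assuming a hypothetical GravitinoClient that resolves a virtual gvfs path to its physical storage location (PyArrow custom filesystems are written as a FileSystemHandler wrapped in fs.PyFileSystem):

# Abridged sketch of solution 1. GravitinoClient and resolve_location are
# hypothetical; a real handler must implement the remaining abstract
# FileSystemHandler methods before it can be instantiated.
from pyarrow import fs

class GravitinoFileSystemHandler(fs.FileSystemHandler):
    def __init__(self, client):
        self._client = client  # hypothetical REST client

    def get_type_name(self):
        return "gvfs"

    def _resolve(self, path):
        # Map the virtual path to (real filesystem, real path); from_uri picks
        # e.g. pyarrow.fs.HadoopFileSystem for hdfs:// locations or
        # pyarrow.fs.S3FileSystem for s3:// locations.
        location = self._client.resolve_location(path)  # assumed helper
        return fs.FileSystem.from_uri(location)

    def open_input_stream(self, path):
        real_fs, real_path = self._resolve(path)
        return real_fs.open_input_stream(real_path)

    def get_file_info(self, paths):
        infos = []
        for path in paths:
            real_fs, real_path = self._resolve(path)
            infos.extend(real_fs.get_file_info([real_path]))
        return infos

# gvfs = fs.PyFileSystem(GravitinoFileSystemHandler(client))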

By the way, I saw the issues about the tensorflow-connector and pytorch-connector. Will we provide some higher-level APIs for greater ease of use? Can you share your thoughts? Thanks.

coolderli avatar May 14 '24 02:05 coolderli

Some test code for solution 2; it has been preliminarily verified and works:

if __name__ == "__main__":
    from pyarrow import fs
    configs = {
        'fs.gvfs.impl': 'com.datastrato.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem',
        'fs.AbstractFileSystem.gvfs.impl': 'com.datastrato.gravitino.filesystem.hadoop.Gvfs',
        'fs.gravitino.server.uri': 'http://localhost:8090',
        'fs.gravitino.client.metalake': 'gravitino_test',
        'fs.gravitino.client.authType': 'simple'
    }
    config_string = '&'.join([f"{key}={value}" for key, value in configs.items()])

    concat_uri = "{}?{}".format("gvfs://fileset", config_string)
    gvfs = fs.HadoopFileSystem.from_uri(uri=concat_uri)

    # read a parquet file
    import pyarrow.parquet as pq
    table = pq.read_table(
        "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/part-00001-6a1fc414-798b"
        "-4d2f-a6d3-4f7c39191f76-c000.snappy.parquet", filesystem=gvfs)
    print(table.to_batches())

    # list file info
    file_infos = gvfs.get_file_info(
        fs.FileSelector("gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507"))
    for info in file_infos:
        print(info)

    # open a file, and download its content
    with gvfs.open_input_stream(
            "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/part-00001-6a1fc414-798b-4d2f-a6d3"
            "-4f7c39191f76-c000.snappy.parquet") as file:
        inputs = file.readall()
        print(inputs)

    # write a file (the table passed to write_table must match the writer's schema)
    import pyarrow as pa
    table_schema = pa.schema([
        ("id", pa.int32()),
        ("name", pa.string()),
        ("age", pa.int32())
    ])
    new_table = pa.table(
        [[1, 2], ["alice", "bob"], [30, 40]], schema=table_schema)
    with gvfs.open_output_stream(
            "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/test.arrow") as file:
        with pa.RecordBatchFileWriter(file, table_schema) as writer:
            writer.write_table(new_table)

xloya avatar May 14 '24 03:05 xloya

Thanks @coolderli @xloya for your input. As for how to support Arrow/Ray, I need to investigate a bit more to see how we can do it; just give me some time, appreciated.

jerryshao avatar May 14 '24 03:05 jerryshao

Hi @jerryshao @coolderli @xloya, I would love to help with this, is there anything I can do? I have already tried solution 2 above and it does work.

noidname01 avatar May 21 '24 03:05 noidname01

I feel like solution 2 is a Python wrapper around the Hadoop client, while solution 1 is a pure Python solution, right? I feel like solution 1 could be more generic; what do you think?

jerryshao avatar May 21 '24 07:05 jerryshao

According to my understanding and research, solution 2 reuses the capabilities of PyArrow.HadoopFileSystem, and its call path is PyArrow.HadoopFileSystem -> libhdfs.so -> GVFS. In solution 1, what we need to build is another FileSystem similar to PyArrow.HadoopFileSystem, say a PyArrow.GravitinoFileSystem. If we need to access HDFS from PyArrow.GravitinoFileSystem, we still need the Hadoop environment in the Python environment, so we still go through the path PyArrow.GravitinoFileSystem -> libhdfs.so -> GVFS.

xloya avatar May 21 '24 10:05 xloya

@xloya Yes. In solution 1, object storage such as S3 will use PyArrow.S3FileSystem, not PyArrow.HadoopFileSystem. I think it's more native, as it won't need the Hadoop configuration. Considering cloud-native environments, I think solution 1 may be better.

coolderli avatar May 21 '24 10:05 coolderli

I also think solution 1 is better. The Hadoop configuration is redundant for the case where we only need other storage like S3, and it depends on the Hadoop native libraries (a.k.a. libhdfs.so), which need to be built from source and have everything related to the generated libhdfs.so configured manually. I think that's not user-friendly for non-HDFS users.

noidname01 avatar May 21 '24 13:05 noidname01

@noidname01 Hi, we had a brief discussion on this issue with @jerryshao yesterday, and we agreed on solution 1. With it we can connect directly to storage SDKs such as S3 / OSS, but I think HDFS also needs support. So I will investigate the feasibility of solution 1 first. If you are interested, please participate in the subsequent design and development. Thanks!

xloya avatar May 22 '24 04:05 xloya

@xloya Sounds great, I'm in👍

noidname01 avatar May 22 '24 04:05 noidname01

@jerryshao @noidname01 @coolderli Hi, I have opened a draft PR (#3528, the code is not complete yet) for this. I will implement GVFS based on the fsspec interfaces; some popular cloud storage providers and companies also choose this approach. I will support only HDFS at first and extend it to more storages and auth types in the next subtasks (@noidname01 you could participate in these subtasks). Do you have any additional feedback on this?
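
As a rough illustration only (the class and helper names here are assumptions, not the draft PR's actual code), an fsspec-based GVFS would subclass fsspec's AbstractFileSystem and delegate to the underlying storage's fsspec implementation:

# Minimal fsspec-based sketch; names and the delegation helper are assumptions
# and do not reflect the actual code in the draft PR.
import fsspec
from fsspec import AbstractFileSystem

class GravitinoVirtualFileSystem(AbstractFileSystem):
    protocol = "gvfs"

    def __init__(self, server_uri=None, metalake_name=None, **kwargs):
        super().__init__(**kwargs)
        self._server_uri = server_uri
        self._metalake = metalake_name

    def _resolve(self, path):
        # Look up the fileset in Gravitino and return the underlying fsspec
        # filesystem (e.g. an HDFS implementation) plus the physical path.
        raise NotImplementedError("delegation to the storage SDK goes here")

    def ls(self, path, detail=True, **kwargs):
        real_fs, real_path = self._resolve(path)
        return real_fs.ls(real_path, detail=detail, **kwargs)

    def _open(self, path, mode="rb", **kwargs):
        real_fs, real_path = self._resolve(path)
        return real_fs.open(real_path, mode=mode, **kwargs)

# Registering the protocol lets fsspec-aware tools resolve gvfs:// URLs.
fsspec.register_implementation("gvfs", GravitinoVirtualFileSystem)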

xloya avatar May 23 '24 10:05 xloya

@xloya Can you please explain more about why you chose fsspec rather than the PyArrow FileSystem? Then everyone can provide more feedback. Another reason to choose PyArrow is that Arrow integrates conveniently with other engines. You could give some code showing how fsspec achieves that.

coolderli avatar May 23 '24 11:05 coolderli

Ray supports the PyArrow filesystem, but I'm not sure it supports fsspec as well 🤔

noidname01 avatar May 23 '24 11:05 noidname01

> @xloya Can you please explain more about why you chose fsspec rather than the PyArrow FileSystem?

I considered reasons from the following aspects:

  1. Interface diversity: fsspec provides a richer interface implementation and is a lower-level file interface definition.
  2. Storage system support: fsspec currently supports more of the mainstream cloud storage systems (S3 / Azure / GCS / OSS) than PyArrow does (S3 / GCS).
  3. fsspec has very good compatibility with PyArrow: any file system implemented against the fsspec interface can be used with PyArrow; see the Arrow docs (https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow) and the snippet after this list.
  4. The open-source fsspec ecosystem is more abundant: cloud storage vendors and data companies alike tend to implement file systems through fsspec first, and I currently cannot find additional open-source file system implementations beyond those in the PyArrow project itself.
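
For point 3, the bridge is PyArrow's documented FSSpecHandler; wrapping an fsspec filesystem this way makes it usable anywhere a pyarrow.fs.FileSystem is accepted (the GravitinoVirtualFileSystem here is the hypothetical sketch from the earlier comment):

# Wrapping an fsspec filesystem for PyArrow via the documented FSSpecHandler
# bridge; the gvfs class is the hypothetical sketch above.
from pyarrow.fs import PyFileSystem, FSSpecHandler

fsspec_gvfs = GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090", metalake_name="gravitino_test")
arrow_gvfs = PyFileSystem(FSSpecHandler(fsspec_gvfs))

# arrow_gvfs now behaves like any other pyarrow.fs.FileSystem, e.g.:
# pq.read_table("fileset/catalog/schema/fileset/part-0.parquet", filesystem=arrow_gvfs)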

xloya avatar May 23 '24 11:05 xloya

> Ray supports the PyArrow filesystem, but I'm not sure it supports fsspec as well 🤔

I think it is theoretically possible, because judging from the fsspec and PyArrow documentation, the two file systems are compatible with each other. In addition, fsspec is natively supported by pandas, so I think Ray can analyze and process the data through pandas.
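
For example (a hedged sketch reusing the hypothetical GravitinoVirtualFileSystem class from above): once a gvfs implementation is registered with fsspec, pandas can read from it directly:

# pandas forwards storage_options to the registered fsspec implementation;
# the gvfs class and its option names are the hypothetical sketch above.
import fsspec
import pandas as pd

fsspec.register_implementation("gvfs", GravitinoVirtualFileSystem)
df = pd.read_parquet(
    "gvfs://fileset/catalog/schema/fileset/part-0.parquet",
    storage_options={"server_uri": "http://localhost:8090",
                     "metalake_name": "gravitino_test"})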

xloya avatar May 23 '24 11:05 xloya

TensorFlow and PyTorch Lightning natively support fsspec-compatible file systems: 1. https://github.com/tensorflow/tensorboard/pull/5248 2. https://lightning.ai/docs/pytorch/stable/common/remote_fs.html

xloya avatar May 23 '24 11:05 xloya

@xloya do you have any documents about fsspec? Maybe we should discuss the pros and cons of fsspec, the PyArrow filesystem, and other options some more.

jerryshao avatar May 23 '24 12:05 jerryshao

Yeah, I will post a doc tomorrow.

xloya avatar May 23 '24 13:05 xloya

@jerryshao @noidname01 @coolderli Hi, I have opened a document on the implementation options. Please take a look and comment if you have any questions. https://docs.google.com/document/d/1y1GX7HWha1DH6XFU7VC_0NzmOoTA9j3Ppw1yEzjPpf8/edit?usp=sharing

xloya avatar May 24 '24 03:05 xloya