
Support for S3 Select

Open ytsaig opened this issue 7 years ago • 5 comments

I'm not sure if this question / feature request belongs here or elsewhere in the dask ecosystem, so apologies if this is not the right place. I was wondering whether there are plans to support S3 Select for server-side filtering of data.

An example use case is reading a subset of columns/rows from a large parquet file. I haven't tested it, but can imagine that server-side filtering would improve performance substantially, since it avoids transferring the full data over the network prior to filtering.
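
For illustration only (the bucket, key, and column names below are made up), this is roughly the kind of server-side filter S3 Select runs when called through boto3 directly:

import boto3

s3 = boto3.client("s3")
# Hypothetical bucket/key/columns: S3 Select evaluates the SQL expression
# server-side and streams back only the matching rows/columns.
response = s3.select_object_content(
    Bucket="my-bucket",
    Key="data/large.parquet",
    ExpressionType="SQL",
    Expression="SELECT s.col1, s.col2 FROM S3Object s WHERE s.col3 > 10",
    InputSerialization={"Parquet": {}},
    OutputSerialization={"CSV": {}},
)
for event in response["Payload"]:
    if "Records" in event:
        print(event["Records"]["Payload"].decode())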

ytsaig avatar Nov 07 '18 18:11 ytsaig

This is certainly something that could live in s3fs, either as its own method or as optional arguments to open() - I believe it should also return a file-like object. The documentation looks a little complicated, so I'd be relying on your implementation, and I doubt that moto, the testing/mocking library, supports it - so it would have to be an untested (in the sense of CI) feature.

If you were hoping to call this from Dask, that might be a little trickier; I don't know how you'd go about passing the arguments from a call to something like dd.read_csv through to whichever method you implement on s3fs.

martindurant avatar Nov 07 '18 18:11 martindurant

S3 Select currently supports Parquet as input, but not as output. The data would have to go through an intermediary format (either CSV or JSON), with the corresponding serialization/deserialization and a loss of data type information in the process. I am not sure whether such logic belongs in a filesystem layer.

An alternative to S3 Select, for the special case of reading a subset of columns from a large Parquet file, is to minimize the amount of reading the libraries do, for example:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem(default_fill_cache=False, default_block_size=1)
dataset = pq.ParquetDataset("{bucket}/{path}", filesystem=s3, validate_schema=False)
# `columns` is the list of column names to read, e.g. ["col1", "col2"]
df = dataset.read(columns=columns).to_pandas()

This may not be as performant as S3 Select, but still tries to take advantage of the fact that Parquet is a columnar store, namely:

  • avoids caching data in between file chunks that have been explicitly requested (default_fill_cache=False); this also considerably lowers memory usage when reading from a large file.
  • limits read-ahead to 1 byte (default_block_size=1), which is useful if the requested columns are scattered around the Parquet file rather than adjacent to each other. Setting default_block_size to zero or None falls back to the default block size of 5 MB, so don't use zero if you want to disable read-ahead completely.
  • disables schema validation (validate_schema=False), which may prevent pyarrow from rereading the file metadata multiple times.

One may experiment with those options to see how they affect timing and data transfer for a particular file.

dvukolov avatar Nov 28 '18 13:11 dvukolov

@dvukolov, your method sounds reasonable.

I still don't have any principled objection to someone providing a SELECT endpoint via s3fs, but indeed I know little about the performance or serialisation issues. The output, the thing actually being accessed, would have to behave like a file. If using Parquet, I imagine it is most useful for prefiltering on rows, when you want to download only a small fraction of the original.

martindurant avatar Nov 28 '18 14:11 martindurant

Implementing S3 Select as a separate method would look something like this:

def select(self, path, expression, input_format, output_format):
    # The real call would pass the bucket/key parsed from `path`, the SQL
    # expression and the input/output serialization settings to
    # select_object_content.
    response = self.s3.select_object_content(...)

    # The payload is an event stream: accumulate the record chunks into an
    # in-memory file-like object and return it once the End event arrives.
    f = io.BytesIO()
    for event in response["Payload"]:
        if "Records" in event:
            data = event["Records"]["Payload"]
            f.write(data)
        elif "End" in event:
            f.seek(0)
            return f

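A rough usage sketch of this first variant (select here is only the method sketched above, not an existing s3fs API; the path and expression are made up), handing the returned buffer straight to pandas:

import pandas as pd
import s3fs

fs = s3fs.S3FileSystem()
# Hypothetical call to the sketched method: filter rows server-side,
# then read the CSV output from the returned file-like object.
buf = fs.select(
    "my-bucket/data/large.csv",
    expression="SELECT * FROM S3Object s WHERE s.status = 'active'",
    input_format={"CSV": {"FileHeaderInfo": "USE"}},
    output_format={"CSV": {}},
)
# S3 Select CSV output carries no header row.
df = pd.read_csv(buf, header=None)
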
An alternative would be to take a BytesIO() object as an argument, rather than create it inside the method:

def select(self, buf, path, expression, input_format, output_format):
    ...

The second option makes a lot of sense, as it allows you:

  • to use a context manager with a file-like object
  • to simplify processing of the S3 Select output (e.g. adding headers to the generated CSV, or transforming the generated JSON Lines into a regular JSON document by wrapping them in square brackets)

If this is something you are willing to consider, I can provide a draft implementation.
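
As a rough illustration of the second option (again, select is just the sketch above, and the path/columns are made up), the caller-owned buffer makes it easy to prepend a header before handing the result to pandas:

import io

import pandas as pd
import s3fs

fs = s3fs.S3FileSystem()
with io.BytesIO() as buf:
    # The caller owns the buffer, so a CSV header can be written before the
    # Select output is appended to it.
    buf.write(b"id,value\n")
    fs.select(
        buf,
        "my-bucket/data/large.parquet",
        expression="SELECT s.id, s.value FROM S3Object s WHERE s.value > 10",
        input_format={"Parquet": {}},
        output_format={"CSV": {}},
    )
    buf.seek(0)
    df = pd.read_csv(buf)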

dvukolov avatar Nov 29 '18 19:11 dvukolov

Adding my 2 cents.

At my company we use gzipped JSON Lines as the input; we then use Dask for transformations and export to Parquet.

Usually when we do ETL we need to filter by fields (which can be nested); using S3 Select with Dask would reduce our processing time and cost significantly.

Is there any idea of how we could implement this? I see that s3fs follows the fsspec interfaces, which means the get method is actually defined in fsspec. Interestingly, this method supports args and kwargs (which would be great for this use case) but does not use them.
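
For context, a hedged sketch of the current plumbing (the path below is made up): Dask already forwards storage_options to the s3fs constructor, so a per-call Select expression would need a similar pass-through down to whichever method or open() argument s3fs ends up exposing.

import dask.dataframe as dd

# storage_options is handed to s3fs.S3FileSystem(...) when the files are
# opened; something analogous would be needed to carry a per-file Select
# expression through dd.read_csv / dd.read_json.
df = dd.read_csv(
    "s3://my-bucket/data/*.csv",
    storage_options={"default_fill_cache": False},
)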

manugarri avatar Aug 29 '19 21:08 manugarri