
Storing ParquetFile as a parquet file in filesystem

Open madarez opened this issue 4 years ago • 4 comments

I'm looking for a way to dump a ParquetFile to an actual parquet file. I've tried the write method, replacing the pandas data with a ParquetFile object, but I receive the following error: TypeError: object of type 'ParquetFile' has no len()

I need fastparquet for a simple task: concatenating multiple parquet files with the same schema, which I can load into a single ParquetFile by passing them as a list. However, I can't find a way to write this back out and potentially send it off to my S3. The capability might already be there, but I tried the various attributes of ParquetFile and couldn't get a parquet output from any of them. If it isn't there, I think it would be a great API to add, and it may also come in handy for inspecting a ParquetFile.

madarez avatar Jun 29 '20 21:06 madarez

The ParquetFile class does not have any write capability.

The signature for write says that you should do

fastparquet.write("filename", dataframe)

What did you do instead? It sounds like your workflow should be something like

dataframes = [fastparquet.ParquetFile(path).to_pandas() for path in paths]
combineddf = pd.concat(dataframes)
fastparquet.write("out.parquet", combineddf)

or you can use the pandas convenience

dataframes = [pd.read_parquet(path, engine="fastparquet") for path in paths]
combineddf = pd.concat(dataframes)
combineddf.to_parquet("out.parquet", engine="fastparquet")

If you want to do this direct to s3, you can provide open_with=s3.open for the fastparquet API or use a path like "s3://bucket/..." for the pandas convenience methods.
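A minimal sketch of both routes, assuming s3fs is installed and combineddf is the concatenated frame from above (the bucket/key names are placeholders):

import fastparquet
import s3fs

s3 = s3fs.S3FileSystem()

# fastparquet API: pass the filesystem's open function explicitly
fastparquet.write("bucket/path/out.parquet", combineddf, open_with=s3.open)

# pandas convenience: an s3:// URL is enough (pandas uses s3fs under the hood)
combineddf.to_parquet("s3://bucket/path/out.parquet", engine="fastparquet")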

martindurant avatar Jun 30 '20 18:06 martindurant

Yes, thanks for your quick response. You are right, and I do understand that according to the documentation, I'm expected to provide a pandas dataframe to the write method. The attempt whose output I showed above isn't valid use of the write API:

# WARNING: not to be used, this is an invalid code snippet
from fastparquet import write, ParquetFile
import s3fs

s3 = s3fs.S3FileSystem()
files = s3.ls(path)  # 'path' is the S3 directory holding the small parquet files
s3open = s3.open
pf = ParquetFile(files, open_with=s3open)
# fails here: write() expects a pandas DataFrame, not a ParquetFile, hence
# TypeError: object of type 'ParquetFile' has no len()
write(destination, pf, compression='SNAPPY', open_with=s3open)

There are two reasons why I wanted to avoid this:

  1. I was receiving errors when using pandas as an interim step, since I have a nested structure in my data. For example, map data ended up loaded with byte keys and values in pandas, hence the error: ValueError: Error converting column "mycolumn" to bytes using encoding JSON. Original error: keys must be str, int, float, bool or None, not bytes
  2. ParquetFile returns an object that already holds the total row count, which made it look ideal for writing directly to a file, without touching the SNAPPY compression or paying the overhead of a pandas conversion. I do understand that this count is just the sum of the smaller row-group counts, yet I think the ultimate goal could be a proper merge of the row groups themselves.

To build on the last point: yes, it's inefficient at the moment to have a bunch of small row groups. However, no other lightweight tool in the ecosystem has developed a proper way of merging row groups yet either. Merging the row groups themselves could be the ideal and ultimate goal.

Other Related Projects:

  1. parquet-tools: this ticket tracks the issue in parquet-tools. Yet I tested it with version 1.10 (the version the issue says fixed it) and received the following warning: "you merged too small files. Although the size of the merged file is bigger, it STILL contains small row groups, thus you don't have the advantage of big row groups, which usually leads to bad query performance!" The resolved tag is misleading once you see that the commit was later reverted in the history of the relevant file.
  2. PyArrow: I followed this approach and learned that pyarrow complains that lists of structs from Parquet files are not yet supported, unlike fastparquet, which does support them.

Various Developer Calls

To show how and why merging a bunch of parquet files is a really common use case, I've gathered here posts from various community fora where the OP is asking for a way to merge multiple small parquet files into one:

  • https://stackoverflow.com/questions/38610839/how-to-merge-multiple-parquet-files-to-single-parquet-file-using-linux-or-hdfs-c
  • https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/
  • https://forums.databricks.com/questions/1509/parquet-file-merging-or-other-optimisation-tips.html
  • https://community.cloudera.com/t5/Support-Questions/combine-small-parquet-files/td-p/33525/page/2

I just thought it'd be nice to make a feature request here with fastparquet to potentially fill that gap.

madarez avatar Jun 30 '20 23:06 madarez

fastparquet does not support concatenating row groups without decoding the contents - sorry, that's not what it was made for. I could imagine it doing the job, though; it wouldn't be too complicated. However, parquet datasets are very typically made of many files in a directory, so I wonder why you want to go through this process. You want bigger partitions rather than the same partitions in a single file, right?

Maybe you want to use ParquetFile.iter_row_groups to generate dataframes and concatenate every N of them for writing. This is something you could also achieve with dask dataframe's repartition method. However, that still leaves the JSON/bytes issue: you may need to convert the pandas column, or you might try passing object_encoding= to the write function. I'm not certain, but it ought to work.
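A rough sketch of that first idea, assuming the S3 objects are already listed in files as in the snippet above, that the output is written locally, and that write's append=True flag (which adds row groups to an existing file) is acceptable here; the 1,000,000-row threshold is only illustrative:

import fastparquet
import pandas as pd
import s3fs

s3 = s3fs.S3FileSystem()
pf = fastparquet.ParquetFile(files, open_with=s3.open)

batch, nrows, first = [], 0, True
for df in pf.iter_row_groups():  # one dataframe per (small) row group
    batch.append(df)
    nrows += len(df)
    if nrows >= 1_000_000:  # flush every ~1M rows as one bigger row group
        fastparquet.write("out.parquet", pd.concat(batch), compression="SNAPPY", append=not first)
        batch, nrows, first = [], 0, False
if batch:  # write whatever is left over
    fastparquet.write("out.parquet", pd.concat(batch), compression="SNAPPY", append=not first)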

martindurant avatar Jul 02 '20 14:07 martindurant

Okay thanks. My own reasons are cost-saving and performance (they might not be the same across the community):

  • Cost-saving: S3 charges per request, and I always load all the parquet files in a given S3 directory together, so fewer, larger files means fewer requests.
  • Performance: The parquet files I have are very small (~7 MB each), resulting from a streaming scenario. Therefore, I just want to group these small parquet files into more reasonably sized partitions.

I appreciate your pointer on dask 👍 So, following your comment, I figured I should split this issue/ticket into two:

  1. Coalescing/repartitioning: dask is the more fitting project for this request, and your comment clarified for me that it's indeed part of dask's dataframe API.
  2. Storing a complex encoded structure: I followed the instructions on the encoding and passed each of the available object_encoding options, but none of them resulted in a successful write (a sketch of what such a call looks like is below). So, as you suggested, I needed to transform my data out of the bytes format.
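For context, a minimal sketch of what passing object_encoding to write looks like (df and the output name are placeholders; map_column is the column discussed below, and calls of this shape still failed for my Dict[bytes, bytes] data):

from fastparquet import write

# object_encoding maps column name -> encoding ('json', 'bytes', 'utf8', ...)
write("out.parquet", df, object_encoding={"map_column": "json"})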

Well, I have a map column of type Dict[bytes, bytes]. Following the repartitioning, I need to apply the lambda below to that column in order to be able to write it:

dask_df["map_column"] = dask_df["map_column"].apply(lambda d: {k.decode(): v.decode() for k, v in d.items()}, meta=("map_column", "object"))

This of course changes the underlying schema to Dict[str, str], but at least it can be stored. Initially, I thought the problem was the nested structure; after this, I realized the problem is the encoding, exactly as you said in your comment.
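Putting the pieces together, a sketch of the overall flow, roughly along the lines described above (the s3:// paths and the partition size are placeholders):

import dask.dataframe as dd

# read the many small streaming outputs with fastparquet underneath
ddf = dd.read_parquet("s3://bucket/streaming-output/*.parquet", engine="fastparquet")

# coalesce into fewer, larger partitions
ddf = ddf.repartition(partition_size="128MB")

# decode the Dict[bytes, bytes] map column to Dict[str, str] so it can be written
ddf["map_column"] = ddf["map_column"].apply(
    lambda d: {k.decode(): v.decode() for k, v in d.items()},
    meta=("map_column", "object"),
)

# write back out as fewer, bigger parquet files
ddf.to_parquet("s3://bucket/compacted/", engine="fastparquet", compression="SNAPPY")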

I should add that I'm able to pass a parquet schema to either read_parquet/ParquetFile or to_parquet/write, if that could help in any way.

madarez avatar Jul 02 '20 19:07 madarez