
Future Plans

Open · martindurant opened this issue on Apr 19 '21 • 19 comments

A list of nice-to-haves that could be implemented in fastparquet. The order is arbitrary. I am writing this as a placeholder, and for people to express interest in the features or to help implement.

  • [x] deal with directories without _metadata
  • [x] move numba code to cython. The speed will be the same, but import will be faster, and the install in vanilla python (not anaconda) will be much smaller if we drop the dependency on numba/llvm
  • [x] implement data page header V2. This is now seen in the wild, and brings the benefit of being able to skip past def/rep codes when appropriate, as well as enable decompress_into for plain encodings and no nulls (see below). Must be implemented for reading, should be implemented for writing (read-only)
  • [x] data page header V2 (writing)
  • [x] use of cramjam's decompress_into to avoid allocations and copies, with PLAIN encoding and no nulls; data header V2 only (because compressed blob is separate from def/reps)
  • [x] RLE_DICT encoding (reading and writing) is the new default over plain dicts (#583)
  • [x] DELTA int encoding (read) [works for one test dataset]
  • [ ] ~DELTA-LEN-BYTEARRAY encoding (requires the former) is now standard. Would be very useful for awkward/arrow buffers, as opposed to numpy object arrays~
  • [ ] BYTE_STREAM_SPLIT codec for floats (read/~write~)
  • [x] row filtering; would require two read passes per row-group, one on the filter operand columns to build the row mask, followed by a read of the remaining rows (having created a new empty dataframe and filled it with the operand columns' values, ideally without copy). Should have a shortcut for all/none filters (see the sketch after this list)
  • [x] new time types in the parquet format
  • [x] use pandas nulled types (int, bool, ...?); accept for writing and probably make default for reading. Writing already works in some cases. Already some work (#465 ; see also #464 for notes on sparse CSC, which may be a good idea when we know most values are null)
  • [x] distribute as wheels on pypi
  • [ ] use own cythonised implementation of thrift
  • [ ] improve test coverage >95% (91% at fe8dc1b ; .py files only)
  • [ ] Make statistics optional on write (to reduce metadata size, and improve both write and read-open speed)
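
As a point of reference for the row-filtering item, a minimal usage sketch (the dataset path and column names are hypothetical, and it assumes a fastparquet version that has the row_filter keyword):

import fastparquet

# hypothetical dataset path; ParquetFile also accepts a directory of data files
pf = fastparquet.ParquetFile("dataset")
# filters= alone can exclude whole row groups using their statistics;
# row_filter=True additionally builds a row mask and filters within row groups
out = pf.to_pandas(filters=[("key", "==", "a")], row_filter=True)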

martindurant avatar Apr 19 '21 17:04 martindurant

Hi @martindurant

I found feature request #244 (adding a new column to an existing parquet file). Do you think it could be revived?

I have the same need as the one described there: I run several processing steps on a dataset, and their results all share the same index. Because each step produces results that are not needed by the other steps, and to save memory, I would like to process them step by step, recording each result, then discarding it from memory before starting the next step.

Without this feature, storing results in separate files means I need a concatenation step when reading everything back. With this feature, I would essentially save that step.

Has there been any improvement in the library related to this feature since it was last discussed, that might make implementing it easier now? Thanks for your feedback. Bests,

yohplala avatar Apr 29 '21 10:04 yohplala

Do you mean supporting schema evolution (ability to add columns or promote types in later files of a dataset) or actually writing a new column to an existing file? I am happy to add one or both to the list of desirables, but of course that doesn't mean that they will get implemented. To directly answer your question: no progress has been made on this topic since the linked issue.

martindurant avatar Apr 29 '21 17:04 martindurant

Do you mean supporting schema evolution (ability to add columns or promote types in later files of a dataset) or actually writing a new column to an existing file? I am happy to add one or both to the list of desirables, but of course that doesn't mean that they will get implemented. To directly answer your question: no progress has been made on this topic since the linked issue.

Hmmm, for the use case I have in mind, it makes no difference (purely from a 'use case' perspective) whether the data is stored in a single file or in separate files. In my mind, there is thus no trouble adding new columns in new files, and this latter approach is likely easier from an 'implementation' perspective. The benefit of restoring a full dataset when reading back with to_pandas remains.

So this would essentially be supporting schema evolution to add new columns.

of course that doesn't mean that they will get implemented.

I fully understand Martin, no worries.

yohplala avatar Apr 29 '21 18:04 yohplala

In my mind, there is thus no trouble adding new columns in new files

I perhaps should have been clearer: this means that you have new partitions of data that have data in the new column, but previous partitions will have no data for that column (i.e., the values would be loaded as NULL/None/NaN).
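
To make the semantics concrete, a tiny pandas sketch of what such loading would amount to (illustrative column names; this is not fastparquet behaviour today):

import pandas as pd

old = pd.DataFrame({"a": [1, 2]})            # partition written before the new column existed
new = pd.DataFrame({"a": [3], "b": [30]})    # partition written with the new column
combined = pd.concat([old, new])             # column "b" is NaN for the old partition's rows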

martindurant avatar Apr 29 '21 18:04 martindurant

In my mind, there is thus no trouble adding new columns in new files

I perhaps should have been clearer: this means that you have new partitions of data that have data in the new column, but previous partitions will have no data for that column (i.e., the values would be loaded as NULL/None/NaN).

Thanks for your clarification, Martin. I am thinking that what 'could be implemented' here (not saying it should be implemented) is a freer interpretation of the data embedded in the parquet files:

IF an index without duplicates is used AND the data in the different parquet files do not share the same columns THEN at loading, a transparent concatenation step can be managed to merge the different parquet files both in rows and columns

Hmmm, this opens the door to something that seems quite complex if you mix it with file_scheme=hive (hence shared columns between files to keep track of partitioning values). I will not push more, then, as I am not sure it is really worth it.

Thanks again Martin. Bests,

yohplala avatar May 01 '21 18:05 yohplala

IF an index without duplicates is used AND the data in the different parquet files do not share the same columns THEN at loading, a transparent concatenation step can be managed to merge the different parquet files both in rows and columns

Sounds like a specialist merge function that doesn't need to live in the library.
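
For illustration, such a helper might be as small as the following sketch (file names are hypothetical): read each column set and do a single concat along columns over the shared index.

import pandas as pd
from fastparquet import ParquetFile

# hypothetical files, each holding different columns over the same index
paths = ["step1.parq", "step2.parq", "step3.parq"]
parts = [ParquetFile(p).to_pandas() for p in paths]
merged = pd.concat(parts, axis=1)  # aligns on the shared, duplicate-free index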

martindurant avatar May 01 '21 20:05 martindurant

Sounds like a specialist merge function that doesn't need to live in the library.

The concern is performance. As far as I understand fastparquet, in its current state it already does a merge: the 'DataFrames' are already stored in different parquet files when file_scheme=hive. From my knowledge of pandas, doing a single concat is faster than doing two: concat([df1, df2, df3, df4]) is faster than concat([concat([df1, df2]), concat([df3, df4])]).

So having it in fastparquet (as I understand it) would result in faster data loading. But again, yes, it appears to be complex, and personally it is not something I can delve into at the moment.

yohplala avatar May 02 '21 09:05 yohplala

Yes, understood - and that fastparquet could provide a way to do this without loading, concat and rewriting everything. But it would not need to be part of the main API or even necessarily in this package. If for some reason, I had to dedicate time to it, I could imagine how it would work, for both types of append I talk about above.

martindurant avatar May 03 '21 14:05 martindurant

[...]. But it would not need to be part of the main API or even necessarily in this package.

Bouncing off your comment :) I think there is an idea here: there could be a package that does 'magic things' for 'special cases'.

  • There is the case I mention above: data loading with transparent concatenation of rows, which requires an index without duplicates to be possible (similarly to how the current row-grouping logic works across different files because there are no duplicates in columns).
  • There is some 'magic' that can be done as soon as an 'index column' has no duplicates and is sorted. You may remember the first time I popped up here, asking for an 'append with overwrite' feature. We implemented a 'limited' overwrite, and I am using it in a separate lib in combination with the rest of the logic I presented to you :)

Hmmm, maybe this topic is going a bit wild, ah ah :) What I am thinking of is essentially how to provide database features using parquet as a backend :)

yohplala avatar May 04 '21 06:05 yohplala

Sounds like Hive! As you have found, though, parquet does not lend itself to CRUD operations.

martindurant avatar May 04 '21 14:05 martindurant

@martindurant would this item in your list, "BYTE_STREAM_SPLIT codec for floats (read/write)", enable writing to a file stream instead of only to disk? If not, should I open a feature request/issue for writing to a file stream?

Currently I use the pyarrow engine because it allows me to obtain the bytes of the file without first dumping it to disk. However, I would like to use fastparquet for this because it is good at preserving the timedelta dtype. But I believe I have to dump fastparquet-generated files to disk first and then redundantly read their bytes back from disk.

import io

blob = io.BytesIO()

dataframe.to_parquet(
    blob,  # <--- this is what I want to do with fastparquet
    engine='pyarrow',
    compression='gzip',
    index=False,
)

# to-disk alternative
temp_file_name = f"{app_dir}temp.parquet"
dataframe.to_parquet(
    temp_file_name,
    engine='fastparquet',
    compression='gzip',
    index=False,
)

aiqc avatar Jun 15 '21 16:06 aiqc

BYTE_STREAM_SPLIT is something completely different; it is about how the data is represented as bytes in the output.

What you want already exists. Example:

import fsspec
fs = fsspec.filesystem("memory")
df.to_parquet("memory://temp.parq", engine="fastparquet")

the output is at fs.store['/temp.parq'] (the IO object) or fs.cat("/temp.parq") (the bytes).

You can also do this more directly (but this is unorthodox):

b = fsspec.implementations.memory.MemoryFile()
df.to_parquet("", engine="fastparquet", open_with=lambda *_: b)

We use a MemoryFile as opposed to a BytesIO because fastparquet would close the file when it's done, making the contents unavailable.
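
For completeness, the raw bytes can then be read back from that same object (continuing the snippet above, and relying on MemoryFile surviving the close):

data = b.getvalue()  # the parquet bytes; MemoryFile's close() is a no-op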

martindurant avatar Jun 15 '21 16:06 martindurant

@martindurant thank you for the guidance. For future readers, I'll also call fs.delete("memory://temp.parq") to release the memory after writing the last file, unless that is redundant somehow.

aiqc avatar Jun 15 '21 16:06 aiqc

I'll also call fs.delete

Yes, that is a reasonable thing to do - the data would persist in memory otherwise.

martindurant avatar Jun 15 '21 16:06 martindurant

What you want already exists. Example:

import fsspec
fs = fsspec.filesystem("memory")
df.to_parquet("memory://temp.parq", engine="fastparquet")

the output is at fs.store['/temp.parq'] (the IO object) or fs.cat("/temp.parq") (the bytes).

Hi @martindurant, I am sorry for my probably naive questions about this, I am a complete newbie on this topic.

1- would you have some pointers that would describe how pandas makes the link between memory in

df.to_parquet("memory://temp.parq", engine="fastparquet")

and the memory handle created by fsspec?

fs = fsspec.filesystem("memory")

Is there an environment variable keeping track of 'disk partition' IDs, such that when doing fs = fsspec.filesystem("memory"), a new 'virtual' one is created in memory?

2- is this the same thing as the 'parquet serialization' I see documented in kartothek here?

Thanks for the hints, Bests

yohplala avatar Jun 15 '21 17:06 yohplala

When pandas encounters a URL with "protocol://..." it passes the URL to fsspec, which returns the filesystem to operate on (in this case, MemoryFileSystem): an instance with an open method that returns file-like objects. This works for either engine, whereas the open_with= form is specific to fastparquet.
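
For illustration, roughly the resolution step that happens behind the scenes (a sketch; the exact path string may vary between fsspec versions):

import fsspec

# what a "memory://" URL resolves to: a MemoryFileSystem instance plus a path within it
fs, path = fsspec.core.url_to_fs("memory://temp.parq")
print(type(fs).__name__, path)   # e.g. MemoryFileSystem /temp.parq

# filesystem instances are cached, so this is normally the very same object,
# and anything written through the URL shows up in its store
fs2 = fsspec.filesystem("memory")
print(fs is fs2)                 # expected: True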

is this the same thing as 'parquet serialization'

A glance at those docs suggests that it might be the same or similar. I don't know kartothek well. Certainly, parquet is, in general, a decent serialisation or persistence format for tabular data. However, the term "serialisation" more commonly means a temporary format for transmission across a communication channel, rather than a file format like this.

martindurant avatar Jun 15 '21 18:06 martindurant

Thanks a lot for your reply @martindurant !

yohplala avatar Jun 15 '21 18:06 yohplala

(updated)

Hi @martindurant , I was wondering whether there is any possibility (or maybe it already exists?) of multithreading the write function. I see some potential at two levels:

  • multi-threading when scheme=hive, meaning each row group (either based on num_row_group or partition_on), which becomes basically a separate file, is written in parallel.

  • whatever the number of files to be written (one or several), multi-threading the compression step:

    • each column in parallel,
    • and perhaps, even for a single column, multi-threading the compression work itself, as the zstdmt lib seems to do with Brotli or Snappy. But this seems to be more work, digging into the compression libs' APIs...

(personally, I am using Brotli because the compression ratio is excellent, but compression speed is definitely a bottleneck, as illustrated in this SO answer...)

yohplala avatar Jun 24 '21 06:06 yohplala

fastparquet has no plans to implement threading internally - we integrate well with dask, so there's no need to repeat work here. By default, dask uses a threaded scheduler, and parallelism is at the row-group level. Column-level parallelism is in theory possible, but only for reading.
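
For reference, a minimal sketch of that dask route (names are illustrative; dask writes one data file per partition, using its threaded scheduler by default):

import dask.dataframe as dd

# df is assumed to be an existing pandas DataFrame
ddf = dd.from_pandas(df, npartitions=8)                 # 8 partitions -> 8 files
ddf.to_parquet("out_dir", engine="fastparquet", compression="gzip")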

The compression routines do release the GIL (at least in cramjam); but the cython code does not, as things stand. It would be reasonable to edit, adding nogil and with nogil to functions in cencoding.pyx, I think all of them. That would help with threading performance.

martindurant avatar Jun 24 '21 15:06 martindurant