fastparquet
Schema evolution when writing row groups does not work
Describe the issue: when we write row groups, schema evolution should be supported and easy to use. This is very important for long-lived, live datasets: we often want to add or remove a few columns, and rewriting the entire dataset to adopt the new schema is too expensive. A big advantage of Parquet is that it lets you avoid rewriting the entire dataset for small schema changes like this.
Arrow actually supports this, at least in Rust and R, I think.
Minimal Complete Verifiable Example:
import fastparquet
import pandas as pd
df_test1 = pd.DataFrame({'a': [1., 2]})
df_test2 = pd.DataFrame({'a': [1., None], 'c': [20, 12]})
pq_dir = "/tmp/test1.pq"
for _i, _df in enumerate([df_test1, df_test2]):
    fastparquet.write(pq_dir, _df, file_scheme="hive", append=_i > 0)
ValueError: Column names of new data are ['a', 'c']. But column names in existing file are ['a']. {'c'} are columns being either only in existing file or only in new data. This is not possible.
Environment:
- Dask version: '2023.10.1'
- Python version: python 3.10
- Operating System: ubuntu 22.04
- Install method (conda, pip, source): pip
Fastparquet likes to maintain a global _metadata file for its datasets, for speed of loading and filtering data. This file (and _common_metadata) is not compatible with schema evolution: it contains exactly one schema, which all files must match. There is currently no way to avoid rewriting _metadata and requiring schema consistency during append. (Writing via dask may work, however.)
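To make that concrete, here is a small sketch (mine, not from the thread) that shows the single schema recorded in _metadata, assuming the hive dataset written in the example above; pf.columns and pf.dtypes are standard ParquetFile attributes:

import fastparquet

# open the hive dataset through its _metadata file (written by the example above)
pf = fastparquet.ParquetFile("/tmp/test1.pq")

print(pf.columns)  # the one set of column names every data file must match
print(pf.dtypes)   # the single schema (column -> dtype) stored in _metadata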
If, instead, you write new files into your directories without append=True and remove the metadata, then evolution should be expected to work when adding or upcasting columns, but it has been strictly experimental. You would probably need to write the files for each group manually, or use functions like fastparquet.writer.write_multi or write_part_file.
example
import os

import fastparquet
import pandas as pd

df_test1 = pd.DataFrame({'a': [1., 2]})
df_test2 = pd.DataFrame({'a': [1., None], 'c': [20, 12]})
pq_dir = "supertemp"
os.makedirs(pq_dir, exist_ok=True)

fastparquet.write(f"{pq_dir}/1.parq", df_test1, file_scheme="simple")
fastparquet.write(f"{pq_dir}/2.parq", df_test2, file_scheme="simple")

# note reversed order to infer most complete schema
pf = fastparquet.ParquetFile([f"{pq_dir}/2.parq", f"{pq_dir}/1.parq"])
# note specifying float for "c", since it must be nullable
pf.to_pandas(dtypes={"a": "f4", "c": "f4"})
Thanks a lot for the detailed response!
Fastparquet likes to maintain a global _metadata file for its datasets, for speed of loading and filtering data. This file (and _common_metadata) is not compatible with schema evolution: it contains exactly one schema, which all files must match. There is currently no way to avoid rewriting _metadata and requiring schema consistency during append. (Writing via dask may work, however.)
Since updating the metadata is already allowed in your API, this should not be a problem. On each write we could, in theory, update the schema in the common metadata to the merged/evolved schema. We already update _metadata on append, so why not also update _common_metadata?
Can we have something like Arrow's unify_schemas()?
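For reference, a rough sketch of what I mean, using pyarrow's unify_schemas and dataset API on the two files from the example above (this is pyarrow, not anything fastparquet exposes today):

import pyarrow as pa
import pyarrow.dataset as ds

files = ["supertemp/1.parq", "supertemp/2.parq"]  # files written in the example above

# merge the per-file schemas into one evolved schema
schemas = [ds.dataset(f, format="parquet").schema for f in files]
unified = pa.unify_schemas(schemas)

# reading with the unified schema fills columns missing from a file with nulls
table = ds.dataset(files, format="parquet", schema=unified).to_table()
print(table.to_pandas())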
The real problem here is that even if I manually update the schema in the _*metadata files to make it compatible with both files (i.e. including the c column), fastparquet still cannot load the data and complains that column c is missing from one file. Maybe I did something wrong here.
If, instead, you write new files into your directories without append=True and remove the metadata, then evolution should be expected to work when adding or upcasting columns, but it has been strictly experimental. You would probably need to write the files for each group manually, or use functions like fastparquet.writer.write_multi or write_part_file.
The point of using append is to let it automatically update _metadata and automatically generate proper file names.
We would also like to be able to read data written by other programs, for example Spark. With TB-scale data, you never want to import/export it all the time; that is probably the biggest reason we chose Parquet in the first place.
So now it looks like dask does not intend to maintain compatibility for data exchange, but rather treats Parquet as a kind of internal data format that should only be read/written by dask/fastparquet. That is fine; different projects have different priorities. If this is the case, we can close this issue.
It could be done, and is probably worthwhile, but I don't know who has the effort to spare.
I'll just clarify a couple of points:
- the _metadata file contains a bunch of things, including exactly one schema, specific column offsets and statistics, and key-value metadata (global and per column chunk). So there are multiple things called "metadata"; see the sketch after this list.
- the _metadata file is a convention that lets you know the contents of all the files by reading that one file; I don't believe it is ever used in conjunction with schema evolution, and indeed it isn't used much at all any more.
- _common_metadata is an interesting intermediate case. It is also just a convention, and it is unclear what it should contain in the presence of evolution.
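To make the first point concrete, those pieces can be inspected separately; a rough sketch (attribute names as I understand the current fastparquet API, using the hive dataset from the top of this issue):

import fastparquet

pf = fastparquet.ParquetFile("/tmp/test1.pq")  # dataset with a _metadata file

print(len(pf.row_groups))     # row groups, each carrying column chunk offsets
print(pf.statistics)          # per-column, per-row-group min/max/null-count statistics
print(pf.key_value_metadata)  # free-form key/value metadata (e.g. the 'pandas' entry)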
So, to implement this (given fastparquet already has some of the basic functionality), the main task is to decide what should happen for the various cases.