parquet-java
PARQUET-2006: Column resolution by ID
Co-Authored-By: Huaxin Gao [email protected]
Co-Authored-By: Xinli Shang [email protected]
Jira
- [ ] My PR addresses the following Parquet Jira issues and references them in the PR title. For example, "PARQUET-1234: My Parquet PR"
- https://issues.apache.org/jira/browse/PARQUET-2006
- In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.
Changes in this PR:
Parquet currently resolves columns by name. In many usages, e.g. schema resolution, this is a problem; for example, Iceberg supports schema evolution, so a column's name can change.
This PR adds support for column resolution by ID. The ID we use is the field_id in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398). After this PR, if the user explicitly requests column ID resolution, Parquet readers will use the field_id to determine which Parquet columns to read.
The changes are as follows:
In the write path, ParquetOutputFormat.COLUMN_ID_RESOLUTION is introduced; it defaults to false. If set to true, each field_id has to be unique within the entire schema; otherwise an exception is thrown. In the read path, ParquetInputFormat.COLUMN_ID_RESOLUTION is introduced and also defaults to false. If set to true, Parquet readers will resolve columns using field_id; if there are duplicate IDs in the schema, an exception is thrown.
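A minimal sketch of enabling both settings on a Hadoop Configuration; this assumes the two COLUMN_ID_RESOLUTION constants hold the boolean property keys (the exact key strings are defined in this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;

Configuration conf = new Configuration();
// Write path: validate that every field_id is unique across the schema,
// failing the write otherwise.
conf.setBoolean(ParquetOutputFormat.COLUMN_ID_RESOLUTION, true);
// Read path: resolve requested columns by field_id instead of by name.
conf.setBoolean(ParquetInputFormat.COLUMN_ID_RESOLUTION, true);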
For filters, this PR adds the option of constructing a filter using a column ID instead of a column path. For example, originally we only allowed:
IntColumn intColumn = intColumn("a.b.c");
FilterPredicate predicate = eq(intColumn, 7);
Now we can also write:
IntColumn intColumn = intColumn(new Type.ID(1));
FilterPredicate predicate = eq(intColumn, 7);
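For reference, a self-contained sketch of the ID-based filter above; FilterApi.eq, Operators.IntColumn, and Type.ID are existing Parquet classes, while the intColumn(Type.ID) overload is what this PR adds:

import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
import org.apache.parquet.schema.Type;

import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

// Reference the column by its field_id (1); the actual ColumnPath is
// resolved later against the file schema.
IntColumn column = intColumn(new Type.ID(1));
FilterPredicate predicate = eq(column, 7);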
For filters, the column resolution is done in SchemaCompatibilityValidator.validateColumn. In this method, the file's schema is used as the source of truth to resolve and validate the filter's columns.
For the read schema, the column resolution is done in ParquetFileReader.setRequestedSchema. Again, the file's schema is used as the source of truth to resolve and validate the requested schema. If ParquetInputFormat.COLUMN_ID_RESOLUTION is set to true, the IDs in the requested schema are used to figure out the original ColumnPath.
For example, suppose the file schema has a column random of type int with ID 1 and a column name of type String with ID 0, as follows (the types are Spark types):
val writeSchema =
new StructType()
.add("random", IntegerType, true, withId(1))
.add("name", StringType, true, withId(0))
If, after schema evolution, the table schema is changed to a column a of type String with ID 0 and a column b of type int with ID 1, as follows (the types are Spark types):
val readSchema =
new StructType()
.add("a", StringType, true, withId(0))
.add("b", IntegerType, true, withId(1))
then the IDs in the requested schema (the readSchema above) are compared with the IDs in the file schema to figure out the correct ColumnPath to read: ID 0 maps a back to name, and ID 1 maps b back to random.
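For reference, a sketch of the same pair of schemas expressed directly as Parquet MessageTypes with embedded field_ids, built with the existing Types builder (the message name spark_schema is illustrative):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

// File schema as written: field_ids 1 and 0 are attached to the columns.
MessageType fileSchema = Types.buildMessage()
    .optional(INT32).id(1).named("random")
    .optional(BINARY).as(LogicalTypeAnnotation.stringType()).id(0).named("name")
    .named("spark_schema");

// Requested schema after the rename: same IDs, new names. With
// COLUMN_ID_RESOLUTION enabled, ID 0 resolves "a" to the file column
// "name" and ID 1 resolves "b" to "random".
MessageType requestedSchema = Types.buildMessage()
    .optional(BINARY).as(LogicalTypeAnnotation.stringType()).id(0).named("a")
    .optional(INT32).id(1).named("b")
    .named("spark_schema");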
Tests
- [ ] My PR adds the following unit tests OR does not need testing for this extremely good reason:
Commits
- [ ] My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
- Subject is separated from body by a blank line
- Subject is limited to 50 characters (not including Jira issue reference)
- Subject does not end with a period
- Subject uses the imperative mood ("add", not "adding")
- Body wraps at 72 characters
- Body explains "what" and "why", not "how"
Documentation
- [ ] In case of new functionality, my PR adds documentation that describes how to use it.
- All the public functions and classes in the PR contain Javadoc that explains what they do
Can you rebase and combine all the commits?
Rebased. @shangxinli
cc @ggershinsky @sunchao Could you please take a look? Thanks!
Hi @huaxingao, thanks for working on it. I just did a first-round review and left some comments. After we address them, I will have another look.
hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? Where these IDs are stored (if in footer - which struct/field)? How are they set and written? Is the writer app expected to verify the uniqueness, or it can use this PR code for that? How the column IDs are read and used (is the reader app expected to do anything beyond using this PR code)? I think the answer to the last question is mostly provided, but it doesn't explicitly say what IDs are used (where they are stored / read from).
@ggershinsky I updated the description. Please check again to see if it is clear to you. Thanks!
Thanks @huaxingao , one more question / clarification. In the writer,
each field_id has to be unique within the entire schema; otherwise an exception is thrown.
what happens if the column ids are not set by the caller?
In the reader,
the IDs in the requested schema are used to figure out the original ColumnPath.
what happens if the file footer doesn't have the column IDs (field_ids were not set by the writer)?
@huaxingao @ggershinsky Will you be able to join tomorrow's meeting to have a discussion on the open issues? We can try to close tham in the meeting and move this PR forward.
@ggershinsky
I think in both the write and read paths, if COLUMN_ID_RESOLUTION is set to true but field_ids were not set by the caller/writer, we need to throw an exception.
@shangxinli Yes, I will join the meeting tomorrow.
I'll join too.
hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? [...]
+1, I think we can add it to the design doc
@rdblue regarding 'using Iceberg expressions and filters', we agreed to use them. We are finding resources to work on it. Huaxin may be able to work on it after this column resolution by ID is done (I haven't confirmed with her yet).
@huaxingao @shangxinli Hi, I really appreciate the hard work that has been done so far. How is this PR going now? Any updates?
@iflytek-hmwang5 I think we don't need this PR anymore. The purpose of this PR was to enable page indexing in Iceberg, and there's an alternative method for supporting page indexes in Iceberg, as outlined in this PR.