
PARQUET-2006: Column resolution by ID

Open huaxingao opened this issue 3 years ago • 13 comments

Co-Authored-By: Huaxin Gao [email protected] Co-Authored-By: Xinli Shang [email protected]

Jira

  • [ ] My PR addresses the following Parquet Jira issues and references them in the PR title. For example, "PARQUET-1234: My Parquet PR"
    • https://issues.apache.org/jira/browse/PARQUET-2006
    • In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.

Changes in this PR:

Parquet currently resolves columns by name. In many usages, e.g. schema resolution, this is a problem: Iceberg, for example, supports schema evolution, so a column's name can change.

This PR adds support for column resolution by ID. The ID we use is the field_id in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398). After this PR, if the user explicitly requests column ID resolution, Parquet readers will use the field_id to determine which Parquet columns to read.
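The field_ids themselves are attached to columns when the schema is built. Below is a minimal sketch using the existing parquet-java schema builder (Types.buildMessage() and the id(...) builder method); the column names and ids mirror the example later in this description, and the snippet is illustrative only:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class FieldIdSchemaExample {
  public static void main(String[] args) {
    // Each field carries a field_id that survives renames; these ids are what
    // the new column-ID resolution mode keys on.
    MessageType schema = Types.buildMessage()
        .optional(PrimitiveTypeName.INT32).id(1).named("random")
        .optional(PrimitiveTypeName.BINARY).id(0).named("name")
        .named("example_table");
    System.out.println(schema);
  }
}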

The changes are as follows:

In the write path, ParquetOutputFormat.COLUMN_ID_RESOLUTION is introduced; the default is false. If set to true, the field_id has to be unique in the entire schema, otherwise an Exception will be thrown. In the read path, ParquetInputFormat.COLUMN_ID_RESOLUTION is introduced; the default is also false. If set to true, Parquet readers will resolve columns using the field_id. If there are duplicate ids in the schema, an Exception will be thrown.
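A hedged sketch of turning the new options on, assuming they are exposed (like the existing ParquetOutputFormat/ParquetInputFormat options) as string keys for boolean Hadoop configuration properties; the exact key names come from this PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class ColumnIdResolutionConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Write path: every field_id in the schema must be unique, otherwise an exception is thrown.
    conf.setBoolean(ParquetOutputFormat.COLUMN_ID_RESOLUTION, true);
    // Read path: requested columns are resolved against the file schema by field_id.
    conf.setBoolean(ParquetInputFormat.COLUMN_ID_RESOLUTION, true);
  }
}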

For filters, this PR adds the option of constructing a filter using a column ID instead of a column path. For example, originally we only allowed:

IntColumn intColumn = intColumn("a.b.c"); 
FilterPredicate predicate = eq(intColumn, 7);

Now we can have

IntColumn intColumn = intColumn(new Type.ID(1)); 
FilterPredicate predicate = eq(intColumn, 7);
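As a usage sketch, the ID-based column plugs into the existing filter plumbing the same way a path-based one does; here it is combined with the existing ParquetInputFormat.setFilterPredicate API (the intColumn(Type.ID) overload is the one added by this PR):

import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.schema.Type;

public class IdBasedFilterExample {
  public static void main(String[] args) {
    // The column is identified by its field_id (1) rather than by its dotted path.
    IntColumn byId = intColumn(new Type.ID(1));
    FilterPredicate predicate = eq(byId, 7);

    Configuration conf = new Configuration();
    ParquetInputFormat.setFilterPredicate(conf, predicate);
  }
}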

For filters, column resolution is done in SchemaCompatibilityValidator.validateColumn. In this method, the file's schema is used as the source of truth to resolve and validate the filter's columns.

For the read schema, column resolution is done in ParquetFileReader.setRequestedSchema. Again, the file's schema is used as the source of truth to resolve and validate the requested schema. If ParquetInputFormat.COLUMN_ID_RESOLUTION is set to true, the ids in the requested schema are used to figure out the original ColumnPath.

For example, suppose the file schema has column random, type int, id 1, and column name, type String, id 0, as follows (types are Spark types):

      val writeSchema =
        new StructType()
          .add("random", IntegerType, true, withId(1))
          .add("name", StringType, true, withId(0))

If, after schema evolution, the table schema has changed to column a, type String, id 0, and column b, type int, id 1, as follows (types are Spark types):

      val readSchema =
        new StructType()
          .add("a", StringType, true, withId(0))
          .add("b", IntegerType, true, withId(1))

then the ids in the requested schema (the above readSchema) will be compared to the ids in the file schema to figure out the correct ColumnPath for each column to read.
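To make the resolution step concrete, here is a simplified, hypothetical illustration of the id matching described above: leaves of the requested schema are looked up in the file schema by field_id, which yields the original ColumnPath to read. The real logic lives in ParquetFileReader.setRequestedSchema in this PR; the helper names below are made up, and only top-level fields are handled for brevity:

import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public class IdResolutionSketch {
  // Index the file schema by field_id (nested schemas are handled by the PR itself).
  static Map<Integer, ColumnPath> indexFileSchemaById(MessageType fileSchema) {
    Map<Integer, ColumnPath> byId = new HashMap<>();
    for (Type field : fileSchema.getFields()) {
      if (field.getId() != null) {
        byId.put(field.getId().intValue(), ColumnPath.get(field.getName()));
      }
    }
    return byId;
  }

  // Resolve a requested field (e.g. "a" with id 0) to the path stored in the
  // file (e.g. "name"), regardless of how the column was renamed since the write.
  static ColumnPath resolve(Map<Integer, ColumnPath> byId, Type requestedField) {
    if (requestedField.getId() == null) {
      throw new IllegalArgumentException(
          "Requested field has no field_id: " + requestedField.getName());
    }
    return byId.get(requestedField.getId().intValue());
  }
}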

Tests

  • [ ] My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • [ ] My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • [ ] In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explain what it does

huaxingao avatar Mar 03 '22 23:03 huaxingao

Can you rebase and combine all the commits?

shangxinli avatar Mar 04 '22 17:03 shangxinli

Rebased. @shangxinli

huaxingao avatar Mar 05 '22 06:03 huaxingao

cc @ggershinsky @sunchao Could you please take a look? Thanks!

huaxingao avatar Mar 08 '22 18:03 huaxingao

Hi. @huaxingao Thanks for working on it. I just had a first-round review and left some comments. After we address them, I will have another look.

shangxinli avatar Mar 19 '22 20:03 shangxinli

hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? Where these IDs are stored (if in footer - which struct/field)? How are they set and written? Is the writer app expected to verify the uniqueness, or it can use this PR code for that? How the column IDs are read and used (is the reader app expected to do anything beyond using this PR code)? I think the answer to the last question is mostly provided, but it doesn't explicitly say what IDs are used (where they are stored / read from).

ggershinsky avatar Mar 21 '22 07:03 ggershinsky

@ggershinsky I updated the description. Please check again to see if it is clear to you. Thanks!

huaxingao avatar Mar 22 '22 01:03 huaxingao

Thanks @huaxingao , one more question / clarification. In the writer,

field_id has to be unique in the entire schema, otherwise, an Exception will be thrown.

what happens if the column ids are not set by the caller?

In the reader,

the id in the requested schema will be used to figured out the original ColumnPath.

what happens if the file footer doesn't have the column ids (field_id were not set by the writer)?

ggershinsky avatar Mar 22 '22 07:03 ggershinsky

@huaxingao @ggershinsky Will you be able to join tomorrow's meeting to discuss the open issues? We can try to close them in the meeting and move this PR forward.

shangxinli avatar Mar 22 '22 21:03 shangxinli

@ggershinsky I think in both write and read, if COLUMN_ID_RESOLUTION is set to true but the field_ids were not set by the caller/writer, we need to throw an Exception.

huaxingao avatar Mar 23 '22 03:03 huaxingao

@shangxinli Yes, I will join the meeting tomorrow.

huaxingao avatar Mar 23 '22 03:03 huaxingao

I'll join too.

ggershinsky avatar Mar 23 '22 05:03 ggershinsky

hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? Where these IDs are stored (if in footer - which struct/field)? How are they set and written? Is the writer app expected to verify the uniqueness, or it can use this PR code for that? How the column IDs are read and used (is the reader app expected to do anything beyond using this PR code)? I think the answer to the last question is mostly provided, but it doesn't explicitly say what IDs are used (where they are stored / read from).

+1, I think we can add it to the design doc

shangxinli avatar Mar 23 '22 15:03 shangxinli

@rdblue Regarding 'using Iceberg expressions and filters', we agreed to use them. We are finding resources to work on it. Huaxin may be able to work on it after this column resolution by ID is done (I haven't confirmed with her yet).

shangxinli avatar Apr 20 '22 15:04 shangxinli

@huaxingao @shangxinli Hi, I really appreciate the hard work that has been done so far. How is this PR going now? Any updates?

iflytek-hmwang5 avatar Mar 24 '24 13:03 iflytek-hmwang5

@iflytek-hmwang5 I think we don't need this PR anymore. The purpose of this PR was to introduce page indexing in Iceberg. However, there's an alternative method for supporting page indexes in Iceberg, as outlined in this PR

huaxingao avatar Mar 25 '24 04:03 huaxingao