GH-43994: [C++][Parquet] Fix schema conversion from two-level encoding nested list
Rationale for this change
The current C++ parquet implementation interprets the following parquet schema as `array<struct<array:array<int>>>`, which is wrong:
optional group a (LIST) {
  repeated group array (LIST) {
    repeated int32 array;
  }
}
What changes are included in this PR?
According to the parquet spec, the above schema should be inferred as array<array<int>>.
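As a side note (not part of this PR), the inferred Arrow schema can be double-checked with a small sketch like the one below, which just opens a Parquet file with the C++ reader and prints the Arrow schema it derives; with this fix, column `a` should come back as a list of lists of int32. The file path and the function name are placeholders.

#include <iostream>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

// Print the Arrow schema that the C++ reader infers from a Parquet file.
arrow::Status PrintInferredSchema(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));
  std::shared_ptr<arrow::Schema> schema;
  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
  std::cout << schema->ToString() << std::endl;
  return arrow::Status::OK();
}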
Are these changes tested?
Yes, a test case has been added to verify the fix.
Are there any user-facing changes?
No.
- GitHub Issue: #43994
@emkornfield @pitrou @mapleFU Would you mind taking a look? Thanks!
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
Without legacy:
The element field encodes the list's element type and repetition. Element repetition must be required or optional.
With backward compatibility:
Some existing data does not include the inner element layer. For backward-compatibility, the type of elements in LIST-annotated structures should always be determined by the following rules:
1. If the repeated field is not a group, then its type is the element type and elements are required.
2. If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
3. If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
4. Otherwise, the repeated field's type is the element type with the repeated field's repetition.
So, it seems this hits rule (1)?
Parquet schemas are too tricky for me; I'll try to take a look at https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java#L220 tomorrow...
I've checked the related Java code: https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L145
I'll dive into it this afternoon.
optional group a (LIST) {
  repeated group array (LIST) {
    repeated int32 array;
  }
}
IMO, the root cause is that the current code recognizes the schema above as three-level encoding. However, in three-level encoding the inner-most field can only be required or optional, while here the int32 field is repeated. We can decouple the nested field into two lists as below:
outer_list:
optional group a (LIST) {
  repeated group array (LIST) {}
}
inner_list:
repeated group array (LIST) {
  repeated int32 array;
}
It is obvious that inner_list can simply apply backward-compatibility rule (1). For the outer_list, the current code applies rule (3). I think we need to apply rule (4) here, by modifying rule (3) as below:
If the repeated field is a group with one required or optional field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
Yes. It's so tricky that I think we can just copy the Java code directly, lol
Our `ListToSchemaField` is like this part of the code: https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L397-L421. Should we port the impl and tests from that?
I think we are just missing a check of this line: https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L588
What about this line: https://github.com/apache/parquet-java/blob/aec7bc64dffa373db678ab2fc8b46565b4c011a5/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L587 ?
I think we are just missing a check of this line
The fix itself LGTM, but I wonder whether we should test and align more...
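To make the check being discussed concrete, here is a rough sketch (not the actual Arrow code; the function name is made up) of the element-type decision expressed against the parquet C++ schema nodes, including the extra "single repeated child" test from the parquet-java converter linked above:

#include <string>

#include <parquet/schema.h>

// Sketch only: decide whether the repeated child of a LIST-annotated group is
// itself the element type (legacy one/two-level layouts) or a synthetic middle
// layer (standard three-level layout).
bool RepeatedNodeIsElementType(const parquet::schema::Node& repeated_node,
                               const std::string& list_name) {
  // Rules (1) and (2): a primitive repeated field, or a group with multiple
  // fields, is itself the element type.
  if (repeated_node.is_primitive()) return true;
  const auto& group =
      static_cast<const parquet::schema::GroupNode&>(repeated_node);
  if (group.field_count() > 1) return true;
  // The check discussed above: a lone repeated child can never be the
  // "element" of the standard three-level layout, so the group must be the
  // element type itself.
  if (group.field(0)->is_repeated()) return true;
  // Rule (3): legacy names mark the group itself as the element type.
  if (group.name() == "array" || group.name() == list_name + "_tuple") {
    return true;
  }
  // Otherwise assume the standard three-level layout: the group is a synthetic
  // layer and its single child is the element.
  return false;
}

For the schema in this issue, the repeated group named array has a single repeated child, so under this check it is classified as the element type itself rather than as a synthetic three-level layer.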
The current C++ parquet implementation interprets the following parquet schema as `array<struct<array:array<int>>>`, which is wrong:
What is "array"? Do you mean "list"? Can you fix the PR description?
According to the parquet spec, the above schema should be inferred as array<array<int>>.
Where is this in the Parquet spec? I cannot find a similar example.
I have seen an issue when reading a Parquet file created by Hudi.
- Can we check with the Parquet ML whether this is really a legitimate schema structure?
- If so, can we add a testing file in parquet-testing?
Where is this in the Parquet spec? I cannot find a similar example.
The wording of the spec is very ambiguous:
1. If the repeated field is not a group, then its type is the element type and elements are required.
2. If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
3. If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
4. Otherwise, the repeated field's type is the element type with the repeated field's repetition.
I think this just follows rule (4): the repeated field's type is the element type with the repeated field's repetition.
Can we check with the Parquet ML whether this is really a legitimate schema structure? If so, can we add a testing file in parquet-testing?
I think maybe a test file would be better.
I'm using a Hive schema, so that's why it is array<array<int>>. The file can be easily produced by Spark SQL like below:
package org.example

import org.apache.spark.sql.SparkSession

object ParquetTwoLevelList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("NestedListTest")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .getOrCreate()

    spark.sql("CREATE TABLE nested_list_test (a array<array<int>>) USING HUDI")
    spark.sql("INSERT INTO nested_list_test VALUES ( array(array(1,2), array(3,4)) )")
  }
}
The parquet-cli prints the following metadata:
File path: /Users/gangwu/Projects/hudi-spark-generator/spark-warehouse/nested_list_test/f92ed4b5-c063-4b94-90a4-5ef997db1a6c-0_0-13-12_20240911093900996.parquet
Created by: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
Properties:
hoodie_bloom_filter_type_code: DYNAMIC_V0
org.apache.hudi.bloomfilter: ***
hoodie_min_record_key: 20240911093900996_0_0
parquet.avro.schema: {"type":"record","name":"nested_list_test_record","namespace":"hoodie.nested_list_test","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"a","type":["null",{"type":"array","items":["null",{"type":"array","items":["null","int"]}]}],"default":null}]}
writer.model.name: avro
hoodie_max_record_key: 20240911093900996_0_0
Schema:
message hoodie.nested_list_test.nested_list_test_record {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  optional group a (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }
}
Row group 0: count: 1 441.00 B records start: 4 total(compressed): 441 B total(uncompressed):349 B
--------------------------------------------------------------------------------
type encodings count avg size nulls min / max
_hoodie_commit_time BINARY G _ 1 68.00 B 0 "20240911093900996" / "20240911093900996"
_hoodie_commit_seqno BINARY G _ 1 72.00 B 0 "20240911093900996_0_0" / "20240911093900996_0_0"
_hoodie_record_key BINARY G _ 1 72.00 B 0 "20240911093900996_0_0" / "20240911093900996_0_0"
_hoodie_partition_path BINARY G _ 1 50.00 B 0 "" / ""
_hoodie_file_name BINARY G _ 1 116.00 B 0 "f92ed4b5-c063-4b94-90a4-5..." / "f92ed4b5-c063-4b94-90a4-5..."
a.array.array INT32 G _ 4 15.75 B 0 "1" / "4"
-------------
@wgtmac Would you mind checking for a test file in parquet-testing and adding one if it doesn't exist?
I will try to use parquet-java to create a minimal file and add it to parquet-testing. The file created by Hudi is too large due to a file-level bloom filter embedded in the file footer.
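If it helps, a minimal file with exactly this schema could also be generated with the low-level parquet C++ writer. The sketch below is only an illustration (the file name, the single row [[1, 2], [3, 4]], and the level values are my own choices), not necessarily how the parquet-testing file will be produced:

#include <cstdint>
#include <memory>

#include <arrow/io/file.h>
#include <parquet/api/writer.h>

// Write a single row {a: [[1, 2], [3, 4]]} using the two-level LIST encoding
// discussed in this thread. For column a.array.array the max definition level
// is 3 and the max repetition level is 2.
int main() {
  using parquet::ConvertedType;
  using parquet::Repetition;
  using parquet::schema::GroupNode;
  using parquet::schema::NodePtr;
  using parquet::schema::PrimitiveNode;

  NodePtr inner =
      PrimitiveNode::Make("array", Repetition::REPEATED, parquet::Type::INT32);
  NodePtr mid =
      GroupNode::Make("array", Repetition::REPEATED, {inner}, ConvertedType::LIST);
  NodePtr outer =
      GroupNode::Make("a", Repetition::OPTIONAL, {mid}, ConvertedType::LIST);
  auto root = std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, {outer}));

  std::shared_ptr<arrow::io::FileOutputStream> out;
  PARQUET_ASSIGN_OR_THROW(
      out, arrow::io::FileOutputStream::Open("two_level_nested_list.parquet"));
  auto writer = parquet::ParquetFileWriter::Open(out, root);
  auto* row_group = writer->AppendRowGroup();
  auto* col = static_cast<parquet::Int32Writer*>(row_group->NextColumn());

  int32_t values[] = {1, 2, 3, 4};
  int16_t def_levels[] = {3, 3, 3, 3};  // every value is present
  int16_t rep_levels[] = {0, 2, 1, 2};  // new row, same inner list, new inner list, same inner list
  col->WriteBatch(4, def_levels, rep_levels, values);
  writer->Close();
  return 0;
}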
Gentle ping :) @emkornfield @pitrou @mapleFU
@emkornfield Thanks for your review!
I've rebased it, and the test failure in R / rhub/ubuntu-gcc12:latest is unrelated (I observed the same error in other PRs). I'll merge it.
After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit a6fe59592c643d591ee719041dd2b5d8b9ada7d6.
There were 132 benchmark results with an error:
- Commit Run on arm64-t4g-2xlarge-linux at 2024-11-25 07:38:33Z
- and 130 more (see the report linked below)
There were no benchmark performance regressions. 🎉
The full Conbench report has more details. It also includes information about 4 possible false positives for unstable benchmarks that are known to sometimes produce them.