Fix the issue that splits cannot be skipped in a special case
In some rare cases, tables created by other engines put the partition key columns in the data files as well, and fail to write ColumnStats for those columns. In such cases, a split from a partition with NULL partition keys fails to be skipped, causing wrong results. This is because HiveConnectorUtils::testFilters() assumes the partition keys are not in the data file and is therefore unable to apply the filter on the partition key. This commit fixes the issue by also checking the partition key list even when the partition columns are present in the data file.
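For context, here is a minimal, self-contained sketch of the shape of the fix. The types and names (`Filter`, `testFilterOnColumn`, `fileColumns`) are hypothetical stand-ins for the Velox ones, not the actual HiveConnectorUtils::testFilters() code:

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for the Velox filter interface.
struct Filter {
  bool deterministic = true;
  bool passesNull = false;
  bool isDeterministic() const { return deterministic; }
  bool testNull() const { return passesNull; }
  bool testValue(const std::string&) const { return true; } // stub
};

// Returns false when the split can be skipped entirely.
bool testFilterOnColumn(
    const std::unordered_set<std::string>& fileColumns, // columns in the data file
    const std::unordered_map<std::string, std::optional<std::string>>&
        partitionKeys, // partition key -> value; nullopt means NULL
    const std::string& name,
    const Filter& filter) {
  auto it = partitionKeys.find(name);
  // The fix: consult the partition key map even when the data file also
  // contains the column, as Iceberg data files do. Before the fix, only
  // the "column absent from the file" condition was checked.
  if (fileColumns.count(name) == 0 || it != partitionKeys.end()) {
    if (it != partitionKeys.end() && it->second.has_value()) {
      // Non-NULL partition key: test the concrete partition value.
      return filter.testValue(*it->second);
    }
    // NULL partition value, or column missing due to schema evolution:
    // the split survives only if the filter passes NULL.
    return !filter.isDeterministic() || filter.testNull();
  }
  return true; // ordinary data column; stats-based filtering happens elsewhere
}
```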
Having the partition key in the file is a wrong thing to start with. Where and when is the error introduced, and how easy is it to fix on the write side?
@Yuhta Iceberg writes this way in both Presto and Spark.
@yzhang1991 That's unfortunate. We should have a comment in the code noting that we are doing this for an unfixable bug in Iceberg.
> Having the partition key in the file is a wrong thing to start with. Where and when is the error introduced, and how easy is it to fix on the write side?
@Yuhta For some table formats like Iceberg, partition management is different from that of Hive tables. Iceberg uses a "partition spec" stored in manifest files and does not exclude the partition columns from the data files. It was a legitimate Iceberg table, just missing the stats.
One thing I can do is to move testFilters from the utility class to SplitReader and override it in IcebergSplitReader, but the code would be mostly the same. What do you think?
> We should have a comment in the code noting that we are doing this for an unfixable bug in Iceberg.
This is not a bug in Iceberg; it is by design. Here is the Iceberg spec on partitioning: https://iceberg.apache.org/spec/#partitioning
> Partition values for a data file must be the same for all records stored in the data file.
>
> Tables are configured with a partition spec that defines how to produce a tuple of partition values from a record. A partition spec has a list of fields that consist of:
>
> - A source column id or a list of source column ids from the table’s schema
> - A partition field id that is used to identify a partition field and is unique within a partition spec. In v2 table metadata, it is unique across all partition specs.
> - A transform that is applied to the source column(s) to produce a partition value
> - A partition name
>
> The source columns, selected by ids, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct.
The partition spec is stored in manifest files, and a manifest file can point to any data file in any partition. This design allows easy partition transforms and evolution. This is different from Hive tables, where directories are used to manage the data files.
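To make the quoted spec text concrete, here is a minimal sketch of the fields a partition spec carries, using hypothetical C++ types (this is not Iceberg library code):

```cpp
#include <cstdint>
#include <string>
#include <vector>

// One field of an Iceberg partition spec, per the spec text quoted above.
struct PartitionField {
  std::vector<int32_t> sourceColumnIds; // source column id(s) from the table schema
  int32_t partitionFieldId; // unique within the spec; in v2 metadata, across all specs
  std::string transform;    // e.g. "identity", "year", "day", "bucket[16]"
  std::string name;         // the partition name
};

// A partition spec is a list of such fields. It is stored in manifest files
// rather than being encoded in the directory layout, as Hive does.
struct PartitionSpec {
  int32_t specId;
  std::vector<PartitionField> fields;
};
```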
@Yuhta I checked my code and saw that comments were already added:
```cpp
// The partition key columns are written in the data file for
// Iceberg tables, so we need to test both cases.
if (!rowType->containsChild(name) || iter != partitionKey.end()) {
  if (iter != partitionKey.end() && iter->second.has_value()) {
    // This is a non-null partition key.
    return applyPartitionFilter(
        (*partitionKeysHandle)[name]->dataType()->kind(),
        iter->second.value(),
        child->filter());
  }
  // Column is missing, most likely due to schema evolution, or it's a
  // partition key but the partition value is NULL.
  if (child->filter()->isDeterministic() && !child->filter()->testNull()) {
    VLOG(1) << "Skipping " << filePath
            << " because the filter testNull() failed for column "
            << child->fieldName();
    return false;
  }
}
```
Do you think we need to add more comments?
@yingsu00 The comment is good now. Can you address the ORC stats issue I raised? Missing stats does not mean zero column stats; it means each one of them is populated with some default value that will pass all filters.
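To illustrate the point about default stats (a minimal sketch with a hypothetical helper, not the Velox or ORC stats API): stats-based skipping is only sound when the stats prove that no row can pass, so absent or default min/max values must conservatively pass every filter.

```cpp
#include <cstdint>
#include <optional>

// Returns true when rows MAY pass a range filter [lower, upper] given the
// column stats. Skipping the file or stride is only legal when this is false.
bool mayPassRangeFilter(
    const std::optional<int64_t>& statsMin,
    const std::optional<int64_t>& statsMax,
    int64_t lower,
    int64_t upper) {
  if (!statsMin.has_value() || !statsMax.has_value()) {
    // Missing or default stats prove nothing, so the filter must pass.
    return true;
  }
  return *statsMax >= lower && *statsMin <= upper; // value ranges overlap
}
```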
@yingsu00 Curious: is there a use case in Iceberg where we read only the data file but not the manifest? If there is no such case, then the partition column in the data file is redundant, and it is a design bug.
> Curious: is there a use case in Iceberg where we read only the data file but not the manifest? If there is no such case, then the partition column in the data file is redundant, and it is a design bug.
@Yuhta No, there is no such case. Iceberg tables write the partition keys in the data files and get the partition spec from the manifest file (or manifest file cache) in the scan planning phase. This way, Iceberg tables can change partition keys over time and support partition transforms easily. E.g., a table can be partitioned by year(ds) at time 1, then changed to be partitioned by day(ds) at time 2, then changed to be partitioned by another column, e.g. supp_key, at time 3.
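As a sketch of that evolution (hypothetical types, not Iceberg code): each data file records the id of the partition spec it was written under, so files from all three periods can coexist in one table, and scan planning interprets each file with its own spec.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical sketch of spec lookup at scan planning time.
struct DataFileRef {
  std::string path;
  int32_t specId; // the partition spec the file was written under
};

std::string partitioningOf(
    const DataFileRef& file,
    const std::map<int32_t, std::string>& specsById) {
  return specsById.at(file.specId);
}

// Usage: with specsById = {{0, "year(ds)"}, {1, "day(ds)"},
// {2, "identity(supp_key)"}}, a file written at time 1 carries specId 0,
// and partitioningOf() returns "year(ds)".
```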
@yingsu00 I see, thanks. Can you update the comment to add that this is designed to support partition column changes in the table format?
Sure. I updated the comment to the following:
```cpp
// By design, the partition key columns for Iceberg tables are included in
// the data files to facilitate partition transform and partition
// evolution, so we need to test both cases.
```
@Yuhta Hi Jimmy, I addressed your other comments, can you please review again? Thank you!
@Yuhta Hi Jimmy, as you suggested, I created another PR https://github.com/facebookincubator/velox/pull/9707 that does not modify the ColumnStats writing, but adds a test in ParquetTableScanTest.cpp using a Parquet file from a null partition of an Iceberg table. Please let me know which one you prefer and I'll close the other one. Thank you!
@yingsu00 https://github.com/facebookincubator/velox/pull/9707 looks good and I accepted it; it will be merged soon.