
Fix the issue that splits cannot be skipped in some special cases

Open yingsu00 opened this issue 10 months ago • 2 comments

In some rare cases, tables created by other engines put the partition key columns in the data files as well, and fail to write ColumnStats for those columns. In such cases, splits from a partition with NULL partition keys fail to be skipped, causing wrong results. This is because HiveConnectorUtils::testFilters() assumes the partition keys are not in the data file and therefore never applies the filter on the partition key. This commit fixes the issue by also checking the partition key list even when the partition columns are present in the data file.

yingsu00 avatar Mar 27 '24 11:03 yingsu00


Having the partition key in the file is the wrong thing to start with. Where and when is the error introduced, and how easy is it to fix on the write side?

Yuhta avatar Apr 03 '24 17:04 Yuhta

Having the partition key in the file is the wrong thing to start with. Where and when is the error introduced, and how easy is it to fix on the write side?

@Yuhta Iceberg writes in this way in both Presto and Spark.

ethanyzhang avatar Apr 04 '24 16:04 ethanyzhang

@yzhang1991 That's unfortunate. We should have a comment in the code noting that we are doing this because of an unfixable bug in Iceberg.

Yuhta avatar Apr 04 '24 17:04 Yuhta

Having the partition key in the file is the wrong thing to start with. Where and when is the error introduced, and how easy is it to fix on the write side?

@Yuhta For some table formats like Iceberg, partition management is different from that of Hive tables. Iceberg uses a "partition spec" in manifest files and does not exclude the partition columns from the data file. It was a legitimate Iceberg table, just missing the stats.

One thing I can do is to move testFilters from the utility class to SplitReader and override it in IcebergSplitReader. But the code would be mostly the same. What do you think of this?

yingsu00 avatar Apr 07 '24 06:04 yingsu00

We should have a comment in the code noting that we are doing this because of an unfixable bug in Iceberg

This is not a bug in Iceberg; it is by design. Here is the Iceberg spec about partitioning: https://iceberg.apache.org/spec/#partitioning

Partition values for a data file must be the same for all records stored in the data file. 
Tables are configured with a partition spec that defines how to produce a tuple of partition values from a record. A partition spec has a list of fields that consist of:

A source column id or a list of source column ids from the table’s schema
A partition field id that is used to identify a partition field and is unique within a partition spec. In v2 table metadata, it is unique across all partition specs.
A transform that is applied to the source column(s) to produce a partition value
A partition name
The source columns, selected by ids, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct.

The partition spec is stored in manifest files, and a manifest file can point to any data file in any partition. This design allows easy partition transform and evolution. This is different from Hive tables, where directories are used to manage the data files.

yingsu00 avatar Apr 08 '24 06:04 yingsu00

@Yuhta I checked my code and saw that comments were already added:

      // The partition key columns are written in the data file for
      // Iceberg tables, so we need to test both cases.
      if (!rowType->containsChild(name) || iter != partitionKey.end()) {
        if (iter != partitionKey.end() && iter->second.has_value()) {
          // This is a non-null partition key.
          return applyPartitionFilter(
              (*partitionKeysHandle)[name]->dataType()->kind(),
              iter->second.value(),
              child->filter());
        }
        // Column is missing, most likely due to schema evolution. Or it's a
        // partition key but the partition value is NULL.
        if (child->filter()->isDeterministic() &&
            !child->filter()->testNull()) {
          VLOG(1) << "Skipping " << filePath
                  << " because the filter testNull() failed for column "
                  << child->fieldName();
          return false;
        }
      }

Do you think we need to add more comments?

yingsu00 avatar Apr 08 '24 06:04 yingsu00

@yingsu00 The comment is good now. Can you address the ORC stats issue I raised? Missing stats does not mean zero-valued column stats; it means each one is populated with a default value that will pass all filters.

Yuhta avatar Apr 10 '24 16:04 Yuhta

@yingsu00 Curious: is there a use case in Iceberg where we read only the data file but not the manifest? If there is no such case, then the partition column in the data file is redundant and a design bug

Yuhta avatar Apr 10 '24 16:04 Yuhta

Curious: is there a use case in Iceberg where we read only the data file but not the manifest? If there is no such case, then the partition column in the data file is redundant and a design bug

@Yuhta No, there is no such case. Iceberg tables write the partition keys in the data files and get the partition spec from the manifest file (or manifest file cache) in the scan planning phase. This way Iceberg tables can change the partition key easily over time and support partition transforms. E.g., a table can be partitioned by year(ds) at time 1, then changed to be partitioned by day(ds) at time 2, then changed to be partitioned by another column, e.g. supp_key, at time 3.

yingsu00 avatar Apr 18 '24 01:04 yingsu00

@yingsu00 I see, thanks. Can you update the comment to add that this is designed to support partition column changes in the table format?

Yuhta avatar Apr 18 '24 17:04 Yuhta

@yingsu00 I see, thanks. Can you update the comment to add that this is designed to support partition column changes in the table format?

Sure. Updated the comment to be the following:

      // By design, the partition key columns for Iceberg tables are included in
      // the data files to facilitate partition transform and partition
      // evolution, so we need to test both cases.

yingsu00 avatar Apr 20 '24 00:04 yingsu00

@Yuhta Hi Jimmy, I addressed your other comments, can you please review again? Thank you!

yingsu00 avatar Apr 20 '24 01:04 yingsu00

@Yuhta Hi Jimmy, as you suggested, I created another PR https://github.com/facebookincubator/velox/pull/9707 that does not modify the ColumnStats writing, but with a test in ParquetTableScanTest.cpp using a Parquet file from a null partition in an Iceberg table. Please let me know which one you prefer and I'll close the other one. Thank you!

yingsu00 avatar May 08 '24 01:05 yingsu00

@yingsu00 https://github.com/facebookincubator/velox/pull/9707 looks good and I accepted it; it will be merged soon

Yuhta avatar May 09 '24 15:05 Yuhta