spark icon indicating copy to clipboard operation
spark copied to clipboard

[SPARK-39833][SQL] Remove partition columns from data schema in the case of overlapping columns to fix Parquet DSv1 incorrect count issue

Open sadikovi opened this issue 3 years ago • 5 comments

What changes were proposed in this pull request?

This PR updates schema inference in DSv1 FileFormat to remove overlapping columns from the data schema and keep them in the partition schema. This results in the following:

  • consistent behaviour with DSv2,
  • consistent behaviour between user-provided schema and inferred schema in DSv1,
  • fixes a correctness issue in a corner case when data schema == partition schema.

For a table like this one

root/
  col0=0/
    part-0001.parquet (schema: COL0, 1 record {"COL0": 0})

when projection overlaps with partition columns, the output schema (requiredSchema) becomes empty. In Parquet, when the predicate is provided and column index is enabled, we would try to filter row ranges to figure out what the count should be. Unfortunately, there is an issue that if the projection is empty, any checks on columns would fail and 0 rows are returned (RowRanges.EMPTY) even though there is data matching the filter.

This case is rare and only happens when doing count on a DataFrame that results in empty projection (most of the cases would include the filtered column which would work) but it is still good to fix.

This is rather a mitigation, the actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.

The fix is not required in DSv2 where the overlapping columns are removed in FileScanBuilder::readDataSchema().

Why are the changes needed?

Fixes a rare correctness issue when running count on a filtered Parquet DataFrame when using Parquet DSv1. As a side-effect, makes behaviour consistent between DSv1 and DSv2.

Downsides: performance regression in the case of an empty requiredSchema as we will not push down the filter anymore. However, predicate pushdown is still applied correctly when requiredSchema is non-empty.

Does this PR introduce any user-facing change?

Yes. Affects all file-based data sources that have overlapping columns in data schema and partition schema.

Schema for a Parquet (or any file based datasource) with overlapping columns will return those columns appended to the file schema.

* data schema partition schema output
Before i, p, j p i, p, j
After i, p, j p i, j, p

How was this patch tested?

I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.

sadikovi avatar Aug 05 '22 05:08 sadikovi

@sadikovi in the example you gave:

root/
  col0=0/
    part-0001.parquet (schema: COL0)

what's the content in part-0001.parquet? I wonder why we need to pushdown partition filters to Parquet, given that we'll not materialize the partition values in the Parquet files. What is the pushed filters to Parquet in this example?

sunchao avatar Aug 05 '22 18:08 sunchao

Can one of the admins verify this patch?

AmplabJenkins avatar Aug 05 '22 23:08 AmplabJenkins

The content is just one column COL with one value 0. You would still want to filter out row groups that don't much the predicate otherwise it could be a performance regression. The filters are evaluated in the example, see https://issues.apache.org/jira/browse/PARQUET-2170.

sadikovi avatar Aug 07 '22 22:08 sadikovi

To be honest, I am still thinking about the best way to mitigate the problem at the moment. I decided to fix it in a similar way as what DSv2 does.

sadikovi avatar Aug 07 '22 23:08 sadikovi

@cloud-fan Can you review? This change modifies a test that was added in SPARK-22356 that you authored. Thanks.

sadikovi avatar Aug 08 '22 06:08 sadikovi

@sunchao @cloud-fan Would you be able to take another look?

I have kept the original "patch". It is essentially a band aid until the Parquet ticket is fixed. I cannot think of a better and less intrusive way to fix the problem. Let me know if you have any questions about it, I will be happy to clarify. Thanks.

sadikovi avatar Aug 11 '22 06:08 sadikovi

Not exactly, the filter actually references columns that exist in the file. It is the projection that matters in the code apparently.

Here is what they have in the javadoc:

   * @param paths
   *          the paths of the columns used in the actual projection; a column not being part of the projection will be
   *          handled as containing {@code null} values only even if the column has values written in the file

https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L80)

I am not very familiar with the implementation but I think the library should be returning all rows instead of empty rows.

sadikovi avatar Aug 12 '22 06:08 sadikovi

@sadikovi which spark version starts to have this bug?

cloud-fan avatar Aug 12 '22 06:08 cloud-fan

Oh, this was originally implemented in PARQUET-1201 which is parquet-mr 1.11.0. Again, this is a rare case maybe we don't do anything about it or merge in master only.

I don't know if there is a better way to fix the problem other than fixing it in parquet-mr.

sadikovi avatar Aug 12 '22 06:08 sadikovi

Apologies, I did not have time to debug this yet. I will do that tomorrow.

sadikovi avatar Aug 18 '22 06:08 sadikovi

I decided to disable column index altogether until I have a better fix or parquet bug is fixed. I also moved tests to ParquetQueryV1 as one of the tests fails in DSv2 due to another bug in projection.

@cloud-fan @sunchao Can you review this PR? I just think adding a check on required schema and column filters could be error-prone especially when nested fields are involved. It seems to me it is easier to disable column index by default which can still be enabled manually by users.

I am also open to other suggestions.

sadikovi avatar Aug 18 '22 07:08 sadikovi

I suggest we merge the PR with this fix and I will follow up on a more permanent resolution, maybe fix it in Parquet-mr. I am also thinking that we may need to backport it to 3.3 although this would be up to committers.

sadikovi avatar Aug 18 '22 07:08 sadikovi

I think it's fine to disable it temporarily, but I'd prefer a fix in Spark itself though so that we can backport it to 3.3 without relying on a Parquet release and bumping the version there. I can also take a look on the approach of checking filters against required schema.

Could you open a JIRA tracking the permanent fix in Spark, and mark it as blocker for 3.4.0 release?

sunchao avatar Aug 18 '22 16:08 sunchao

I will take a look on how to fix it in Spark, I have not had enough time to work on this problem yet. I would like to mention that the fix should be merged into 3.3 and 3.4 and potentially earlier releases where the bug occurs as it is a correctness issue, it is also unclear if it partition columns related at this point, it could be potentially reproducible with a simple predicate pushdown and projection.

sadikovi avatar Aug 18 '22 22:08 sadikovi

@sunchao Can you help me to find a workaround in Spark for this if we want to fix it as part of this PR? Otherwise, I can take a look in a follow-up. Thanks.

sadikovi avatar Aug 19 '22 01:08 sadikovi

@sadikovi yes, I can also take a look at this next week. I'm fine either way: what do you think @cloud-fan @HyukjinKwon , should we merge this PR as it is (via disabling column index) first, and work on a fix separately?

sunchao avatar Aug 19 '22 20:08 sunchao

Yeah, let's just get this in first.

HyukjinKwon avatar Aug 21 '22 09:08 HyukjinKwon

Merged to master, branch-3.3, and branch-3.2.

HyukjinKwon avatar Aug 21 '22 09:08 HyukjinKwon

Thank you for merging the PR. I have opened the follow-up ticket https://issues.apache.org/jira/browse/SPARK-40169 to fix this properly. I will sync with @sunchao separately on this, I am sure we will be able to come up with a proper way to fix it!

sadikovi avatar Aug 21 '22 22:08 sadikovi