Parquet: Add page filter using page indexes
This adds an evaluator, ParquetIndexPageFilter.EvalVisitor to evaluate an Iceberg Expression against Parquet's page indexes. That produces Parquet's RowRanges, which track ranges of rows that should be read from the Parquet file. The filter class, ParquetIndexPageFilter also sets the row ranges on the ParquetFileReader.
Parquet doesn't expose some of the RowRanges methods publicly, so this has a ParquetRanges class that uses reflection to construct the ranges and to use them to filter ParquetFileReader.
Also, this does not update record materialization. Record materialization will need to coordinate the values read across pages. Currently, it assumes that the pages returned are in sync, but that will change when Parquet filters pages.
For example, if column A has just one page, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] and column B has two pages, [a, b, c, d, e] and [f, g, h, i, j, k], and the row range is 6-8 then the results should be (6, g), (7, h), and (8, i). The reader for column A will get the entire page and need to skip 6 values (0, 1, 2, 3, 4, and 5), then the reader for column B will get just the second page and need to skip 1 value (f) to get in sync. The readers will need to handle this.
I did this work a while ago and after looking at it with fresh eyes today, I have a couple concerns:
- I didn't get to writing tests for the
RowRangesthat are produced. This should be done next. - The filter is applied at the row group level, but it looks like that's not how
RowRangesworks in Parquet, so this probably needs to be refactored to run the filter, produceRowRanges, and set them on theParquetFileReaderjust once. In addition, if any row groups are completely eliminated by the filter, this will need to account for the missing row groups in the Iceberg Parquet reader. - Since the filter is applied at the row group level,
allRowsmay be incorrect (it is the row group's row count) and need to be adjusted to be the entire file rather than specific to a row group.
Thanks for your sharing!
The filter is applied at the row group level, but it looks like that's not how RowRanges works in Parquet, so this probably needs to be refactored to run the filter, produce RowRanges, and set them on the ParquetFileReader just once.
I think RowRanges is also at the row group level, Parquet-mr will generate a RowRanges for each row-group when running the column index filter. So I think the allRows is correct because the row index stored in the page index is also its position in the row-group.
I have a convern is that currently we can't read filtered row-group even after we set RowRanges in ParquetFileReader. The readFilteredRowGroup method provided by Parquet will detect whether there is a filter pushed down,
and only return the filtered row-group when there is a push-down filter.
So I think we maybe can add a method in ParquetFileReader that allows users to specify RowRanges by themselves, after all Iceberg will calculate these RowRanges by itself:
public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) throws IOException {
@rdblue WDYT? In addition, I think we can also let Parquet expose some methods to make it easier for Iceberg to implement column index filter. We will have to wait for the next version to use, but Parquet-mr may have a release in a month, see this comment , we should be able to catch up with this release. If you agree, I can open a PR in the Parquet-mr repo. I think we could use these changes in Parquet. Here is my implementation based on these Parquet APIs if you‘d like to review it :)
The readFilteredRowGroup method provided by Parquet will detect whether there is a filter pushed down, and only return the filtered row-group when there is a push-down filter.
I commented where we set the row ranges for the row group. I think that should work with Parquet, but it's been a while since I looked at it. Getting a public API call in would make it easier.
I think RowRanges is also at the row group level, Parquet-mr will generate a RowRanges for each row-group when running the column index filter.
Great! I put this together by trying to reverse engineer what was going on in Parquet, so I must have gotten it right.
We will have to wait for the next version to use, but Parquet-mr may have a release in a month, see this comment , we should be able to catch up with this release. If you agree, I can open a PR in the Parquet-mr repo.
I'm all for adding what we need to Parquet. We can continue to use reflection until it is available.
I think the main thing is that we don't currently handle the row ranges after we've skipped reading the pages. So the next steps are to verify what's in this PR and then to update the read paths so that values are skipped for the skipped rows.
I'm all for adding what we need to Parquet. We can continue to use reflection until it is available.
I think the main thing is that we don't currently handle the row ranges after we've skipped reading the pages. So the next steps are to verify what's in this PR and then to update the read paths so that values are skipped for the skipped rows.
Got it!
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.