Parquet: Implement column index filter and update row read path to support page skipping
This PR mainly does two things to support page filtering in the Parquet row read path:
- This implements a `ParquetColumnIndexFilter`, which is equivalent to Parquet-mr's `ColumnIndexFilter` but based on Iceberg's expressions. It uses reflection to call the `getColumnIndex` method in `ParquetFileReader` to get the column index and passes it to the `ColumnIndexEvalVisitor` for `RowRanges` calculation. When filtering pages, it uses an `IntPredicate` that takes a page index as an argument and evaluates whether the corresponding page can match the expression, using the page stats stored in the column index; it then gets a list of possibly matching pages and builds a `RowRanges` from them (a minimal, hypothetical sketch of this page-level filtering appears after this list). Column index stats are mocked for testing in `TestColumnIndexFilter`, which follows the test approach used in the Parquet-mr project.
- This also updates the Parquet row read path to support page skipping. It calculates `RowRanges` for row groups and makes `ParquetReader` read the filtered page store (by using reflection) if the `RowRanges` is not empty and the row count of the `RowRanges` is smaller than the row count in `BlockMetaData`. The rows in the filtered page store may not be aligned between pages of different columns, so this updates `ColumnIterator` to skip values that do not belong to the target rows represented by `RowRanges`. In the page filtering scenario, the row reader brings all of its column readers into sync before reading a new row. Specifically, the calculated `RowRanges` (the target row indexes) is passed to each `ColumnIterator`, and `firstRowIndex` is retrieved from pages to locate the initial value of the current row index after reading a new page. This adds a `topLevel` field in `org.apache.iceberg.parquet.ParquetValueReaders.StructReader` to mark whether it is a row reader or a nested struct reader (it is only added to Spark's `InternalRowReader`); before the row `StructReader` reads a new row, all of its child columns skip non-target triples to get in sync. To achieve that, this adds `currentRowIndex` and `skipValues` to `ColumnIterator` to track the synchronization state; `skipValues` represents the number of non-null values to skip. During synchronization, `currentRowIndex` is incremented by 1 whenever the repetition level is 0, and `skipValues` is incremented by 1 whenever the definition level is greater than the `definitionLevel`. Synchronization advances the `ColumnIterator` until `currentRowIndex` is no longer less than the target row index, and then calls `PageIterator#skip` to skip values in the page (see the synchronization sketch after this list). After the synchronization, all non-target triples before the next target row have been skipped, and the new row can be safely read.
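To illustrate the first part, here is a minimal, self-contained sketch of page-level filtering driven by an `IntPredicate` over column index stats. The names below (`PageStats`, `RowRange`, `matchingRowRanges`) are hypothetical stand-ins for illustration only; they are not the classes introduced by this PR or the Parquet-mr API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class PageSkippingSketch {

  /** Simplified per-page stats, as stored in a Parquet column index / offset index. */
  static class PageStats {
    final long min;            // page min value
    final long max;            // page max value
    final long firstRowIndex;  // index of the first row in the page
    final long rowCount;       // number of rows in the page

    PageStats(long min, long max, long firstRowIndex, long rowCount) {
      this.min = min;
      this.max = max;
      this.firstRowIndex = firstRowIndex;
      this.rowCount = rowCount;
    }
  }

  /** Simplified contiguous range of row indexes, analogous to one range in RowRanges. */
  static class RowRange {
    final long from;  // inclusive
    final long to;    // inclusive

    RowRange(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /**
   * Evaluates a simple "value == target" predicate against each page's min/max and
   * returns the row ranges of the pages that might contain matching rows.
   */
  static List<RowRange> matchingRowRanges(List<PageStats> pages, long target) {
    // the IntPredicate takes a page index and answers "can this page match the expression?"
    IntPredicate pageMatches = i -> {
      PageStats page = pages.get(i);
      return page.min <= target && target <= page.max;
    };

    List<RowRange> ranges = new ArrayList<>();
    for (int i = 0; i < pages.size(); i++) {
      if (pageMatches.test(i)) {
        PageStats page = pages.get(i);
        ranges.add(new RowRange(page.firstRowIndex, page.firstRowIndex + page.rowCount - 1));
      }
    }
    return ranges;
  }
}
```

The real filter evaluates Iceberg expressions rather than a single equality predicate, but the mapping from matching page indexes to row ranges works the same way.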
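And to make the synchronization step concrete, here is a rough, hypothetical sketch of how a column iterator could maintain `currentRowIndex` and `skipValues` from repetition and definition levels before reading a target row. The method names (`currentRepetitionLevel`, `advanceTriple`, `skipPageValues`, etc.) are illustrative stand-ins, not the actual `ColumnIterator`/`PageIterator` internals:

```java
/**
 * Rough sketch of the synchronization performed before reading a target row.
 * All names are illustrative; they do not match the PR's ColumnIterator exactly.
 */
abstract class SyncingColumnIterator {
  private final int definitionLevel;  // values with a def level above this are non-null
  private long currentRowIndex;       // row index the iterator is currently positioned at
  private long skipValues;            // non-null values to skip in the current page

  protected SyncingColumnIterator(int definitionLevel, long firstRowIndex) {
    this.definitionLevel = definitionLevel;
    this.currentRowIndex = firstRowIndex;  // firstRowIndex comes from the page metadata
  }

  /** Skips all triples belonging to rows before targetRowIndex. */
  void syncTo(long targetRowIndex) {
    skipValues = 0;
    while (currentRowIndex < targetRowIndex) {
      if (currentDefinitionLevel() > definitionLevel) {
        skipValues += 1;               // this triple carries a value that must be dropped
      }
      advanceTriple();                 // consume the current (rep level, def level) pair
      if (currentRepetitionLevel() == 0) {
        currentRowIndex += 1;          // repetition level 0 marks the start of a new row
      }
    }
    skipPageValues(skipValues);        // analogous to PageIterator#skip
  }

  // stand-ins for the low-level page/triple access of the real iterator
  protected abstract int currentRepetitionLevel();
  protected abstract int currentDefinitionLevel();
  protected abstract void advanceTriple();
  protected abstract void skipPageValues(long count);
}
```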
The batch read path will be updated in a subsequent PR. Coauthored by @rdblue
@rdblue @Fokko Can you help to review this?
@zhongyujiang can you please add more to the description about what is included here and how you solved the problems with record materialization?
@rdblue The description has been updated. I hope I made it clear; please let me know if anything is unclear.
How's it going?
@zhongyujiang @rdblue any recent progress on this PR? It would be a very useful feature!
Hi @hengqujushi @sunchao, I rebased this and fixed the revapi failure, so I think this should be ready for review now. Though I feel this PR is kind of big; maybe we should break it into smaller PRs for easier review? I think we can separate the column index filter part from the record materialization changes on the row read path. cc @rdblue What do you think?
@zhongyujiang Hi, really appreciate the hard work that has been done so far. It's a very useful feature! Any update on this PR? Is there any work left to be done?
Hi @iflytek-hmwang5 , We're waiting for people in the community who are interested in it to review it.
@zhongyujiang Hi, what can we do to speed up the process? Looking forward to this feature very much.
Um, I'm not sure; I think it depends on the community's priorities and reviewers' bandwidth.
@zhongyujiang Maybe you can refer to this issue, https://github.com/apache/iceberg/issues/193; Anton Okolnychyi mentioned this idea before, so maybe he'll be interested in it.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.