Parquet: Implement column index filter and update row read path to support page skipping

Open zhongyujiang opened this issue 2 years ago • 12 comments

This PR mainly does two things to support page filtering in the Parquet row read path:

  1. This implements a ParquetColumnIndexFilter, which is equivalent to Parquet-mr's ColumnIndexFilter but is based on Iceberg's expressions. It uses reflection to call the getColumnIndex method of ParquetFileReader to get the column index and passes it to the ColumnIndexEvalVisitor to calculate RowRanges. When filtering pages, it uses an IntPredicate that takes a page index as its argument and evaluates whether the corresponding page can match the expression, using the page stats stored in the column index. It then gets a list of possibly matching pages and builds a RowRanges from them. Column index stats are mocked for testing in TestColumnIndexFilter, which follows the test approach used in the Parquet-mr project.

  2. This also updates the Parquet row read path to support page skipping. It calculates RowRanges for each row group and makes ParquetReader read the filtered page store (by using reflection) if the RowRanges is not empty and its row count is less than the row count in BlockMetadata. The rows in the filtered page store may not be aligned between pages of different columns, so this updates ColumnIterator to skip values that do not belong to the target rows represented by RowRanges. In the page filtering scenario, the row reader brings all of its column readers in sync before reading a new row. Specifically, the calculated RowRanges (target row indexes) are passed to each ColumnIterator, and after a new page is read, firstRowIndex is retrieved from the page to set the initial value of the current row index. This adds a topLevel field to org.apache.iceberg.parquet.ParquetValueReaders.StructReader to mark whether it is a row reader or a nested struct reader (it is only added for the Spark InternalRowReader); before the row StructReader reads a new row, all of its child columns skip non-target triples to sync. To achieve that, this adds currentRowIndex and skipValues to ColumnIterator to track the synchronization state; skipValues is the number of non-null values to skip. During synchronization, currentRowIndex is incremented by 1 whenever the rep level is 0, and skipValues is incremented by 1 whenever the def level is greater than definitionLevel. Synchronization advances the ColumnIterator until currentRowIndex is no longer less than targetRowIndex, and then calls PageIterator#skip to skip values in the page. After synchronization, all non-target triples before the next target row have been skipped, and the new row can be read safely.
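To make the page filtering in item 1 more concrete, here is a minimal, self-contained sketch of how a per-page predicate can be turned into row ranges, and of the check from item 2 that decides whether the filtered page store is worth reading. It is not the code in this PR: `PageFilterSketch`, `Range`, `rowRanges`, `rowCount` and `shouldReadFilteredPages` are hypothetical names, and the page stats and first row indexes are hard-coded stand-ins for what the column index and offset index would provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Illustrative sketch only: none of these names exist in Iceberg or Parquet.
public class PageFilterSketch {

  /** Inclusive range of row indexes within one row group. */
  public static final class Range {
    public final long from;
    public final long to;

    public Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + "]";
    }
  }

  /** Converts the pages accepted by pageMayMatch into row ranges, merging adjacent pages. */
  public static List<Range> rowRanges(
      long[] firstRowIndexes, long rowGroupRowCount, IntPredicate pageMayMatch) {
    List<Range> ranges = new ArrayList<>();
    for (int page = 0; page < firstRowIndexes.length; page++) {
      if (!pageMayMatch.test(page)) {
        continue; // page stats show that no row in this page can match
      }
      long from = firstRowIndexes[page];
      boolean lastPage = page + 1 == firstRowIndexes.length;
      long to = (lastPage ? rowGroupRowCount : firstRowIndexes[page + 1]) - 1;
      int lastIdx = ranges.size() - 1;
      if (lastIdx >= 0 && ranges.get(lastIdx).to + 1 == from) {
        // extend the previous range instead of adding a new one
        ranges.set(lastIdx, new Range(ranges.get(lastIdx).from, to));
      } else {
        ranges.add(new Range(from, to));
      }
    }
    return ranges;
  }

  /** Total number of rows covered by the ranges. */
  public static long rowCount(List<Range> ranges) {
    long count = 0;
    for (Range r : ranges) {
      count += r.to - r.from + 1;
    }
    return count;
  }

  /** Page skipping only pays off when some, but not all, rows are selected. */
  public static boolean shouldReadFilteredPages(List<Range> ranges, long rowGroupRowCount) {
    return !ranges.isEmpty() && rowCount(ranges) < rowGroupRowCount;
  }

  public static void main(String[] args) {
    // Assumed per-page min/max stats and first row indexes for one column.
    long[] mins = {1, 40, 90, 150};
    long[] maxes = {30, 80, 140, 200};
    long[] firstRowIndexes = {0, 100, 200, 300};
    long rowGroupRowCount = 400;

    // Predicate for "col == 50": a page may match if 50 lies within [min, max].
    IntPredicate mayMatch = page -> mins[page] <= 50 && 50 <= maxes[page];
    List<Range> ranges = rowRanges(firstRowIndexes, rowGroupRowCount, mayMatch);
    System.out.println(ranges + " filtered=" + shouldReadFilteredPages(ranges, rowGroupRowCount));
  }
}
```

In this sketch the predicate only looks at min/max values; a real evaluation also has to account for null counts and pages without stats, which is left out here.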

The batch read path will be updated in a subsequent PR. Co-authored by @rdblue
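The synchronization step described in item 2 can be illustrated with a small standalone model of one column's page. This is a simplified sketch under stated assumptions, not the ColumnIterator code in this PR: `ColumnSyncSketch`, `synchronize` and `readRow` are hypothetical names, the page is modeled as plain arrays of rep levels, def levels and densely stored non-null values, and advancing `valuePosition` stands in for the call to PageIterator#skip. It also assumes that a page always starts at a row boundary.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a toy model of one column page, not Iceberg's ColumnIterator.
public class ColumnSyncSketch {
  private final int[] repLevels;
  private final int[] defLevels;
  private final int[] values;        // non-null values only, stored densely
  private final int definitionLevel; // def levels above this mean a value is stored
  private int position = 0;          // index of the next unread triple
  private int valuePosition = 0;     // index of the next unread non-null value
  private long currentRowIndex;      // row that the triple at position belongs to

  public ColumnSyncSketch(
      int[] repLevels, int[] defLevels, int[] values, int definitionLevel, long firstRowIndex) {
    this.repLevels = repLevels;
    this.defLevels = defLevels;
    this.values = values;
    this.definitionLevel = definitionLevel;
    this.currentRowIndex = firstRowIndex; // taken from the page after it is read
  }

  /** Skips all triples before targetRowIndex and returns the number of non-null values skipped. */
  public int synchronize(long targetRowIndex) {
    int skipValues = 0;
    while (currentRowIndex < targetRowIndex && position < repLevels.length) {
      if (defLevels[position] > definitionLevel) {
        skipValues++; // this triple carries a stored value that must be skipped
      }
      position++;
      // a rep level of 0 (or the end of the page) marks the start of the next row
      if (position == repLevels.length || repLevels[position] == 0) {
        currentRowIndex++;
      }
    }
    valuePosition += skipValues; // stand-in for skipping values in the page
    return skipValues;
  }

  /** Reads the values of the current row; null and empty lists both yield an empty result. */
  public List<Integer> readRow() {
    List<Integer> row = new ArrayList<>();
    do {
      if (defLevels[position] > definitionLevel) {
        row.add(values[valuePosition++]);
      }
      position++;
    } while (position < repLevels.length && repLevels[position] != 0);
    currentRowIndex++;
    return row;
  }

  public static void main(String[] args) {
    // A list<int> column whose page starts at row 10:
    // row 10 = [1, 2], row 11 = null, row 12 = [3], row 13 = [4, 5, 6]
    int[] rep = {0, 1, 0, 0, 0, 1, 1};
    int[] def = {2, 2, 0, 2, 2, 2, 2};
    int[] vals = {1, 2, 3, 4, 5, 6};
    ColumnSyncSketch column = new ColumnSyncSketch(rep, def, vals, 1, 10);

    int skipped = column.synchronize(13); // rows 10 to 12 are not target rows
    System.out.println("skipped " + skipped + " values, row 13 = " + column.readRow());
  }
}
```

A row reader would call `synchronize` with the same target row index on every child column before reading, so that columns whose filtered pages start at different rows end up positioned on the same row.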

zhongyujiang avatar Mar 01 '23 07:03 zhongyujiang

@rdblue @Fokko Can you help to review this?

zhongyujiang avatar Mar 01 '23 07:03 zhongyujiang

@zhongyujiang can you please add more to the description about what is included here and how you solved the problems with record materialization?

rdblue avatar Mar 03 '23 22:03 rdblue

@rdblue The description has been updated, I hope I made it clear, please let me know if something is unclear.

zhongyujiang avatar Mar 06 '23 13:03 zhongyujiang

How's it going?

hengqujushi avatar Jun 16 '23 09:06 hengqujushi

@zhongyujiang @rdblue any recent progress on this PR? It would be a very useful feature!

sunchao avatar Sep 14 '23 16:09 sunchao

Hi @hengqujushi @sunchao, I rebased this and fixed the revapi failure, I think this should be ready for review now. Though I feel this PR is kind of big, maybe we should break this into smaller PRs for easier review? I think we can separate the column index filter part and record materialization on row read path part. cc @rdblue What do you think?

zhongyujiang avatar Sep 17 '23 12:09 zhongyujiang

@zhongyujiang Hi, I really appreciate the hard work that has been done so far. It's a very useful feature! Any update on this PR? Is there any work left to be done?

iflytek-hmwang5 avatar Mar 25 '24 05:03 iflytek-hmwang5

Hi @iflytek-hmwang5 , We're waiting for people in the community who are interested in it to review it.

zhongyujiang avatar Mar 26 '24 05:03 zhongyujiang

Hi @iflytek-hmwang5 , We're waiting for people in the community who are interested in it to review it.

@zhongyujiang Hi, what can we do to speed up the process? Looking forward to this feature very much.

iflytek-hmwang5 avatar Mar 26 '24 05:03 iflytek-hmwang5

@zhongyujiang Hi, what can we do to speed up the process? Looking forward to this feature very much.

Um, I'm not sure, I think it depends on the community's priorities, and reviewers' bandwidth.

zhongyujiang avatar Mar 26 '24 11:03 zhongyujiang

@zhongyujiang Maybe you can refer to this issue, https://github.com/apache/iceberg/issues/193. Anton Okolnychyi mentioned this idea before, so maybe he'll be interested in it.

iflytek-hmwang5 avatar Mar 27 '24 01:03 iflytek-hmwang5

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot] avatar Aug 28 '24 00:08 github-actions[bot]

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions[bot] avatar Sep 04 '24 00:09 github-actions[bot]