API in ParquetExec to pass in RowSelections to `ParquetExec` (enable custom indexes, finer grained pushdown)

Open alamb opened this issue 1 year ago • 13 comments

Is your feature request related to a problem or challenge?

We are building / testing a specialized index for data stored in parquet that can tell us which row offsets are needed from the parquet file, based on additional information.

Currently the parquet-rs parquet reader allows specifying this type of information via ArrowReaderBuilder::with_row_selection

However, the DataFusion ParquetExec has no way to pass this information down. It does build its own

Describe the solution you'd like

What I would like is a way to provide something like a RowSelection for each row group

Describe alternatives you've considered

Here is one possible API:

let parquet_selection = ParquetSelection::new()
  // * rows 100-250 from row group 1
  .select(1, RowSelection::from(vec![
    RowSelector::skip(100),
    RowSelector::select(150),
  ]))
  // * rows 50-100 and 200-300 in row group 2
  .select(2, RowSelection::from(vec![
    RowSelector::skip(50),
    RowSelector::select(50),
    RowSelector::skip(100),
    RowSelector::select(100),
  ]));

let parquet_exec = ParquetExec::new(...)
  .with_selection(parquet_selection);

Additional context

No response

alamb avatar Apr 03 '24 20:04 alamb

FYI @westonpace this is something we spoke about

FYI @matthewmturner who expressed interest in this: https://github.com/apache/arrow-datafusion/issues/9899#issuecomment-2030139830

alamb avatar Apr 03 '24 20:04 alamb

We implemented a (very crude) implementation of this as part of our parquet benchmarking. Sharing here in case it is interesting as a starting point for someone: https://gist.github.com/westonpace/04f33da51931e0a990997d9ac81b623c

P.S. The implementation assumes that indices are ordered

westonpace avatar Apr 03 '24 20:04 westonpace

The main reason I call it crude is that the "give me rows with these offsets" feels like it should be part of the parquet reader API and it feels like we are somewhat over-utilizing the "skip" API. However, without upstream changes I don't know that one can do much better.

westonpace avatar Apr 03 '24 20:04 westonpace

Sorry for the stream of consciousness github. Last comment (I hope). Reading through I see that you are suggesting the skip API be used in the exec node as well.

While this works, we have found that a more intuitive API is to pass in an iter of desired offsets instead of RowSelection.

westonpace avatar Apr 03 '24 20:04 westonpace

Maybe we could have a version of the API like this (which translates into RowSelection) 🤔

let parquet_selection = ParquetSelection::new()
  // * rows 100-250 from row group 1
  .select_range(1, 100..250)
  // * rows 50-100 and 200-300 in row group 2
  .select_range(2, 50..100)
  .select_range(2, 200..300);
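
As a rough sketch of how a range-based API like this could translate into skip/select runs: `Run` and `ranges_to_runs` below are hypothetical names, with `Run` acting as a std-only stand-in for parquet's `RowSelector`. The ranges are assumed to be sorted and non-overlapping within a single row group.

```rust
use std::ops::Range;

/// Hypothetical stand-in for parquet's `RowSelector`: a run of rows to skip or select.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run {
    Skip(usize),
    Select(usize),
}

/// Convert sorted, non-overlapping row ranges (relative to one row group)
/// into alternating skip/select runs.
pub fn ranges_to_runs(ranges: &[Range<usize>]) -> Vec<Run> {
    let mut runs = Vec::new();
    let mut cursor = 0; // first row not yet covered by any run
    for r in ranges {
        if r.start >= r.end {
            continue; // ignore empty ranges
        }
        assert!(r.start >= cursor, "ranges must be sorted and non-overlapping");
        if r.start > cursor {
            // rows between the previous range and this one are skipped
            runs.push(Run::Skip(r.start - cursor));
        }
        runs.push(Run::Select(r.end - r.start));
        cursor = r.end;
    }
    runs
}
```

With the ranges `50..100` and `200..300`, this produces skip 50, select 50, skip 100, select 100, matching the hand-written selection for row group 2 in the original proposal.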

alamb avatar Apr 03 '24 21:04 alamb

The RowSelection api is new to me, going to spend some time reading on it.

matthewmturner avatar Apr 06 '24 15:04 matthewmturner

In my mind, RowSelection is a file-level struct, and I think there could be multiple files in one ParquetExec, so ParquetSelection should be multi-file-level, right?

By the way, if the user's custom index is file-level, like the page index, I think using RowSelection directly is the easier way 🤔

Ted-Jiang avatar Apr 07 '24 04:04 Ted-Jiang

In my mind, RowSelection is a file-level struct, and I think there could be multiple files in one ParquetExec, so ParquetSelection should be multi-file-level, right?

Yes, you are right. That is an excellent point
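
For illustration only, a multi-file selection could be keyed first by file and then by row group. `MultiFileSelection`, `select_range`, and `for_file` are hypothetical names invented for this sketch, not an existing DataFusion API:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical multi-file selection:
/// file path -> row group index -> row ranges within that row group.
#[derive(Debug, Default)]
pub struct MultiFileSelection {
    files: BTreeMap<String, BTreeMap<usize, Vec<Range<usize>>>>,
}

impl MultiFileSelection {
    /// Record that `rows` (relative to `row_group`) should be read from `file`.
    pub fn select_range(mut self, file: &str, row_group: usize, rows: Range<usize>) -> Self {
        self.files
            .entry(file.to_string())
            .or_default()
            .entry(row_group)
            .or_default()
            .push(rows);
        self
    }

    /// Per-row-group selection for one file, or `None` if nothing was recorded for it.
    pub fn for_file(&self, file: &str) -> Option<&BTreeMap<usize, Vec<Range<usize>>>> {
        self.files.get(file)
    }
}
```

In this sketch a file with no entry returns `None`; whether that should mean "scan everything" or "scan nothing" is a design choice the sketch leaves open.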

By the way, if the user's custom index is file-level, like the page index, I think using RowSelection directly is the easier way 🤔

I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.

FWIW what I hope to do over the next few weeks is to whip up a little POC showing how one might build a specialized index on top of parquet files as a demo, and then figure out what types of APIs would be needed in DataFusion

alamb avatar Apr 07 '24 10:04 alamb

I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.

IIRC the parquet reader works with two levels of selection. First, a row group filter selects which row groups to fetch, and then the row selectors tell which rows in each of those row groups to read.

It might not be very tricky to translate the "overall file selection" into parquet RowSelectors, as the parquet metadata can tell how many rows each row group contains.
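
A minimal sketch of that translation, assuming the per-row-group row counts have already been read from the file metadata. `split_by_row_group` is a hypothetical helper, and the input row indices are assumed to be sorted in ascending order:

```rust
use std::collections::BTreeMap;

/// Split sorted, file-level row indices into per-row-group local indices.
///
/// `row_group_sizes[i]` is the number of rows in row group `i`
/// (assumed to come from the parquet file metadata).
/// Rows past the end of the file are dropped.
pub fn split_by_row_group(
    rows: &[usize],
    row_group_sizes: &[usize],
) -> BTreeMap<usize, Vec<usize>> {
    let mut out: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
    let mut rg = 0; // index of the current row group
    let mut rg_start = 0; // file-level index of the first row in the current row group
    for &row in rows {
        // advance until the current row group contains `row`
        while rg < row_group_sizes.len() && row >= rg_start + row_group_sizes[rg] {
            rg_start += row_group_sizes[rg];
            rg += 1;
        }
        if rg == row_group_sizes.len() {
            break; // `row` and everything after it lies beyond the last row group
        }
        out.entry(rg).or_default().push(row - rg_start);
    }
    out
}
```

Unlike a fixed `row_group_size`, this handles row groups of different sizes, such as a shorter final row group.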

waynexia avatar Apr 21 '24 10:04 waynexia

Here is an example (from @waynexia on twitter) of using this kind of API: https://twitter.com/wayne17229928/status/1781997834356850945

https://github.com/GreptimeTeam/greptimedb/tree/tantivy-poc

alamb avatar Apr 23 '24 23:04 alamb

Related code is here https://github.com/GreptimeTeam/greptimedb/commit/9e1e4a518143236371b76ecb6f1da5c694eb867b#diff-ac43dc13456cf41e4fabb9d577101e245366687d49064aff99bf10aab20b9cd0R429-R480

First, get the precise row numbers to read (at the file level):

let mut selected_row = applier.apply(file_id).unwrap();

Then translate the file-level row numbers into a row group selection:

        // translate `selected_row` into row groups selection
        selected_row.sort_unstable();
        let mut row_groups_selected = BTreeMap::new();
        for row_id in selected_row.iter() {
            let row_group_id = row_id / row_group_size;
            let rg_row_id = row_id % row_group_size;

            row_groups_selected
                .entry(row_group_id)
                .or_insert_with(Vec::new)
                .push(rg_row_id);
        }
        let row_group = row_groups_selected
            .into_iter()
            .map(|(row_group_id, row_ids)| {
                let mut current_row = 0;
                let mut selection = vec![];
                for row_id in row_ids {
                    selection.push(RowSelector::skip(row_id - current_row));
                    selection.push(RowSelector::select(1));
                    current_row = row_id + 1;
                }

                (row_group_id, Some(RowSelection::from(selection)))
            })
            .collect();

The resulting BTreeMap<usize, Option<RowSelection>> is a map from the row group index to the selection within that row group.
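
The loop above emits one skip/select pair per row, including zero-length skips between adjacent rows. As an optional variation (not part of the linked commit), consecutive row ids could be coalesced into longer runs first. `Run` and `rows_to_runs` are hypothetical names, with `Run` standing in for parquet's `RowSelector`; the row ids are assumed to be sorted and relative to a single row group:

```rust
/// Hypothetical stand-in for parquet's `RowSelector`: a run of rows to skip or select.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run {
    Skip(usize),
    Select(usize),
}

/// Coalesce sorted row ids (relative to one row group) into skip/select runs,
/// merging consecutive or duplicate ids into a single `Select`.
pub fn rows_to_runs(row_ids: &[usize]) -> Vec<Run> {
    let mut runs = Vec::new();
    let mut next = 0; // first row not yet covered by any run
    let mut i = 0;
    while i < row_ids.len() {
        let start = row_ids[i];
        let mut end = start + 1; // exclusive end of the current selected run
        i += 1;
        // absorb ids that are duplicates of, or adjacent to, the current run
        while i < row_ids.len() && row_ids[i] <= end {
            end = end.max(row_ids[i] + 1);
            i += 1;
        }
        if start > next {
            runs.push(Run::Skip(start - next));
        }
        runs.push(Run::Select(end - start));
        next = end;
    }
    runs
}
```

For the row ids `[3, 4, 5, 9]` this yields skip 3, select 3, skip 3, select 1, instead of four separate single-row selections.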

waynexia avatar Apr 24 '24 03:04 waynexia

I hope to work on this issue this week

alamb avatar May 13 '24 11:05 alamb

Update here is that I found it was maybe too large a step to get to the row level access initially -- instead I started with a basic example of building a file level index -- https://github.com/apache/datafusion/issues/10546 / https://github.com/apache/datafusion/pull/10549

Once that is looking good I will make a more "advanced" example of building a row group level index where I can use this API.

I will likely not be able to get to this item this week

alamb avatar May 20 '24 11:05 alamb

Update here is I have the API for specifying the selection sketched out here: https://github.com/apache/datafusion/pull/10738

alamb avatar Jun 03 '24 13:06 alamb

Here is a PR with the proposed API exposed: https://github.com/apache/datafusion/pull/10813

alamb avatar Jun 06 '24 20:06 alamb