datafusion
API in ParquetExec to pass in RowSelections to `ParquetExec` (enable custom indexes, finer grained pushdown)
Is your feature request related to a problem or challenge?
We are building / testing a specialized index for data stored in parquet that can tell us what row offsets are needed from the parquet file based on additional information
Currently the parquet-rs parquet reader allows specifying this type of information via ArrowReaderBuilder::with_row_selection
However, the DataFusion ParquetExec has no way to pass this information down. It does build its own
Describe the solution you'd like
What I would like is a way to provide something like a RowSelection for each row group
Describe alternatives you've considered
Here is one possible API:
let parquet_selection = ParquetSelection::new()
    // * rows 100-250 from row group 1
    .select(1, RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(150),
    ]))
    // * rows 50-100 and 200-300 in row group 2
    .select(2, RowSelection::from(vec![
        RowSelector::skip(50),
        RowSelector::select(50),
        RowSelector::skip(100),
        RowSelector::select(100),
    ]));
let parquet_exec = ParquetExec::new(...)
    .with_selection(parquet_selection);
Additional context
No response
FYI @westonpace this is something we spoke about
FYI @matthewmturner who expressed interest in this: https://github.com/apache/arrow-datafusion/issues/9899#issuecomment-2030139830
We implemented a (very crude) implementation of this as part of our parquet benchmarking. Sharing here in case it is interesting as a starting point for someone: https://gist.github.com/westonpace/04f33da51931e0a990997d9ac81b623c
P.S. The implementation assumes that indices are ordered
The main reason I call it crude is that the "give me rows with these offsets" feels like it should be part of the parquet reader API and it feels like we are somewhat over-utilizing the "skip" API. However, without upstream changes I don't know that one can do much better.
Sorry for the stream of consciousness github. Last comment (I hope). Reading through I see that you are suggesting the skip API be used in the exec node as well.
While this works, we have found that a more intuitive API is to pass in an iter of desired offsets instead of RowSelection.
Maybe we could have a version of the API like this (which translates into RowSelection) 🤔
let parquet_selection = ParquetSelection::new()
    // * rows 100-250 from row group 1
    .select_range(1, 100..250)
    // * rows 50-100 and 200-300 in row group 2
    .select_range(2, 50..100)
    .select_range(2, 200..300);
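To make the "translates into RowSelection" step concrete, here is a minimal sketch of how ranges within one row group could be turned into alternating skip/select runs. Everything in it is an assumption for illustration: `Run` is a local stand-in for parquet's `RowSelector` (so the snippet compiles without the parquet crate), and `ranges_to_runs` is a hypothetical helper, not an existing DataFusion or parquet API.

```rust
use std::ops::Range;

/// Local stand-in for parquet's `RowSelector` (hypothetical, for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run {
    Skip(usize),
    Select(usize),
}

/// Convert row ranges (relative to the start of one row group) into
/// alternating skip/select runs. Ranges are sorted by start; overlapping
/// or adjacent ranges are merged into a single select run.
pub fn ranges_to_runs(mut ranges: Vec<Range<usize>>) -> Vec<Run> {
    ranges.retain(|r| r.start < r.end); // drop empty ranges
    ranges.sort_by_key(|r| r.start);

    let mut runs: Vec<Run> = Vec::new();
    let mut cursor = 0; // first row not yet covered by any run
    for r in ranges {
        if r.end <= cursor {
            continue; // fully covered by an earlier range
        }
        let start = r.start.max(cursor);
        let len = r.end - start;
        // True when this range continues the select run we just emitted.
        let extends = start == cursor && matches!(runs.last(), Some(Run::Select(_)));
        if extends {
            if let Some(Run::Select(n)) = runs.last_mut() {
                *n += len;
            }
        } else {
            if start > cursor {
                runs.push(Run::Skip(start - cursor));
            }
            runs.push(Run::Select(len));
        }
        cursor = r.end;
    }
    runs
}
```

With this sketch, the row group 2 example (`50..100` and `200..300`) becomes skip 50, select 50, skip 100, select 100, which matches the hand-written selectors in the original proposal.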
The RowSelection api is new to me, going to spend some time reading on it.
In my mind RowSelection is a file-level struct and i think there could be multi-files in one parquetExec so ParquetSelection should be multi-files-level right?
btw if user customized index is file level like page-index, i think directly use RowSelection is more easy way 🤔
In my mind RowSelection is a file-level struct and i think there could be multi-files in one parquetExec so ParquetSelection should be multi-files-level right?
Yes, you are right. That is an excellent point
btw if user customized index is file level like page-index, i think directly use RowSelection is more easy way 🤔
I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.
FWIW what I hope to do over the next few weeks is to whip up a little POC showing how one might build a specialized index on top of parquet files as a demo and then figure out what types of APIs would be needed in DataFusion
I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.
IIRC the parquet reader works by two levels of selection. First some row group filter selects which group to fetch, and then the row selector tells which row in that row group to read.
It might not be very tricky to translate the "overall file selection" into parquet RowSelector, as ParquetMetadata can tell how many rows a row group contains.
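A rough sketch of that translation, assuming the per-row-group row counts have already been read from the file metadata and that the file-level offsets are sorted ascending. The function name `split_by_row_group` and its signature are hypothetical; unlike a fixed `row_group_size`, this handles row groups of different lengths.

```rust
use std::collections::BTreeMap;

/// Split sorted, file-level row offsets into offsets relative to each row group.
/// `row_group_rows[i]` is the number of rows in row group `i` (assumed to come
/// from the Parquet metadata). Offsets past the end of the file are ignored.
pub fn split_by_row_group(
    file_rows: &[usize],
    row_group_rows: &[usize],
) -> BTreeMap<usize, Vec<usize>> {
    let mut out: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
    let mut rg = 0; // index of the current row group
    let mut rg_start = 0; // file-level offset of the first row in `rg`
    for &row in file_rows {
        // Advance until `row` falls inside the current row group.
        while rg < row_group_rows.len() && row >= rg_start + row_group_rows[rg] {
            rg_start += row_group_rows[rg];
            rg += 1;
        }
        if rg == row_group_rows.len() {
            break; // remaining offsets are beyond the last row group
        }
        out.entry(rg).or_default().push(row - rg_start);
    }
    out
}
```

Each resulting list of offsets can then be converted into skip/select selectors for its row group; row groups that do not appear in the map can be skipped entirely.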
Here is an example (from @waynexia on twitter) of using this kind of API: https://twitter.com/wayne17229928/status/1781997834356850945
https://github.com/GreptimeTeam/greptimedb/tree/tantivy-poc
Related code is here https://github.com/GreptimeTeam/greptimedb/commit/9e1e4a518143236371b76ecb6f1da5c694eb867b#diff-ac43dc13456cf41e4fabb9d577101e245366687d49064aff99bf10aab20b9cd0R429-R480
First, get the precise row number of rows to read (in the file level)
let mut selected_row = applier.apply(file_id).unwrap();
Then translate the file level row number into row group selection:
// translate `selected_row` into row groups selection
selected_row.sort_unstable();
let mut row_groups_selected = BTreeMap::new();
for row_id in selected_row.iter() {
    let row_group_id = row_id / row_group_size;
    let rg_row_id = row_id % row_group_size;
    row_groups_selected
        .entry(row_group_id)
        .or_insert_with(Vec::new)
        .push(rg_row_id);
}
let row_group = row_groups_selected
    .into_iter()
    .map(|(row_group_id, row_ids)| {
        let mut current_row = 0;
        let mut selection = vec![];
        for row_id in row_ids {
            selection.push(RowSelector::skip(row_id - current_row));
            selection.push(RowSelector::select(1));
            current_row = row_id + 1;
        }
        (row_group_id, Some(RowSelection::from(selection)))
    })
    .collect();
The resulting BTreeMap<usize, Option<RowSelection>> maps each row group index to the selection within that row group.
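The loop above emits one skip and one select(1) per row, including zero-length skips when rows are consecutive, and it would underflow on a duplicated row id. As a possible refinement (a sketch only, not the greptimedb code), consecutive row ids can be coalesced into longer runs and duplicates ignored. `Run` is again a local stand-in for `RowSelector` and `rows_to_runs` is a hypothetical name.

```rust
/// Local stand-in for parquet's `RowSelector` (hypothetical, for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run {
    Skip(usize),
    Select(usize),
}

/// Turn sorted row ids (relative to one row group) into skip/select runs,
/// merging consecutive ids into one select run and ignoring duplicates.
pub fn rows_to_runs(row_ids: &[usize]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    let mut next = 0; // first row not yet covered by any run
    for &row in row_ids {
        if row < next {
            continue; // duplicate of a row that is already selected
        }
        let gap = row - next;
        // True when this row directly follows the previous selected row.
        let extends = gap == 0 && matches!(runs.last(), Some(Run::Select(_)));
        if extends {
            if let Some(Run::Select(n)) = runs.last_mut() {
                *n += 1;
            }
        } else {
            if gap > 0 {
                runs.push(Run::Skip(gap));
            }
            runs.push(Run::Select(1));
        }
        next = row + 1;
    }
    runs
}
```

For example, row ids 3, 4, 5, 9, 9, 10 collapse to skip 3, select 3, skip 3, select 2 instead of twelve single-row selectors.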
I hope to work on this issue this week
Update here is that I found it was maybe too large a step to get to the row level access initially -- instead I started with a basic example of building a file level index -- https://github.com/apache/datafusion/issues/10546 / https://github.com/apache/datafusion/pull/10549
Once that is looking good I will make a more "advanced" example of building a row group level index where I can use this API.
I will likely not be able to get to this item this week
Update here is I have the API for specifying the selection sketched out here: https://github.com/apache/datafusion/pull/10738
Here is a PR with the proposed API exposed: https://github.com/apache/datafusion/pull/10813