[EPIC] Efficiently and correctly extract parquet statistics into ArrayRefs
**Is your feature request related to a problem or challenge?**

There are at least three places where parquet statistics are extracted into `ArrayRef`s today:
- ParquetExec (pruning row groups): https://github.com/apache/datafusion/blob/465c89f7f16d48b030d4a384733567b91dab88fa/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L18-L17
- ParquetExec (pruning pages): https://github.com/apache/datafusion/blob/671cef85c550969ab2c86d644968a048cb181c0c/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L393-L392
- ListingTable (pruning files): https://github.com/apache/datafusion/blob/97148bd105fc2102b0444f2d67ef535937da5dfe/datafusion/core/src/datasource/file_format/parquet.rs#L295-L294
Not only are there three copies of this code, they are all subtly different (e.g. https://github.com/apache/datafusion/issues/8295) and have varying degrees of test coverage.
**Describe the solution you'd like**
I would like one API with the following properties:
- Extracts statistics from one or more parquet files as `ArrayRef`s suitable to pass to `PruningPredicate`
- Does so correctly (applies the appropriate schema coercion / conversion rules)
- Does so quickly and efficiently (e.g. does not do this once per row group) and is suitable for 1000s of parquet files
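To make the null semantics of such an API concrete, here is a minimal, self-contained sketch. It uses a plain Rust stand-in `ColumnStats` type and `Vec<Option<i64>>` instead of the real parquet `Statistics` and arrow `ArrayRef` types (both stand-ins are illustrative assumptions, not DataFusion APIs): the output has one entry per row group, with a null wherever statistics are absent.

```rust
/// Stand-in for one column's min/max statistics in one row group
/// (a simplification of the parquet crate's `Statistics` for illustration).
#[derive(Clone, Copy)]
struct ColumnStats {
    min: i64,
    max: i64,
}

/// One output entry per row group: `None` models a NULL in the resulting
/// array and means "statistics unknown", not "the value was null".
fn extract_mins(stats: &[Option<ColumnStats>]) -> Vec<Option<i64>> {
    stats.iter().map(|s| s.map(|s| s.min)).collect()
}

/// Same shape for max values.
fn extract_maxs(stats: &[Option<ColumnStats>]) -> Vec<Option<i64>> {
    stats.iter().map(|s| s.map(|s| s.max)).collect()
}

fn main() {
    // Three row groups; the second one carries no statistics.
    let stats = vec![
        Some(ColumnStats { min: 1, max: 10 }),
        None,
        Some(ColumnStats { min: -5, max: 3 }),
    ];
    assert_eq!(extract_mins(&stats), vec![Some(1), None, Some(-5)]);
    assert_eq!(extract_maxs(&stats), vec![Some(10), None, Some(3)]);
}
```

In the real API the output would be a typed arrow array (so the same shape works for strings, decimals, etc.), but the per-row-group null behavior is the important contract.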
**Describe alternatives you've considered**
Some ideas from https://github.com/apache/arrow-rs/issues/4328
**Subtasks**
- [x] https://github.com/apache/datafusion/pull/10537
- [x] https://github.com/apache/datafusion/issues/10606
- [x] https://github.com/apache/datafusion/issues/10585
- [ ] https://github.com/apache/datafusion/issues/10586
- [x] https://github.com/apache/datafusion/issues/10587
- [ ] Complete porting tests from `datafusion/core/src/datasource/physical_plan/parquet/statistics.rs` to `datafusion/core/tests/parquet/arrow_statistics.rs`
- [ ] Convert ParquetExec to use the new API for row group pruning
- [ ] Convert ParquetExec to use the new API for page pruning
- [ ] Convert `ListingTable` to use the new API for file pruning
- [ ] https://github.com/apache/datafusion/issues/10604
- [ ] #10605
- [ ] https://github.com/apache/datafusion/issues/10609
- [ ] https://github.com/apache/datafusion/issues/10626
Here is a proposed API:

```rust
/// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
///
/// # Note
/// If the corresponding `Statistics` is not present, or has no information for
/// a column, a NULL is present in the corresponding array entry
pub struct ArrowStatistics {
    /// min values
    min: ArrayRef,
    /// max values
    max: ArrayRef,
    /// Row counts (UInt64Array)
    row_count: ArrayRef,
    /// Null counts (UInt64Array)
    null_count: ArrayRef,
}

// (TODO accessors for min/max/row_count/null_count)

/// Extract `ArrowStatistics` from the parquet [`Statistics`]
pub fn parquet_stats_to_arrow(
    arrow_datatype: &DataType,
    statistics: impl IntoIterator<Item = Option<&Statistics>>,
) -> Result<ArrowStatistics> {
    todo!()
}
```
Maybe it would make sense to have something more builder style:

```rust
struct ParquetStatisticsExtractor {
    ...
}

// create an extractor that can extract data from parquet files
let extractor = ParquetStatisticsExtractor::new(arrow_schema, parquet_schema);

// get parquet statistics (one for each row group) somehow:
let parquet_stats: Vec<&Statistics> = ...;

// extract min/max values for columns "a" and "b"
let col_a_stats = extractor.extract("a", parquet_stats.iter());
let col_b_stats = extractor.extract("b", parquet_stats.iter());
```

(This is similar to the existing API `parquet::arrow::parquet_to_arrow_schema`.)
Note: `Statistics` above refers to the parquet crate's `Statistics` type.
There is a version of this code here in DataFusion that could perhaps be adapted: https://github.com/apache/datafusion/blob/accce9732e26723cab2ffc521edbf5a3fe7460b3/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179-L186
**Testing**
I suggest we add a new module to the existing parquet test in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/parquet_exec.rs
The tests should look like:

```rust
let record_batch = make_batch_with_relevant_datatype();
// write batch/batches to file
// open file / extract stats from metadata
// compare stats
```

I can help write these tests.
I personally suggest:
- Make a PR with the basic API and a single basic type (like Int/UInt or String) and figure out the test pattern (I can definitely help here)
- Then we can fill out support for the rest of the types in a follow on PR
cc @tustvold in case you have other ideas
**Additional context**
This code likely eventually would be good to have in the parquet crate -- see https://github.com/apache/arrow-rs/issues/4328. However, I think initially we should do it in DataFusion to iterate faster and figure out the API before moving it up there
There are a bunch of related improvements that I think become much simpler with this feature:
- https://github.com/apache/datafusion/issues/8229
In terms of sequencing this feature, here is what I would recommend:
**First PR**
Purpose: sketch out the API and test framework
- Create a test framework for this
- Create the basic API and extract min/max values for Int64 columns
**Second PR (draft)**
Purpose: demonstrate the API can be used in DataFusion and ensure test coverage is adequate. Update one of the uses of parquet statistics (like `ListingTable`) to use the new API. I (@alamb) would like to do this if I have time.
**Third + Fourth + ... PRs**
Add support for the remaining datatypes, along with tests. This part can be parallelized into multiple PRs.
I started working on the first PR.
After working through an actual example in https://github.com/apache/datafusion/pull/10549 I have a new API proposal: https://github.com/NGA-TRAN/arrow-datafusion/pull/118
Here is what the API looks like:

```rust
/// What type of statistics should be extracted?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RequestedStatistics {
    /// Minimum Value
    Min,
    /// Maximum Value
    Max,
    /// Null Count, returned as a [`UInt64Array`]
    NullCount,
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
/// conversions. This information can be used for pruning parquet files or row
/// groups based on the statistics embedded in parquet files
///
/// # Schemas
///
/// The schema of the parquet file and the arrow schema are used to convert the
/// underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files.
///
/// The parquet_schema and arrow_schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the parquet file to the column in the arrow schema.
///
/// # Multiple parquet files
///
/// This API is designed to support efficiently extracting statistics from
/// multiple parquet files (hence why the parquet schema is passed in as an
/// argument). This is useful when building an index for a directory of parquet
/// files.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// The name of the column to extract statistics for
    column_name: &'a str,
    /// The type of statistics to extract
    statistics_type: RequestedStatistics,
    /// The arrow schema of the query
    arrow_schema: &'a Schema,
    /// The field (with data type) of the column in the arrow schema
    arrow_field: &'a Field,
}

impl<'a> StatisticsConverter<'a> {
    /// Returns a [`UInt64Array`] with row counts for each row group
    ///
    /// The returned array has no nulls, and has one value for each row group.
    /// Each value is the number of rows in the row group.
    pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
        ...
    }

    /// Create a new statistics converter
    pub fn try_new(
        column_name: &'a str,
        statistics_type: RequestedStatistics,
        arrow_schema: &'a Schema,
    ) -> Result<Self> {
        ...
    }

    /// Extract the statistics from a parquet file, given the parquet file's metadata
    ///
    /// The returned array contains 1 value for each row group in the parquet
    /// file, in order
    ///
    /// Each value is either
    /// * the requested statistics type for the column
    /// * a null value, if the statistics can not be extracted
    ///
    /// Note that a null value does NOT mean the min or max value was actually
    /// `null`; it means the requested statistic is unknown
    ///
    /// Reasons for not being able to extract the statistics include:
    /// * the column is not present in the parquet file
    /// * statistics for the column are not present in the row group
    /// * the stored statistic value can not be converted to the requested type
    pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
        ...
    }
}
```
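Since this API is only a proposal, here is a self-contained illustration of the intended `extract` semantics using plain Rust stand-ins (`RowGroupStats` and `Vec<Option<i64>>` are hypothetical simplifications of the real parquet/arrow types): the requested statistic is selected per row group, and a missing statistic yields a null, not a default value.

```rust
/// Mirror of the proposed enum (for illustration only).
#[derive(Clone, Copy)]
enum RequestedStatistics {
    Min,
    Max,
    NullCount,
}

/// Stand-in for one row group's statistics for a single column.
#[derive(Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
    null_count: i64,
}

/// One output value per row group; `None` means the statistic is unknown
/// (e.g. the row group stored no statistics for this column).
fn extract(
    requested: RequestedStatistics,
    row_groups: &[Option<RowGroupStats>],
) -> Vec<Option<i64>> {
    row_groups
        .iter()
        .map(|rg| {
            rg.map(|s| match requested {
                RequestedStatistics::Min => s.min,
                RequestedStatistics::Max => s.max,
                RequestedStatistics::NullCount => s.null_count,
            })
        })
        .collect()
}

fn main() {
    let rgs = vec![
        Some(RowGroupStats { min: 1, max: 7, null_count: 0 }),
        None, // statistics missing: output is null, not 0
    ];
    assert_eq!(extract(RequestedStatistics::Max, &rgs), vec![Some(7), None]);
}
```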
I am envisioning this API could also easily support:

Extract from multiple files in one go:

```rust
impl<'a> StatisticsConverter<'a> {
    ...

    /// Extract metadata from multiple parquet files into a single arrow array,
    /// one element per row group per file
    fn extract_multi(&self, metadata: impl IntoIterator<Item = &ParquetMetaData>) -> Result<ArrayRef> {
        ...
    }
}
```
Extract information from the page index as well:

```rust
impl<'a> StatisticsConverter<'a> {
    ...

    /// Extract metadata from page indexes across all row groups. The returned
    /// array has one element per page across all row groups
    fn extract_page(&self, metadata: impl IntoIterator<Item = &ParquetMetaData>) -> Result<ArrayRef> {
        ...
    }
}
```
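The ordering contract of `extract_multi` (one element per row group per file, files in input order) can be sketched with plain `Vec<Option<i64>>` standing in for the per-file `ArrayRef` results; this is an illustrative assumption about the proposed behavior, not existing code:

```rust
/// Flatten per-file, per-row-group statistics into one sequence:
/// files in input order, row groups in order within each file.
fn extract_multi(per_file: &[Vec<Option<i64>>]) -> Vec<Option<i64>> {
    per_file.iter().flatten().copied().collect()
}

fn main() {
    // file 0 has two row groups; file 1 has one row group with missing stats
    let per_file = vec![vec![Some(1), Some(5)], vec![None]];
    assert_eq!(extract_multi(&per_file), vec![Some(1), Some(5), None]);
}
```

The same flattening shape would apply to `extract_page`, with one element per page instead of per row group.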
@alamb I have created 2 more bug tickets but I cannot edit the description to add them in the subtasks. Can you help with that?
- https://github.com/apache/datafusion/issues/10604
- https://github.com/apache/datafusion/issues/10605
> @alamb I have created 2 more bug tickets but I cannot edit the description to add them in the subtasks. Can you help with that?

Done
@alamb Another bug: https://github.com/apache/datafusion/issues/10609
@alamb just a hint: #10605 is also closed.
FYI I have a proposed API change in https://github.com/apache/datafusion/issues/10806
Given how far we have come with this ticket, I plan to close it and organize the remaining tasks as follow-on tickets / epics.
This issue is done enough -- I am consolidating the remaining todo items under https://github.com/apache/datafusion/issues/10922