Track table statistics in cudf-polars streaming engine
Description

- Updates `PartitionInfo` to include a new `table_stats: TableStats` attribute.
  - These table statistics are now updated in Join, GroupBy, Filter, and HStack. The heuristics used to update these statistics are still rough, and missing in many places.
- Updates `_sample_pq_statistics` to populate/return a `TableStats` object (with caching).
  - Uses the `parquet_metadata_samples` config to choose how many files to sample metadata from to estimate the average file size and the total row count.
  - Reads a single row group from the dataset to approximate the unique count and unique fraction of each column. The `parquet_rowgroup_samples` config can be used to sample fewer or more row groups. However, this data will all be read in at once (so a larger value can be risky).
  - We cache the `TableStats` collected for the last 10 datasets.
  - TODO: Use `distinct_count` statistics instead of reading real row-group data (needs a new plc feature, I think).
- Adds `PartitionInfo.new` to make it easier to propagate the `TableStats` information as `partition_info` is constructed.
- Updates `explain_query` to include row-count estimates.
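The propagation pattern described above might look roughly like the following. This is a minimal sketch, not the actual cudf-polars implementation: the real `PartitionInfo` and `TableStats` carry more fields, and the `row_count` attribute and 0.5 selectivity factor here are illustrative assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class TableStats:
    """Estimated table statistics (sketch; field name is an assumption)."""

    row_count: int | None = None  # estimated total rows, None if unknown


@dataclass(frozen=True)
class PartitionInfo:
    """Partitioning metadata attached to each IR node (simplified)."""

    count: int  # number of partitions
    table_stats: TableStats = field(default_factory=TableStats)

    @classmethod
    def new(cls, count: int, *, table_stats: TableStats | None = None) -> PartitionInfo:
        # Convenience constructor: fall back to empty statistics so callers
        # only need to propagate stats where a heuristic actually exists.
        return cls(count=count, table_stats=table_stats or TableStats())


# A Filter node might scale the upstream row-count estimate by a guessed
# selectivity (the 0.5 factor is purely illustrative):
child = PartitionInfo.new(4, table_stats=TableStats(row_count=1_000_000))
rows = child.table_stats.row_count
est = None if rows is None else int(rows * 0.5)
filtered = PartitionInfo.new(child.count, table_stats=TableStats(row_count=est))
```

The `new` classmethod keeps construction sites short while making the "no statistics available" case explicit rather than implicit.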
Before we use TableStats to implement automatic repartitioning (removed in a087618), I strongly recommend that we clean up the statistics propagation in follow-up work until the row-count estimates look accurate for all PDS-H queries. For example, this is what the physical plan looks like for query 4:
Query 4 - Physical plan
SORT ('o_orderpriority',) ('o_orderpriority', 'order_count') [1 partitions, ~47996 rows]
  SELECT ('o_orderpriority', 'order_count') [1 partitions, ~47996 rows]
    SELECT ('o_orderpriority', '_______________0') [1 partitions, ~47996 rows]
      GROUPBY ('o_orderpriority',) ('o_orderpriority', '_______________0') [1 partitions, ~4799677200 rows]
        REPARTITION ('o_orderpriority', '_______________0') [1 partitions, ~4799677200 rows]
          GROUPBY ('o_orderpriority',) ('o_orderpriority', '_______________0') [18 partitions, ~4799677200 rows]
            JOIN Semi ('o_orderkey',) ('l_orderkey',) ('o_orderkey', 'o_orderpriority', 'o_orderdate') [18 partitions, ~4799677200 rows]
              SHUFFLE ('o_orderkey', 'o_orderpriority', 'o_orderdate') [18 partitions, ~1200000000 rows]
                UNION ('o_orderkey', 'o_orderpriority', 'o_orderdate') [5 partitions, ~1200000000 rows]
                  SCAN PARQUET ('o_orderkey', 'o_orderpriority', 'o_orderdate') [1 partitions, ~unknown rows]
                  (repeated 5 times)
              SHUFFLE ('l_orderkey', 'l_commitdate', 'l_receiptdate') [18 partitions, ~4799677200 rows]
                UNION ('l_orderkey', 'l_commitdate', 'l_receiptdate') [18 partitions, ~4799677200 rows]
                  SCAN PARQUET ('l_orderkey', 'l_commitdate', 'l_receiptdate') [1 partitions, ~unknown rows]
                  (repeated 18 times)
Real output:
shape: (5, 2)
┌─────────────────┬─────────────┐
│ o_orderpriority ┆ order_count │
│ --- ┆ --- │
│ str ┆ u32 │
╞═════════════════╪═════════════╡
│ 1-URGENT ┆ 11016361 │
│ 2-HIGH ┆ 11013066 │
│ 3-MEDIUM ┆ 11014217 │
│ 4-NOT SPECIFIED ┆ 11014728 │
│ 5-LOW ┆ 11017274 │
└─────────────────┴─────────────┘
Notice that the final row-count estimate is off (5 actual rows vs. an estimated ~47996). In this case the error is harmless, but other cases could be much more problematic.
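One way the GroupBy estimates above could be tightened in follow-up work is to cap the output row count by the estimated cardinality of the grouping keys. This is a hedged sketch of that heuristic, not the cudf-polars implementation; the function and argument names are illustrative.

```python
def estimate_groupby_rows(input_rows, key_unique_counts):
    """Cap a GroupBy output estimate by the product of key cardinalities.

    `input_rows` and `key_unique_counts` are assumed to come from upstream
    TableStats; both names are illustrative, not the real API.
    """
    if input_rows is None:
        return None
    bound = 1
    for u in key_unique_counts:
        if u is None:
            return input_rows  # no information; keep the upstream estimate
        bound *= u
    # The number of output groups can never exceed the input row count
    # or the product of the key cardinalities.
    return min(input_rows, bound)


# e.g. grouping ~4.8e9 estimated rows on a key with ~5 distinct values
# should yield an estimate of 5, not billions:
print(estimate_groupby_rows(4_799_677_200, [5]))  # -> 5
```

With such a cap in place, the `~4799677200 rows` estimates on the GROUPBY nodes in the plan above would collapse to something near the key's unique count.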
Note: In addition to the estimated row count, we are also keeping track of the following for each column:
- Estimated unique count
- Estimated unique fraction
- Estimated element-size
- Estimated file size (this is only used for choosing the initial partition count for Scan).
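The per-column estimates listed above could be derived from a sampled row group roughly as follows. This is a simplified sketch under stated assumptions: the `ColumnStats` name and fields are illustrative, and the real code operates on Arrow/libcudf columns rather than Python lists.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Per-column estimates (sketch; names are assumptions, not the real API)."""

    unique_count: int
    unique_fraction: float
    element_size: float  # average bytes per element


def sample_column_stats(values: list) -> ColumnStats:
    """Approximate per-column statistics from one sampled row group's values."""
    n = len(values)
    uniques = len(set(values))
    # Crude element-size proxy: average length of the string representation.
    avg_size = sum(len(str(v)) for v in values) / n if n else 0.0
    return ColumnStats(
        unique_count=uniques,
        unique_fraction=uniques / n if n else 0.0,
        element_size=avg_size,
    )


stats = sample_column_stats(["1-URGENT", "2-HIGH", "1-URGENT", "3-MEDIUM"])
```

Note that a unique fraction measured on one row group only extrapolates well when the data is not sorted or clustered on that column, which is part of why the heuristics are still rough.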
Checklist
- [ ] I am familiar with the Contributing Guidelines.
- [ ] New or existing tests cover these changes.
- [ ] The documentation is up to date with these changes.
/ok to test
Given that the current state of this PR still seems to be in flux, I'm going to bump this to 25.08 since it is not critical to be in this release IIUC.