Add statistics to ParquetExec for *files* pruned

Open alamb opened this issue 7 months ago • 7 comments

Is your feature request related to a problem or challenge?

  • This is a follow on to the feature added by @adriangb in https://github.com/apache/datafusion/pull/16014

@adriangb added a great feature that can prune entire files while opening many Parquet files.

The current statistics for DataSourceExec report how many row groups were pruned. It would also be great to add statistics on how many FILES were pruned by this new code.

For example, here is an excerpt from the EXPLAIN ANALYZE output for ClickBench Q24:

EXPLAIN ANALYZE SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
|                   |         DataSourceExec:...
 pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=325, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0

Describe the solution you'd like

I would like some new statistics that record:

  • files_pruned: total files that were pruned by filters during open

It is important that the docs explain that the metric only counts files pruned after the plan starts executing (not files that are pruned during planning).

Describe alternatives you've considered

  1. Add a field to ParquetFileMetrics: https://github.com/apache/datafusion/blob/6d5e00ad3f8e53f7252cb1d3c72a6c7f28c1aed6/datafusion/datasource-parquet/src/metrics.rs#L29-L28
  2. Thread that through to the opener in datafusion/datasource-parquet/src/opener.rs so when files are pruned we can see that in the metrics
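The two steps above could look roughly like the following. This is only a std-only sketch: `FileMetricsSketch`, `FileStats`, and `OpenerSketch` are hypothetical stand-ins, not the real `ParquetFileMetrics` or opener types, and the predicate is hard-coded to `column > threshold` for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a per-scan metrics set. The real
/// `ParquetFileMetrics` uses DataFusion's own metric types instead.
#[derive(Debug, Default)]
pub struct FileMetricsSketch {
    /// Files skipped at open time because the predicate ruled them out.
    pub files_pruned: AtomicUsize,
}

/// Hypothetical per-file statistics: the maximum of one column.
pub struct FileStats {
    pub max: i64,
}

/// Hypothetical opener holding a shared handle to the metrics (step 2).
pub struct OpenerSketch {
    pub metrics: Arc<FileMetricsSketch>,
    /// The predicate is assumed to be `column > threshold`.
    pub threshold: i64,
}

impl OpenerSketch {
    /// Returns `true` if the file must be read, `false` if it was pruned.
    pub fn open(&self, stats: &FileStats) -> bool {
        // If even the largest value cannot pass the predicate, skip the file
        // and record that in the metrics.
        if stats.max <= self.threshold {
            self.metrics.files_pruned.fetch_add(1, Ordering::Relaxed);
            return false;
        }
        true
    }
}
```

The counter is only touched inside `open`, so it only reflects pruning that happens after execution starts, which matches the documentation caveat above.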

Additional context

No response

alamb avatar Jun 13 '25 18:06 alamb

Doesn't https://github.com/apache/datafusion/blob/4dd6923787084548c9ecc6d90c630c2c28ee9259/datafusion/datasource-parquet/src/metrics.rs#L30-L33 cover that?

adriangb avatar Jun 13 '25 18:06 adriangb

cover that?

Yes 🤦

For some reason it doesn't show up for me in the explain analyze I have: q25-analyze-topk-dynamic-filter.txt

Which I made with this command from the https://github.com/apache/datafusion/pull/15770 branch

$ cat q25-analyze.sql
EXPLAIN ANALYZE SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
./datafusion-cli-topk-dynamic-filters -f q25-analyze.sql  > q25-analyze-topk-dynamic-filter.txt

But the query clearly got faster, so I would expect it to be present 🤔

alamb avatar Jun 13 '25 19:06 alamb

cover that?

Yes 🤦

For some reason it doesn't show up for me in the explain analyze I have: q25-analyze-topk-dynamic-filter.txt

Which I made with this command from the #15770 branch

$ cat q25-analyze.sql
EXPLAIN ANALYZE SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
./datafusion-cli-topk-dynamic-filters -f q25-analyze.sql > q25-analyze-topk-dynamic-filter.txt

But the query clearly got faster, so I would expect it to be present 🤔

Hmm maybe we aren't including that statistic in the output?

adriangb avatar Jun 13 '25 19:06 adriangb

Hmm maybe we aren't including that statistic in the output?

I think everything that is non zero is included. I'll have to look into it some more

alamb avatar Jun 13 '25 19:06 alamb

Could it be that in that test we don't have file statistics (datafusion.execution.collect_statistics = false) -> the pruning is happening at the row group level?

adriangb avatar Jun 13 '25 19:06 adriangb

Could it be that in that test we don't have file statistics (datafusion.execution.collect_statistics = false) -> the pruning is happening at the row group level?

set datafusion.execution.collect_statistics = true;
EXPLAIN ANALYZE SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;

And that still doesn't seem to help

Let's wait for https://github.com/apache/datafusion/pull/15770 to merge and then I can dig into this more

alamb avatar Jun 13 '25 20:06 alamb

(basically I want to be able to see from statistics when the dynamic filters are helping / not helping)

alamb avatar Jun 13 '25 20:06 alamb

I will look into this next week

adriangb avatar Jun 21 '25 17:06 adriangb

I did a bit of investigation.

Using hits_partitioned and some massaging I was able to get the expected result:

SET datafusion.execution.target_partitions = 1;
EXPLAIN ANALYZE SELECT "SearchPhrase" FROM 'hits_partitioned' WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" DESC LIMIT 10;

> EXPLAIN ANALYZE SELECT "SearchPhrase" FROM 'hits_partitioned' WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" DESC LIMIT 10;
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortExec: TopK(fetch=10), expr=[SearchPhrase@0 DESC], preserve_partitioning=[false], filter=[SearchPhrase@0 IS NULL OR SearchPhrase@0 > EF83BC09D0B2D0BBD0B0...], metrics=[output_rows=10, elapsed_compute=1.284534752s, row_replacements=176]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1197862, elapsed_compute=43.685164ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |     FilterExec: SearchPhrase@0 != , metrics=[output_rows=1197862, elapsed_compute=535.526049ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |       DataSourceExec: file_groups={1 group: [[Users/adriangb/GitHub/datafusion/benchmarks/data/hits_partitioned/hits_0.parquet:0..122446530, Users/adriangb/GitHub/datafusion/benchmarks/data/hits_partitioned/hits_1.parquet:0..174965044, Users/adriangb/GitHub/datafusion/benchmarks/data/hits_partitioned/hits_10.parquet:0..101513258, Users/adriangb/GitHub/datafusion/benchmarks/data/hits_partitioned/hits_11.parquet:0..118419888, Users/adriangb/GitHub/datafusion/benchmarks/data/hits_partitioned/hits_12.parquet:0..149514164, ...]]}, projection=[SearchPhrase], file_type=parquet, predicate=SearchPhrase@0 !=  AND DynamicFilterPhysicalExpr [ SearchPhrase@0 IS NULL OR SearchPhrase@0 > EF83BC09D0B2D0BBD0B0... ], pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 !=  OR  != SearchPhrase_max@1) AND (SearchPhrase_null_count@2 > 0 OR SearchPhrase_null_count@2 != row_count@3 AND SearchPhrase_max@1 > EF83BC09D0B2D0BBD0B0...), required_guarantees=[SearchPhrase not in ()] |
|                   | , metrics=[output_rows=8476907, elapsed_compute=1ns, bytes_scanned=34921389, file_open_errors=0, file_scan_errors=0, files_pruned_statistics=85, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=22, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=26, bloom_filter_eval_time=904.017µs, metadata_load_time=32.92323ms, page_index_eval_time=3.56µs, row_pushdown_eval_time=200ns, statistics_eval_time=692.725µs, time_elapsed_opening=21.182088ms, time_elapsed_processing=1.370805897s, time_elapsed_scanning_total=3.212635043s, time_elapsed_scanning_until_data=211.536997ms]                                                                                                                                                                                                                                           |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 3.717 seconds.

(files_pruned_statistics=85)
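For anyone who wants to check this counter without scanning the plan by eye, a small helper along these lines can pull a named integer metric out of the EXPLAIN ANALYZE text. This is just a sketch: `metric_value` is a hypothetical helper, not a DataFusion API, and it assumes the `name=<integer>` shape seen in the output above.

```rust
/// Hypothetical helper: find `name=<integer>` inside EXPLAIN ANALYZE text.
/// Returns `None` if the metric is absent or is not followed by digits.
pub fn metric_value(plan_text: &str, name: &str) -> Option<u64> {
    let key = format!("{name}=");
    let mut start = 0;
    // Check every occurrence so a match that is only the tail of a longer
    // metric name (e.g. `pruned_statistics` inside `files_pruned_statistics`)
    // is skipped.
    while let Some(pos) = plan_text[start..].find(&key) {
        let abs = start + pos;
        let at_name_start = plan_text[..abs]
            .chars()
            .next_back()
            .map_or(true, |c| !(c.is_alphanumeric() || c == '_'));
        if at_name_start {
            let rest = &plan_text[abs + key.len()..];
            let digits: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
            return digits.parse().ok();
        }
        start = abs + key.len();
    }
    None
}
```

Calling `metric_value(text, "files_pruned_statistics")` on the metrics line above would yield `Some(85)`. Duration metrics such as `elapsed_compute=1ns` would come back as their leading integer only, so this is meant for plain counters.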

I had to use ORDER BY "SearchPhrase" DESC; ASC gives no pruning. I don't know much about the SearchPhrase column or how the data is laid out, but I do think one of the remaining TODOs for the TopK optimization is relevant here: we need to optimize file opening order (based on statistics?) to match the preferred order of the query / sort. I opened #16555 to track.

As you'd expect ASC took 30s and DESC took 3s.
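To illustrate why file opening order matters for the TopK dynamic filter, here is a toy simulation. It is a sketch under simplifying assumptions, not how DataFusion implements anything: `FileSketch`, `order_for_desc`, and `count_pruned` are hypothetical names, each file is reduced to its column max plus its values, and the filter is modeled as "skip a file whose max cannot beat the current k-th best value".

```rust
/// Hypothetical per-file summary: the column's max plus the values themselves.
pub struct FileSketch {
    pub max: i64,
    pub values: Vec<i64>,
}

/// Order files so those with the largest `max` are opened first,
/// which is the preferred order for `ORDER BY ... DESC`.
pub fn order_for_desc(files: &mut [FileSketch]) {
    files.sort_by(|a, b| b.max.cmp(&a.max));
}

/// Simulate a DESC TopK keeping `k` rows and count the files that the
/// dynamic filter would allow skipping, given the order of `files`.
pub fn count_pruned(files: &[FileSketch], k: usize) -> usize {
    let mut top: Vec<i64> = Vec::new(); // current best values, sorted descending
    let mut pruned = 0;
    for f in files {
        // Once k rows are held, the k-th best value acts as the threshold:
        // a file whose max does not exceed it cannot contribute.
        if k > 0 && top.len() == k && f.max <= top[k - 1] {
            pruned += 1;
            continue;
        }
        top.extend_from_slice(&f.values);
        top.sort_by(|a, b| b.cmp(a));
        top.truncate(k);
    }
    pruned
}
```

With three files whose maxima are 10, 20, and 30 and `k = 2`, opening them in ascending-max order prunes nothing in this model, while opening them after `order_for_desc` lets the last two be skipped. Whether the real ASC/DESC difference here has the same cause depends on how the data is laid out, which I have not verified.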

@alamb do you think we can close this issue and continue in #16555?

adriangb avatar Jun 25 '25 15:06 adriangb

@alamb do you think we can close this issue and continue in https://github.com/apache/datafusion/issues/16555?

Good idea -- thank you for the follow up

alamb avatar Jun 25 '25 19:06 alamb