Support data source sampling with TABLESAMPLE
Which issue does this PR close?
- Closes #13563
Rationale for this change
Explained in detail in #13563, including known syntax examples.
Thanks to changes in sqlparser, it is now possible to use the TABLESAMPLE and SAMPLE constructs in the logical plan builder.
This PR adds a rewrite function to datafusion-sql which produces an additional LogicalPlan::Filter based on the TableSample node from sqlparser. There is no need to remove anything from the SQL AST since it is not used anywhere else.
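The rewrite itself is conceptually small: a TABLESAMPLE percentage p becomes a filter equivalent to `random() < p / 100` placed above the scan. A minimal Python sketch of those semantics (not the actual Rust code in datafusion-sql; the helper name is illustrative):

```python
import random

def tablesample_filter(percent: float):
    """Model of the rewrite: TABLESAMPLE (percent) turns into a
    predicate equivalent to `random() < percent / 100`."""
    fraction = percent / 100.0
    return lambda _row: random.random() < fraction

# Apply the sampling predicate the way a Filter node would:
random.seed(42)
rows = list(range(100_000))
keep = tablesample_filter(42.6)
sampled = [r for r in rows if keep(r)]
# For a large input, the kept fraction is close to 0.426.
```

Because each row is decided independently, the sample size is only approximately `percent` of the input, which matches the Bernoulli-style semantics discussed later in this thread.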
What changes are included in this PR?
Are these changes tested?
- Unit tests (added a few)
- Regression tests (added to select.slt)
- Manual test (see below)
create external table data stored as parquet location 'sample.parquet';
select COUNT(*) from data TABLESAMPLE SYSTEM (42.6) where double_col < 42.0;
This leads to the initial logical plan:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(42.6) / Float64(100)
      Filter: data.double_col < Float64(42)
        TableScan: data
The physical plan is somewhat unusual, as volatile functions are pushed down to the data source (datafusion.execution.parquet.pushdown_filters is enabled by default), which was addressed in #13268.
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        ProjectionExec: expr=[]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: double_col@0 < 42 AND random() < 0.426
              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
                DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@0 < 42 AND random() < 0.426, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_min@0 < 42, required_guarantees=[]
More details:
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final analyzed plan: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Filter: random() < Float64(42.6) / Float64(100) Filter: data.double_col < Float64(42) TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::analyzer] Analyzer took 0 ms
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 0): Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Filter: random() < Float64(42.6) / Float64(100) Filter: data.double_col < Float64(42) TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] simplify_expressions: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Filter: random() < Float64(0.426) Filter: data.double_col < Float64(42) TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data, partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 0): Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 1): Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'simplify_expressions' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 1): Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] optimizer pass 1 did not make changes
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final optimized plan: Projection: count(Int64(1)) AS count(*) Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] Projection: Filter: data.double_col < Float64(42) AND random() < Float64(0.426) TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Optimizer took 7 ms
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Input physical plan: ProjectionExec: expr=[count(Int64(1))@0 as count(*)] AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] ProjectionExec: expr=[] FilterExec: double_col@0 < 42 AND random() < 0.426 DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Optimized physical plan: ProjectionExec: expr=[count(Int64(1))@0 as count(*)] AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] CoalescePartitionsExec AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] ProjectionExec: expr=[] CoalesceBatchesExec: target_batch_size=8192 FilterExec: double_col@0 < 42 AND random() < 0.426 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@0 < 42 AND random() < 0.426, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_min@0 < 42, required_guarantees=[]
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Use filter and page index to create RowSelection RowSelection { selectors: [RowSelector { row_count: 7300, skip: false }] } from predicate: BinaryExpr { left: BinaryExpr { left: Column { name: "double_col_null_count", index: 1 }, op: NotEq, right: Column { name: "row_count", index: 2 }, fail_on_overflow: false }, op: And, right: BinaryExpr { left: Column { name: "double_col_min", index: 0 }, op: Lt, right: Literal { value: Float64(42), field: Field { name: "42", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }

+----------+
| count(*) |
+----------+
| 1576     |
+----------+
1 row(s) fetched.
Elapsed 0.042 seconds.
Are there any user-facing changes?
No. Behaviour changes only when a user writes the new SAMPLE / TABLESAMPLE SQL syntax, which was not supported before.
Thank you for the review and suggestions! I'll rework the testing approach and get back with the improved version.
According to PostgreSQL's reference (https://wiki.postgresql.org/wiki/TABLESAMPLE_Implementation#SYSTEM_Option), I believe the SYSTEM option is equivalent to keeping an entire RecordBatch according to the specified probability, whereas the rewrite rule implemented here samples row by row, which follows the behaviour of the BERNOULLI option. Since DataFusion has vectorized execution, evaluating a random() < x filter should be efficient, so I think we can apply this implementation to both the SYSTEM and BERNOULLI options to keep it simple.
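For illustration, the difference between the two strategies can be sketched in Python, with a batch modelled as a plain list of rows (hypothetical helper names, not DataFusion APIs):

```python
import random

def bernoulli_sample(batches, fraction, rng):
    # Row-level sampling (BERNOULLI): each row is kept independently.
    return [[row for row in batch if rng.random() < fraction]
            for batch in batches]

def system_sample(batches, fraction, rng):
    # Batch-level sampling (SYSTEM, with a RecordBatch as the unit):
    # keep or drop an entire batch with the given probability.
    return [batch for batch in batches if rng.random() < fraction]

rng = random.Random(0)
batches = [list(range(i * 100, (i + 1) * 100)) for i in range(1_000)]
by_row = bernoulli_sample(batches, 0.13, rng)
by_batch = system_sample(batches, 0.13, rng)
```

Both keep roughly 13% of the rows overall, but SYSTEM-style sampling makes one random draw per batch instead of one per row, trading granularity for cheaper evaluation.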
@2010YOUY01 I'd like to double-check whether pushing a volatile filter down to a Parquet executor is expected. In the mentioned PR, I disabled the logical-plan optimisation that pushes down volatile predicates, but it seems the physical optimiser still pushes this predicate to the executor. While it helps with automatic sampling, the results could be wrong. What do you think? Should we implement a similar mechanism to mark volatile predicates as unsupported filters?
Before:
[2025-06-18T18:20:07Z TRACE datafusion::physical_planner] Optimized physical plan by LimitedDistinctAggregation:
OutputRequirementExec
  ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
    AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        FilterExec: random() < 0.1
          DataSourceExec: file_groups={1 group: [[sample.parquet]]}, file_type=parquet
After:
[2025-06-18T18:20:07Z TRACE datafusion::physical_planner] Optimized physical plan by FilterPushdown:
OutputRequirementExec
  ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
    AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        DataSourceExec: file_groups={1 group: [[sample.parquet]]}, file_type=parquet, predicate=random() < 0.1
Data:
> I'd like to double-check whether pushing a volatile filter down to a Parquet executor is expected. In the mentioned PR, I disabled the logical-plan optimisation that pushes down volatile predicates, but it seems the physical optimiser still pushes this predicate to the executor. While it helps with automatic sampling, the results could be wrong. What do you think? Should we implement a similar mechanism to mark volatile predicates as unsupported filters?
Moved to a new discussion issue #16545, since it's not strictly related to this PR.
I wonder whether creating a new physical plan operator to do per-batch sampling would avoid the issues @theirix mentioned.
Something similar to https://github.com/milenkovicm/ballista_extensions/blob/master/src/physical/sample_exec.rs ?
Edit: the implementation demonstrates Ballista extensibility; it might not be correct or optimal.
> @2010YOUY01 I'd like to double-check whether pushing a volatile filter down to a Parquet executor is expected. In the mentioned PR, I disabled the logical-plan optimisation that pushes down volatile predicates, but it seems the physical optimiser still pushes this predicate to the executor. While it helps with automatic sampling, the results could be wrong. What do you think? Should we implement a similar mechanism to mark volatile predicates as unsupported filters?
This problem has recently been fixed in #16545 - I've added an slt test case here to verify the correct expected behaviour.
@theirix many thanks for this PR! The SYSTEM sampling strategy is really useful for large tables, and I've seen more interesting variants of this in the Ray project, where shuffling input data is a key requirement for training robust models:
https://docs.ray.io/en/latest/data/shuffling-data.html
At our company we're heavy users of delta table format and we're using delta-rs to consume them.
In that scenario, you can do file pruning (partitions, statistics) and limit pushdowns from the metadata layer, which happens as part of predicate pushdown and limit pushdown.
I've recently authored a PR that enables LIMIT pushdown for partition predicates for delta tables: https://github.com/delta-io/delta-rs/pull/3436/commits/15f2ade11c8627173bfca6568c9f3f6f2dd6c619
Have you considered this use case? I'm wondering what we would need to do at this stage to pass enough information into the data source to turn the SYSTEM hint into a metadata operation while keeping the other optimizations alive. In the delta-rs case we could simply randomize the order of active files before pruning them.
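As a sketch of that metadata-level idea, SYSTEM sampling could be applied to the file list itself before any per-row work happens (illustrative Python only; `sample_files` and the file names are hypothetical, not a delta-rs API):

```python
import random

def sample_files(files, percent, seed=0):
    # Metadata-level SYSTEM sampling: randomize the active file list
    # and keep roughly `percent` of the files, skipping the rest
    # entirely so pruning and limit pushdown still apply to survivors.
    rng = random.Random(seed)
    shuffled = list(files)
    rng.shuffle(shuffled)
    keep = max(1, round(len(shuffled) * percent / 100.0))
    return shuffled[:keep]

files = [f"part-{i:05d}.parquet" for i in range(200)]
picked = sample_files(files, 13)  # keeps 26 of 200 files
```

The sample quality then depends on how rows are distributed across files, which is the usual trade-off of file- or block-level SYSTEM sampling.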
Thank you so much, @aditanase! Your LIMIT idea looks useful and should bring performance improvements.
There are numerous advanced use cases and possible data source-level optimisations for table sampling. To fully support them, the executor should be provided with all the information about sampling. This PR focuses on a simple filter propagation. As @milenkovicm suggested, introducing a special operator is more flexible and preserves sampling semantics, which could help implement such features.
We can consider implementing a LIMIT either for the approach in this PR, or for a possible physical plan operator. I could work on an operator implementation if this approach doesn't look flexible enough for maintainers. @2010YOUY01, @alamb , what do you think?
Sorry for my late reply -- I wrote up some thoughts here:
- https://github.com/apache/datafusion/issues/13563#issuecomment-3196874378
Unstale for a reference in the improved PR (sampling via extension)
Great work! May I ask what's the status of this PR? Looking forward to using it.
Thank you! A more modern and extensible approach is proposed in my PR https://github.com/apache/datafusion/pull/17633, and it should be the way to go for table sampling support.