[SPARK-51400] Replace ArrayContains nodes with InSet
What changes were proposed in this pull request?
Add an optimization rule that replaces ArrayContains predicates with InSet ones.
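The core of such a rule can be sketched as a pattern match over a toy expression tree. This is not Spark's implementation: Expr, Col, Lit, ArrayLit, ArrayContains, InSet and And below are hypothetical stand-ins for Catalyst nodes. The guard (non-empty array, no NULL elements) is an assumption that keeps the sketch conservative.

```scala
object ArrayContainsToInSetSketch {
  // Hypothetical, heavily simplified expression tree (NOT Spark's Catalyst classes).
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class ArrayLit(elems: Seq[Lit]) extends Expr
  final case class ArrayContains(array: Expr, value: Expr) extends Expr
  final case class InSet(child: Expr, hset: Set[Any]) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Rewrites array_contains(<literal array>, x) into x IN (...), recursing through
  // the tree. Conservatively skips empty arrays and arrays with NULL elements.
  def rewrite(e: Expr): Expr = e match {
    case ArrayContains(ArrayLit(elems), value)
        if elems.nonEmpty && elems.forall(_.value != null) =>
      InSet(rewrite(value), elems.map(_.value).toSet)
    case ArrayContains(array, value) => ArrayContains(rewrite(array), rewrite(value))
    case And(l, r)                   => And(rewrite(l), rewrite(r))
    case InSet(child, hset)          => InSet(rewrite(child), hset)
    case leaf                        => leaf
  }
}
```

An array that is not a literal, such as a column, is left untouched. This matches the thread's point that only arrays resolved to literals are converted.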
Why are the changes needed?
Performance optimization.
Although ArrayContains and InSet are functionally similar, InSet is better supported by the optimizer and by data sources.
ArrayContains predicates are not pushed down to the data sources, while InSet predicates are.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Ran SQL queries like SELECT * FROM ... WHERE array_contains(.... , partitionColumn) and checked the execution plan. With this optimization, the plan applies more aggressive pushdown filtering. Without it, the expression becomes a post-scan predicate.
Was this patch authored or co-authored using generative AI tooling?
No
cc @cloud-fan
@adrians Could you provide the micro benchmarks for this optimization?
@beliefer - Sure, I'll add those later this week.
I've added a benchmark test case to FilterPushdownBenchmark.scala, mostly by copying the existing InSet test case.
One run without the ArrayContains-to-InSet rule (full logs). The array_contains predicate is applied after a full scan of the column. It cannot be pushed down, even if the spark.sql.parquet.filterPushdown flag is enabled. Snippet below:
OpenJDK 64-Bit Server VM 17.0.14+7-LTS on Linux 6.8.0-1021-azure
AMD EPYC 7763 64-Core Processor
ArrayContains -> InFilters (values count: 10, distribution: 90):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                                         6686          6708         18        2.4        425.1      1.0X
Parquet Vectorized (Pushdown)                                              6726          6779         64        2.3        427.6      1.0X
Native ORC Vectorized                                                      5051          5063          9        3.1        321.2      1.3X
Native ORC Vectorized (Pushdown)                                           5153          5159         11        3.1        327.6      1.3X
One run with the ArrayContains-to-InSet rule (full logs). The array_contains predicate is transformed into InSet and, where possible, pushed down to the data source (allowing pruning at page or row-group level). Snippet below:
OpenJDK 64-Bit Server VM 17.0.14+7-LTS on Linux 6.8.0-1021-azure
AMD EPYC 7763 64-Core Processor
ArrayContains -> InFilters (values count: 10, distribution: 90):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                                         6714          6726         15        2.3        426.8      1.0X
Parquet Vectorized (Pushdown)                                               305           309          4       51.6         19.4     22.0X
Native ORC Vectorized                                                      4902          4918         10        3.2        311.6      1.4X
Native ORC Vectorized (Pushdown)                                            305           313          9       51.5         19.4     22.0X
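The derived columns in these snippets can be cross-checked from Best Time(ms). The sketch below assumes two things not stated in the thread. First, each run scans 15 * 1024 * 1024 rows (this is inferred from the Per Row(ns) figures). Second, Relative is the first row's best time divided by each row's best time. The displayed times are rounded to whole milliseconds, so recomputed values can differ from the table in the last digit.

```scala
object BenchmarkMath {
  // ASSUMPTION: rows scanned per run, inferred from the Per Row(ns) column
  // (e.g. 6686 ms / 425.1 ns per row is roughly 15.7M rows).
  val numRows: Double = 15.0 * 1024 * 1024

  // Rate(M/s): millions of rows processed per second at the best time.
  def rateMPerSec(bestMs: Double): Double = numRows / (bestMs / 1000.0) / 1e6

  // Per Row(ns): nanoseconds spent per row at the best time.
  def nsPerRow(bestMs: Double): Double = bestMs * 1e6 / numRows

  // Relative: ASSUMED to be the first row's best time over this row's best time.
  def relative(firstBestMs: Double, bestMs: Double): Double = firstBestMs / bestMs
}
```

For the pushed-down Parquet run with the rule (best time 305 ms, first row 6714 ms), these formulas give about 51.6 M/s, 19.4 ns per row and 22.0X, in line with the snippet above.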
I have a question: is this replacement beneficial for all data sources?
This change's effect ranges from "big performance gains" to "same level of performance", depending on what other optimizations happen at the same time.
The cases with big performance gains:
- partitioned tables, where the predicate uses the partitioning/bucketing column, as in array_contains(..., col): instead of a full table scan, a partition-pruning step is introduced (as a result of the predicate pushdown).
- file formats with row-group statistics (as seen in the Parquet and ORC benchmarks): instead of a full table scan, some row groups are discarded as part of the pushdown.
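Both fast paths depend on the predicate being a plain IN set over a column. Below is a simplified sketch of the two mechanisms. It assumes Int-valued min/max statistics per row group and a single partition value per partition. RowGroupStats, mightMatch, rowGroupsToRead and prunePartitions are hypothetical names, not Parquet, ORC or Spark APIs.

```scala
object InSetPruningSketch {
  // ASSUMPTION: per-row-group min/max statistics for an Int column.
  final case class RowGroupStats(min: Int, max: Int)

  // Row-group skipping: a group can hold a matching row only if some
  // value of the IN set falls inside its [min, max] range.
  def mightMatch(stats: RowGroupStats, values: Set[Int]): Boolean =
    values.exists(v => v >= stats.min && v <= stats.max)

  def rowGroupsToRead(groups: Seq[RowGroupStats], values: Set[Int]): Seq[RowGroupStats] =
    groups.filter(g => mightMatch(g, values))

  // Partition pruning: keep only partitions whose partition value is in the set.
  def prunePartitions(partitionValues: Seq[Int], values: Set[Int]): Seq[Int] =
    partitionValues.filter(values.contains)
}
```

With the values (1, 3, 5, 7) and row groups covering [0, 9], [10, 19] and [20, 29], only the first group must be read. An array_contains predicate evaluated after the scan cannot make this decision, because it is only applied to rows that have already been read.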
The cases with same level of performance:
- dealing with file-formats with no row-group statistics (nothing to skip).
- dealing with queries where the partitioning column is not involved in the array_contains() predicate.
The root cause behind this pull request is that a query like SELECT * FROM tbl WHERE col1 IN (1,3,5,7) allows for many optimizations, while SELECT * FROM tbl WHERE array_contains(array(1,3,5,7), col1) triggers a full table scan (it uses neither row-group statistics nor partition pruning).
Of course, array_contains is more flexible. It allows queries like SELECT * FROM tbl WHERE array_contains(from_json(:payload, :schema), col1), where the number of elements in the array is not known at parse time. My point is this: if the array argument of the ArrayContains node resolves to a list of literals (possibly as a result of constant folding), it is safe to convert it to the equivalent col1 IN (val1, val2, val3) expression.
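The "safe to convert" claim can be checked on a toy model of SQL three-valued logic, where None stands for NULL. The sketch makes two modeling choices. array_contains is modeled as a scan that returns NULL when the value is NULL, or when nothing matches and the array holds a NULL element. IN is modeled as a chain of = comparisons joined by OR. The array_contains null handling is an assumption of this sketch, not something stated in the thread. Only non-empty lists are modeled.

```scala
object ThreeValuedSketch {
  // SQL boolean under three-valued logic: None stands for NULL.
  type SqlBool = Option[Boolean]

  // SQL "=": NULL if either side is NULL.
  def sqlEq(a: Option[Int], b: Option[Int]): SqlBool =
    for (x <- a; y <- b) yield x == y

  // SQL "OR": TRUE wins, then NULL, then FALSE.
  def sqlOr(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (None, _) | (_, None)             => None
    case _                                 => Some(false)
  }

  // array_contains as a scan (ASSUMED null handling): NULL value -> NULL;
  // a match -> TRUE; no match but a NULL element -> NULL; otherwise FALSE.
  def arrayContains(arr: Seq[Option[Int]], value: Option[Int]): SqlBool =
    value match {
      case None => None
      case Some(v) =>
        if (arr.contains(Some(v))) Some(true)
        else if (arr.contains(None)) None
        else Some(false)
    }

  // value IN (e1, ..., en) modeled as (value = e1) OR ... OR (value = en).
  def in(value: Option[Int], list: Seq[Option[Int]]): SqlBool = {
    require(list.nonEmpty, "this sketch does not model empty IN lists")
    list.map(e => sqlEq(value, e)).reduce(sqlOr(_, _))
  }
}
```

Comparing arrayContains(arr, v) with in(v, arr) over every small array and value, including NULLs, yields the same result for each case in this model.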
Since this optimization is related to filter pushdown, shall we skip the optimizer rule and add the conversion to DataSourceV2Strategy.translateFilterV2WithMapping or DataSourceStrategy.translateFilterWithMapping instead?
I prefer the optimizer-rule approach since its output-plan can be further improved by other optimization rules, if those rules are written to search for InSet nodes in the query-plan.
For example, in the Iceberg extension there's the ReplaceStaticInvoke rule that matches InSet nodes, and allows for the efficient execution of queries like SELECT * FROM tbl WHERE system.truncate(6, col1) IN ('aaaaaa', 'bbbbbb', 'cccccc') (assuming that truncate(6, col1) is a partitioning column of the table).
Implementing the change as a pattern-match inside DataSourceStrategy.translateFilterWithMapping would mean that only the DataSource can improve on the execution-plan, while higher-level optimizer rules are prevented from acting.
You mean there is still a benefit without data source pushdown?
Yes, there are benefits beyond the simple data source pushdowns shown in the benchmarks. This optimizer rule acts as a strength-reduction step and allows other rules to be chained after it.
Although the end result is a predicate pushdown, the project I'm working on needs the following sequence of transformations to happen in order. Sections about to be removed are colored red, and sections about to be added are colored green:
flowchart TD
    A["Inserted query"] -->|"SELECT * FROM tbl WHERE array_contains(<br><span style='color: red;'>from_json(:payload, :schema).col1</span>, system.bucket(16, col1))"| B["[Spark Core]<br>Constant folding"]
    B -->|"SELECT * FROM tbl WHERE <span style='color: red;'>array_contains</span>(<br><span style='color: green;'>array(val1, val2, val3)</span>, system.bucket(16, col1))"| C["[This PR]<br>Replace ArrayContains with InSet nodes where applicable."]
    C -->|"SELECT * FROM tbl WHERE <br><span style='color: red;'>system.bucket(16, col1)</span> <span style='color: green;'>IN</span> (val1, val2, val3)"| D["[Iceberg extension]<br>Replace StaticInvoke(system.bucket) node with ApplyFunctionExpression(system.bucket). Rather than evaluating the function for each row of the table, generate an expression that can be pushed down.<br>This rule looks for the In/InSet nodes in the logical plan."]
    D -->|"SELECT * FROM tbl WHERE <span style='color: green;'>partitionCol</span> IN<br> (val1, val2, val3)"| E["Iceberg DataSourceV2 implementation"]
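The order matters because each rule only matches the shape produced by the previous one. Below is a toy sketch of that chain, in which every class and rule name is hypothetical:

- Payload stands in for from_json(:payload, :schema).col1.
- Bucket stands in for system.bucket(n, col).
- bucketToPartitionCol stands in for the Iceberg-side rewrite, using a made-up "<col>_bucket" partition column.
- pushable stands in for what a data source would accept.

```scala
object RuleChainSketch {
  // Hypothetical toy expression tree (NOT Catalyst or Iceberg classes).
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Payload(csv: String) extends Expr // stand-in for from_json(...).col1
  final case class ArrayOf(elems: Seq[Expr]) extends Expr
  final case class ArrayContains(arr: Expr, value: Expr) extends Expr
  final case class InSet(child: Expr, values: Set[Int]) extends Expr
  final case class Bucket(n: Int, child: Expr) extends Expr // stand-in for system.bucket(n, col)

  type Rule = Expr => Expr

  // Stand-in for constant folding: the payload becomes an array of literals.
  val constantFold: Rule = {
    case ArrayContains(Payload(csv), v) =>
      ArrayContains(ArrayOf(csv.split(",").toSeq.map(s => Lit(s.trim.toInt))), v)
    case e => e
  }

  // The rewrite discussed in this PR, restricted to non-empty arrays of literals.
  val replaceArrayContains: Rule = {
    case ArrayContains(ArrayOf(elems), v) if elems.nonEmpty && elems.forall(_.isInstanceOf[Lit]) =>
      InSet(v, elems.collect { case Lit(x) => x }.toSet)
    case e => e
  }

  // Stand-in for an extension rule that only matches IN sets over bucket().
  val bucketToPartitionCol: Rule = {
    case InSet(Bucket(_, Col(c)), vs) => InSet(Col(s"${c}_bucket"), vs)
    case e => e
  }

  // Stand-in for what a data source can accept: an IN set over a plain column.
  def pushable(e: Expr): Boolean = e match {
    case InSet(Col(_), _) => true
    case _                => false
  }

  // Applies the rules once, in order.
  def optimize(e: Expr, rules: Seq[Rule]): Expr = rules.foldLeft(e)((acc, r) => r(acc))
}
```

If replaceArrayContains is dropped from the rule list, an ArrayContains node remains. bucketToPartitionCol never matches that node, so nothing becomes pushable. This is the chaining argument made for the optimizer-rule approach.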
When there is no pushdown benefit, is there any downside to changing the physical execution from ArrayContains to InSet?
I don't see any downsides:
- in the worst case, codegen produces similar code for ArrayContains and InSet.
- in better cases, chaining more optimizations (as in the Iceberg case) yields a slightly better execution plan, since InSet is more widely supported.
@cloud-fan - Do I need to do anything else on this PR?
@dongjoon-hyun - Can you take a look?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!