[SPARK-39698][SQL] Use `TakeOrderedAndProject` if maxRows below the `spark.sql.execution.topKSortMaxRowsThreshold`
What changes were proposed in this pull request?
Spark will set maxRows at runtime: https://github.com/apache/spark/blob/bb6f65acca2918a0ceb13b612d210f1b46fa1add/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala#L58
This PR updates `SpecialLimits` to use `TakeOrderedAndProject` when maxRows is below `spark.sql.execution.topKSortMaxRowsThreshold`. For example:
create table t1 using parquet as select id, cast(id as string) as value from range(10);
select * from (select distinct * from t1) order by id;
The query plan:
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   TakeOrderedAndProject (8)
   +- * HashAggregate (7)
      +- AQEShuffleRead (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=320.0 B, rowCount=10)
            +- Exchange (4)
               +- * HashAggregate (3)
                  +- * ColumnarToRow (2)
                     +- Scan parquet spark_catalog.default.t1 (1)
+- == Initial Plan ==
   Sort (13)
   +- Exchange (12)
      +- HashAggregate (11)
         +- Exchange (10)
            +- HashAggregate (9)
               +- Scan parquet spark_catalog.default.t1 (1)
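The decision described above can be sketched as a small standalone model. This is only an illustration, not the code in this PR: `PhysicalChoice`, `TopKSortDecision` and `choose` are hypothetical names, and the strict `<` comparison is an assumption taken from the word "below" in the description.

```scala
// Hypothetical, simplified model of the planner decision; these are not Spark classes.
sealed trait PhysicalChoice
case object UseTakeOrderedAndProject extends PhysicalChoice
case object UseGlobalSort extends PhysicalChoice

object TopKSortDecision {
  /**
   * @param maxRows   upper bound on the child's row count, if one is known
   *                  (for example from the runtime statistics of a query stage)
   * @param threshold value of spark.sql.execution.topKSortMaxRowsThreshold;
   *                  a negative value never matches, so it acts as "disabled"
   */
  def choose(maxRows: Option[Long], threshold: Long): PhysicalChoice =
    maxRows match {
      // Assumption: "below" means strictly less than the threshold.
      case Some(n) if n < threshold => UseTakeOrderedAndProject
      // Unknown or too-large row count: keep the regular global sort.
      case _ => UseGlobalSort
    }
}
```

With the example above (rowCount=10), any threshold greater than 10 would select `UseTakeOrderedAndProject` in this model, while a threshold of -1 always falls back to `UseGlobalSort`.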
Why are the changes needed?
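Improve query performance by reducing shuffle: `TakeOrderedAndProject` does not need the extra range-partitioning `Exchange` that a global `Sort` requires (Exchange (12) in the initial plan above).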
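As a rough illustration of why a top-K style operator can skip the range-partitioning shuffle, here is a toy model in which plain Scala collections stand in for partitions. `LocalTopK` and `takeOrdered` are hypothetical names used only for this sketch. Each partition keeps its own smallest `k` rows, and only those candidates are merged.

```scala
// Toy model only: Seq[Seq[Long]] stands in for the partitions of a dataset.
object LocalTopK {
  def takeOrdered(partitions: Seq[Seq[Long]], k: Int): Seq[Long] = {
    // Step 1: each partition sorts locally and keeps at most k rows.
    val localTopK: Seq[Seq[Long]] = partitions.map(_.sorted.take(k))
    // Step 2: merge the small per-partition results and take the global first k.
    localTopK.flatten.sorted.take(k)
  }
}
```

When `k` is at least the total number of rows, the result is the fully sorted data, which is the case this PR targets: the row count is known to be small, so nothing is dropped.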
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests and a benchmark. Benchmark code:
import org.apache.spark.benchmark.Benchmark

val numRows = 1024 * 1024 * 15
spark.sql(s"CREATE TABLE t1 using parquet AS select id as a, id as b, id as c, id as d, id as e from range(${numRows}L)")

Seq(10, 100, 1000, 10000, 20000, 50000, 100000, 500000, 800000, 1000000).foreach { maxRows =>
  val benchmark = new Benchmark(s"Benchmark SPARK-39698 with max rows = $maxRows", numRows, minNumIters = 5)
  Seq(false, true).foreach { enabled =>
    val threshold = if (enabled) maxRows else -1
    val name = s"TakeOrderedAndProjectExec is ${if (enabled) "Enabled" else "Disabled"}"
    benchmark.addCase(name) { _ =>
      withSQLConf("spark.sql.execution.topKSortMaxRowsThreshold" -> s"$threshold") {
        spark.sql(s"SELECT distinct * FROM t1 where a < $maxRows - 1 order by a").write.format("noop").mode("Overwrite").save()
      }
    }
  }
  benchmark.run()
}
Benchmark result:
Benchmark SPARK-39698 with max rows = 10:       Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     156           203         38      100.8          9.9      1.0X
TakeOrderedAndProjectExec is Enabled                       92           100          8      171.1          5.8      1.7X

Benchmark SPARK-39698 with max rows = 100:      Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     117           129         11      134.8          7.4      1.0X
TakeOrderedAndProjectExec is Enabled                       77            86          7      203.6          4.9      1.5X

Benchmark SPARK-39698 with max rows = 1000:     Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     120           132         16      131.1          7.6      1.0X
TakeOrderedAndProjectExec is Enabled                       80            89         10      195.6          5.1      1.5X

Benchmark SPARK-39698 with max rows = 10000:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     139           151         12      113.1          8.8      1.0X
TakeOrderedAndProjectExec is Enabled                       90           100         16      174.4          5.7      1.5X

Benchmark SPARK-39698 with max rows = 20000:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     157           166          7      100.4         10.0      1.0X
TakeOrderedAndProjectExec is Enabled                       96           104          9      163.2          6.1      1.6X

Benchmark SPARK-39698 with max rows = 50000:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     203           211          8       77.4         12.9      1.0X
TakeOrderedAndProjectExec is Enabled                      123           132          7      128.1          7.8      1.7X

Benchmark SPARK-39698 with max rows = 100000:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     282           288          5       55.8         17.9      1.0X
TakeOrderedAndProjectExec is Enabled                      164           173          8       95.8         10.4      1.7X

Benchmark SPARK-39698 with max rows = 500000:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                     748           753          6       21.0         47.5      1.0X
TakeOrderedAndProjectExec is Enabled                      649           662         11       24.2         41.2      1.2X

Benchmark SPARK-39698 with max rows = 800000:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                    1109          1160         73       14.2         70.5      1.0X
TakeOrderedAndProjectExec is Enabled                      985          1010         21       16.0         62.6      1.1X

Benchmark SPARK-39698 with max rows = 1000000:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                    1286          1329         37       12.2         81.8      1.0X
TakeOrderedAndProjectExec is Enabled                     1314          1336         26       12.0         83.6      1.0X
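For readers checking the numbers, the derived columns appear to follow from Best Time and the `numRows` value passed to the benchmark. The sketch below shows that arithmetic; `BenchmarkColumns` and its method names are hypothetical, and because the printed best times are rounded to whole milliseconds, recomputed values only match the table approximately.

```scala
// Hypothetical helper that recomputes the derived benchmark columns from Best Time(ms).
object BenchmarkColumns {
  // Same row count as in the benchmark code above.
  val numRows: Long = 1024L * 1024 * 15

  // Rate(M/s): millions of rows processed per second.
  def rateMPerSec(bestMs: Double): Double = numRows / bestMs / 1000.0

  // Per Row(ns): nanoseconds spent per row.
  def perRowNs(bestMs: Double): Double = bestMs * 1e6 / numRows

  // Relative: speedup of a case compared with the first (baseline) case.
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs
}
```

For the first table, `BenchmarkColumns.relative(156, 92)` is about 1.70, in line with the reported 1.7X, and for max rows = 1000000, `relative(1286, 1314)` is about 0.98, which the table rounds to 1.0X.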
cc @HyukjinKwon @cloud-fan
@kamcheungting-db
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!