
[SPARK-39698][SQL] Use `TakeOrderedAndProject` if maxRows below the `spark.sql.execution.topKSortMaxRowsThreshold`

Open wangyum opened this issue 3 years ago • 2 comments

What changes were proposed in this pull request?

Spark will set maxRows at runtime: https://github.com/apache/spark/blob/bb6f65acca2918a0ceb13b612d210f1b46fa1add/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala#L58

This PR updates SpecialLimits to use TakeOrderedAndProject if maxRows is below spark.sql.execution.topKSortMaxRowsThreshold. For example:

create table t1 using parquet as select id, cast(id as string) as value from range(10);
select * from (select distinct * from t1) order by id;

The query plan:

== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   TakeOrderedAndProject (8)
   +- * HashAggregate (7)
      +- AQEShuffleRead (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=320.0 B, rowCount=10)
            +- Exchange (4)
               +- * HashAggregate (3)
                  +- * ColumnarToRow (2)
                     +- Scan parquet spark_catalog.default.t1 (1)
+- == Initial Plan ==
   Sort (13)
   +- Exchange (12)
      +- HashAggregate (11)
         +- Exchange (10)
            +- HashAggregate (9)
               +- Scan parquet spark_catalog.default.t1 (1)
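
The choice between the two plans can be modeled without Spark. The following is a minimal sketch, not the PR's actual SpecialLimits code: `TopKChoice`, `chooseSortStrategy`, and the two strategy objects are hypothetical names, and the strict `<` comparison is an assumption based on the word "below" in the description.

```scala
object TopKChoice {
  // Hypothetical stand-ins for the two physical plans; not Spark classes.
  sealed trait SortStrategy
  case object GlobalSort extends SortStrategy            // Exchange + Sort
  case object TakeOrderedAndProject extends SortStrategy // top-K style operator

  // maxRows is None when the row count is unknown, in which case the
  // global sort is kept. A negative threshold can never exceed a row
  // count, so it effectively disables the rewrite.
  def chooseSortStrategy(maxRows: Option[BigInt], threshold: Long): SortStrategy =
    maxRows match {
      case Some(rows) if rows < BigInt(threshold) => TakeOrderedAndProject
      case _                                      => GlobalSort
    }
}
```

With the example above (rowCount=10), `TopKChoice.chooseSortStrategy(Some(BigInt(10)), 100L)` selects `TakeOrderedAndProject`, while an unknown row count or a threshold of -1 keeps `GlobalSort`.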

Why are the changes needed?

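To improve query performance by reducing shuffle: in the plan above, the Sort and its Exchange from the initial plan are replaced by a single TakeOrderedAndProject in the final plan.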
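
One way to picture why a top-K operator can stand in for a full sort is the plain-collections sketch below. It assumes the operator behaves like a per-partition top-K followed by a single merge; `TopKSketch` and `takeOrdered` are hypothetical names, and this is an illustration of the idea rather than Spark's implementation.

```scala
object TopKSketch {
  // Keep the k smallest elements of each partition, then merge the
  // survivors and keep the k smallest overall.
  def takeOrdered[T](partitions: Seq[Seq[T]], k: Int)(implicit ord: Ordering[T]): Seq[T] = {
    val localTopK = partitions.map(_.sorted.take(k)) // work done per partition
    localTopK.flatten.sorted.take(k)                 // single final merge
  }
}
```

When `k` is at least the total number of rows, the result is the fully sorted input, which is the situation the PR targets when maxRows is known to be small.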

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test. Benchmark test code:

import org.apache.spark.benchmark.Benchmark

// Note: Benchmark and withSQLConf are Spark test utilities (withSQLConf is provided by
// helpers such as SQLHelper), so this snippet is meant to run inside a Spark test or benchmark class.
val numRows = 1024 * 1024 * 15
spark.sql(s"CREATE TABLE t1 using parquet AS select id as a, id as b, id as c, id as d, id as e from range(${numRows}L)")

Seq(10, 100, 1000, 10000, 20000, 50000, 100000, 500000, 800000, 1000000).foreach { maxRows =>
  val benchmark = new Benchmark(s"Benchmark SPARK-39698 with max rows = $maxRows", numRows, minNumIters = 5)
  Seq(false, true).foreach { enabled =>
    // A threshold of -1 disables the rewrite; otherwise the threshold equals maxRows.
    val threshold = if (enabled) maxRows else -1
    val name = s"TakeOrderedAndProjectExec is ${if (enabled) "Enabled" else "Disabled"}"
    benchmark.addCase(name) { _ =>
      withSQLConf("spark.sql.execution.topKSortMaxRowsThreshold" -> s"$threshold") {
        // The filter keeps maxRows - 1 rows; the noop sink runs the query without writing output.
        spark.sql(s"SELECT distinct * FROM t1 where a < $maxRows - 1 order by a").write.format("noop").mode("Overwrite").save()
      }
    }
  }
  benchmark.run()
}

Benchmark result:

Benchmark SPARK-39698 with max rows = 10:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled               156            203          38        100.8           9.9       1.0X
TakeOrderedAndProjectExec is Enabled                 92            100           8        171.1           5.8       1.7X

Benchmark SPARK-39698 with max rows = 100:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                117            129          11        134.8           7.4       1.0X
TakeOrderedAndProjectExec is Enabled                  77             86           7        203.6           4.9       1.5X

Benchmark SPARK-39698 with max rows = 1000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                 120            132          16        131.1           7.6       1.0X
TakeOrderedAndProjectExec is Enabled                   80             89          10        195.6           5.1       1.5X

Benchmark SPARK-39698 with max rows = 10000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                  139            151          12        113.1           8.8       1.0X
TakeOrderedAndProjectExec is Enabled                    90            100          16        174.4           5.7       1.5X

Benchmark SPARK-39698 with max rows = 20000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                  157            166           7        100.4          10.0       1.0X
TakeOrderedAndProjectExec is Enabled                    96            104           9        163.2           6.1       1.6X

Benchmark SPARK-39698 with max rows = 50000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                  203            211           8         77.4          12.9       1.0X
TakeOrderedAndProjectExec is Enabled                   123            132           7        128.1           7.8       1.7X

Benchmark SPARK-39698 with max rows = 100000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                   282            288           5         55.8          17.9       1.0X
TakeOrderedAndProjectExec is Enabled                    164            173           8         95.8          10.4       1.7X

Benchmark SPARK-39698 with max rows = 500000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                   748            753           6         21.0          47.5       1.0X
TakeOrderedAndProjectExec is Enabled                    649            662          11         24.2          41.2       1.2X

Benchmark SPARK-39698 with max rows = 800000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                  1109           1160          73         14.2          70.5       1.0X
TakeOrderedAndProjectExec is Enabled                    985           1010          21         16.0          62.6       1.1X

Benchmark SPARK-39698 with max rows = 1000000:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProjectExec is Disabled                   1286           1329          37         12.2          81.8       1.0X
TakeOrderedAndProjectExec is Enabled                    1314           1336          26         12.0          83.6       1.0X
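
The Relative column in the tables above is consistent with the ratio of the baseline's best time to each case's best time. The sketch below checks that reading against three of the tables; `RelativeSpeedup`, `relative`, and `fmt` are hypothetical helper names, and the formula is inferred from the numbers rather than stated in the PR.

```scala
import java.util.Locale

object RelativeSpeedup {
  // Ratio of the baseline (Disabled) best time to the case's best time.
  def relative(baselineBestMs: Double, caseBestMs: Double): Double =
    baselineBestMs / caseBestMs

  // One decimal place with an "X" suffix, using a fixed locale so the
  // decimal separator is always a dot.
  def fmt(x: Double): String = "%.1fX".formatLocal(Locale.ROOT, x)
}
```

For max rows = 10 this gives 156 / 92, which formats as 1.7X; for 500000 it gives 748 / 649, or 1.2X; and for 1000000 it gives 1286 / 1314, or 1.0X, so the speedup shrinks as maxRows grows and disappears at one million rows.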

wangyum, Jul 06 '22 15:07

cc @HyukjinKwon @cloud-fan

wangyum, Jul 06 '22 15:07

@kamcheungting-db

sigmod, Jul 07 '22 21:07

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot], Nov 05 '22 00:11