spark-rapids
repartition-based fallback for hash aggregate
This PR closes https://github.com/NVIDIA/spark-rapids/issues/8391.
This PR adds a config called spark.rapids.sql.agg.fallbackAlgorithm that lets users choose between a sort-based and a repartition-based algorithm when the aggregation cannot be done in a single pass in memory.
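A minimal usage sketch, assuming the config can also be set at runtime and that "sort" is the other accepted value (only "repartition" appears explicitly in the configs later in this thread):
spark.conf.set("spark.rapids.sql.agg.fallbackAlgorithm", "repartition") // fall back to repartition-based aggregation on spill
// or on the command line:
// --conf spark.rapids.sql.agg.fallbackAlgorithm='repartition'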
This optimization is orthogonal to https://github.com/NVIDIA/spark-rapids/pull/10950
Can we please get a performance comparison for this change?
I think there may be something wrong with your metrics for the repartition case. If I run:
spark.conf.set("spark.sql.shuffle.partitions", 2)
spark.conf.set("spark.rapids.sql.agg.singlePassPartialSortEnabled", false)
spark.time(spark.range(0, 3000000000L, 1, 2).selectExpr("CAST(rand(0) * 3000000000 AS LONG) DIV 2 as id", "id % 2 as data").groupBy("id").agg(count(lit(1)), avg(col("data"))).orderBy("id").show())
with repartition, then the metrics for the aggregations are all very large compared to running it with sort, but the total run time is actually smaller.
Hi @revans2, do you mean the op time metrics? I did some investigation and found that for the sort-based fallback, op time can be very inaccurate because it fails to capture many spill times. E.g. if a spill is triggered by https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L969, the time spent on that spill is not counted toward op time. If we take a look at NSYS, we can see many NVTX ranges named "device memory sync spill". These ranges do not have a parent NVTX range and do not seem to be captured by the op time metrics.
On the other hand, op time can be inaccurate for the repartition-based fallback as well (though it may not miss as many ranges as the sort-based one does). The inaccuracy is really rooted in the way we measure op time (see the sketch below). Do you think we need to refine how op time is measured, so that we can make sure the sum of all operators' op time equals the wall time?
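To illustrate the issue (a conceptual sketch only, not the plugin's actual metric code; the sleeps are stand-ins for real work): op time is only accumulated around the region an operator explicitly times, so spill work triggered outside that region shows up in wall time but not in any operator's op time.
// Conceptual sketch of an operator-scoped timer missing out-of-band spill work.
var opTime = 0L
val wallStart = System.nanoTime()
val opStart = System.nanoTime()
Thread.sleep(5)                  // stand-in for the operator's own work (counted in opTime)
opTime += System.nanoTime() - opStart
Thread.sleep(20)                 // stand-in for a "device memory sync spill" triggered
                                 // outside the timed region (not counted in opTime)
val wallTime = System.nanoTime() - wallStart
println(s"opTime=${opTime / 1e6} ms, wallTime=${wallTime / 1e6} ms") // opTime < wallTime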
I also found that, for this synthetic case, the sort-based fallback beats the repartition-based fallback on my PC (about 6.2 min vs. 6.6 min), with the following configs:
bin/spark-shell \
--master 'local[10]' \
--driver-memory 20g \
--conf spark.rapids.memory.pinnedPool.size=20G \
--conf spark.sql.files.maxPartitionBytes=2g \
--conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.metrics.level='DEBUG' \
--conf spark.rapids.sql.agg.fallbackAlgorithm='repartition' \
--conf spark.eventLog.enabled=true \
--jars /home/hongbin/code/spark-rapids2/dist/target/rapids-4-spark_2.12-24.08.0-SNAPSHOT-cuda11.jar
I also compared the repartition-based fallback against the sort-based fallback on NDS, and found that although the total duration improves a little, we CANNOT guarantee that the repartition-based fallback always wins. I haven't found a simple rule/heuristic to decide when to use the repartition-based approach and when to use the other, so it would be difficult for us to explain to users which one is better.
For now, I would suggest that users try the repartition-based fallback if a lot of buffer spills are observed. However, it's still not a rule of thumb, because a lot of spilling also occurred in your synthetic case (where the repartition-based fallback is slower).
Any thoughts?
I think we need to do some profiling of cases where the partition-based code is worse than the sort-based code to understand what is happening. Ideally we get it down to something like a micro-benchmark so we can better isolate it when doing profiling (see the sketch after the list below). I have a few ideas about what it could be, but this is just speculation.
- Sorting a single numeric field can be very fast. It might be fast enough to beat the partitioning code for the same path.
- The partitioning code might have a bug in it where it ends up doing extra work, or there are some kernels that are not as optimized as the sort case.
- Spilling/repartitioning/sorting has a high enough run-to-run variance that we see it lose some of the time, but overall it is a win.
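For example, a possible micro-benchmark sketch in spark-shell, based on the query above, timing both fallback paths on the same data (the value "sort" is an assumption here; "repartition" matches the config used earlier in this thread):
import org.apache.spark.sql.functions._
def timeAgg(algo: String): Unit = {
  spark.conf.set("spark.rapids.sql.agg.fallbackAlgorithm", algo)
  spark.time(
    spark.range(0, 3000000000L, 1, 2)
      .selectExpr("CAST(rand(0) * 3000000000 AS LONG) DIV 2 as id", "id % 2 as data")
      .groupBy("id")
      .agg(count(lit(1)), avg(col("data")))
      .orderBy("id")
      .show())
}
// run each path once (ideally several times, to get a feel for run-to-run variance)
Seq("sort", "repartition").foreach(timeAgg)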
@binmahone If you get some profiling info I am happy to look into it with you.
Per our offline discussion with @revans2 and @jlowe:
Even though the current repartition-based fallback has already shown a significant win over the sort-based one in our customer query, we need to:
1. Further compare repartition-based vs. sort-based on NDS, check in which situations sort-based surpasses repartition-based (i.e. a regression), and decide whether that regression is acceptable.
2. Try some more radical improvements for the repartition-based approach, e.g. skipping the first pass of aggregation entirely.
With the above done, we may be able to rip out the sort-based code entirely and check in this PR.
I suggest moving this PR from 24.08 to 24.10 to allow the above items to be done. @GaryShen2008
Please retarget to 24.10
Got it. Meanwhile, I'm still refactoring this PR to see if there's more potential.