cudf icon indicating copy to clipboard operation
cudf copied to clipboard

Optimizations for tdigest generation.

Open nvdbaranec opened this issue 7 months ago • 1 comments

This PR implements optimizations for speeding up a specific part of the tdigest generation code. Tdigest generation happens in two parts:

  • Based on the input size and the user requested fidelity, a set of segments that represent "buckets" of input values to be merged together into centroids is generated.
  • The input values are then merged together via a segmented reduction into these buckets.

The issue lies with the first step. The process of choosing the buckets is inherently linear per input group. It uses a nonlinear 'scale function' to determine where the next bucket cutoff point should be based on where you currently are. The current implementation uses 1 thread per input group. The use case here involves only 1 group, with a large number of values. Broadly, ~5,000,000 input values, using a max of 10,000 segments, resulting in ~5000 actual segments being generated. The computation of those 5000 segments is the `issue.

As implemented today, it takes roughly 6.5ms to generate that on an A5000 GPU.

There were 3 optimizations pursued:

  • First optimization: Use the CPU to do this particular step for small numbers of groups. The CPU is considerably faster here per group. A single core of a Threadripper taking roughly 250us to do the same work. This optimization works as follows: If we have up to 64 groups of work, run 32 of them on the CPU and the remainder on the GPU. 32 is the rough break-even point where they take about the same amount of time. Above 64 groups, run it all on the GPU because the inherent parallelism starts to dominate, and we don't have to do a device->pinned copy stop to make the data available to the CPU. Since our specific customer case involves one group, this drops the processing time for the section from 6.5ms to 230us. Below we can see the case (highlighted in green) where the CPU and GPU are overlapping the same amount of work (32 groups CPU and 32 groups GPU).

image

  • Second optimization. As with many GPU algorithms, we have to run this computation twice. Once to determine the size the outputs, and once to actually fill them in. So we're doubling our cost. However, we can make some reasonable assumptions about how big the worst case size will be. It is tied to the cluster limit (10,000 specified by the user). So instead of exactly computing the number of clusters per group in a separate pass, we can just allocate the worst case and do the whole process in one. This number can in theory be large though. For example, 2000 groups * 10000 clusters * 8 bytes : 160MB. So I chose a somewhat arbitrary cap of 256 MB. If it is going to require more than that, we just do the two passes. It might be worth making this configurable via en environment variable.

  • Third optimization. A big part of the slowness is related to using doubles with some trig functions (sin and asin). Switching to floats brings the time down by about 40%. However, it caused some accuracy errors that I couldn't easily fix by just adjusting our error tolerance without getting unreasonable (in ulps). But, an easy reformulation of the math was pointed out to me which allows us to remove the sin and asin altogether, instead just costing a (double) sqrt per call. This results in about a 50% speedup for the function.

I added a benchmark specific to tdigest reductions (we needed it anyway), with emphasis on this specific case: small numbers of large groups. Speedups over the existing code are significant:

Old:

num_groups rows_per_group max_centroids Samples CPU Time Noise GPU Time Noise
1 5000000 10000 37x 13.609 ms 0.02% 13.604 ms 0.02%
16 5000000 10000 20x 25.281 ms 0.03% 25.276 ms 0.03%
64 5000000 10000 11x 63.460 ms 0.03% 63.455 ms 0.03%
1 1000000 10000 39x 13.018 ms 0.03% 13.013 ms 0.03%
16 1000000 10000 34x 15.139 ms 0.04% 15.134 ms 0.04%
64 1000000 10000 23x 22.695 ms 0.04% 22.690 ms 0.04%
1 5000000 1000 233x 2.158 ms 0.09% 2.154 ms 0.09%
16 5000000 1000 36x 14.238 ms 0.03% 14.233 ms 0.03%
64 5000000 1000 11x 52.956 ms 0.01% 52.950 ms 0.01%
1 1000000 1000 335x 1.500 ms 0.46% 1.496 ms 0.13%
16 1000000 1000 133x 3.791 ms 0.09% 3.786 ms 0.09%
64 1000000 1000 45x 11.170 ms 0.04% 11.165 ms 0.04%

New:

num_groups rows_per_group max_centroids Samples CPU Time Noise GPU Time Noise
1 5000000 10000 464x 1.083 ms 2.64% 1.079 ms 2.65%
16 5000000 10000 560x 15.280 ms 1.35% 15.276 ms 1.35%
64 5000000 10000 11x 58.353 ms 0.12% 58.349 ms 0.12%
1 1000000 10000 1392x 365.365 us 1.09% 361.544 us 1.11%
16 1000000 10000 122x 4.120 ms 0.44% 4.117 ms 0.44%
64 1000000 10000 37x 13.577 ms 0.09% 13.573 ms 0.09%
1 5000000 1000 528x 1.014 ms 5.87% 1.010 ms 5.90%
16 5000000 1000 832x 14.139 ms 5.03% 14.135 ms 5.03%
64 5000000 1000 11x 54.931 ms 0.01% 54.926 ms 0.01%
1 1000000 1000 2960x 296.354 us 2.36% 292.453 us 2.25%
16 1000000 1000 560x 2.929 ms 3.19% 2.925 ms 3.20%
64 1000000 1000 45x 11.134 ms 0.11% 11.130 ms 0.11%

This yields as much as a 10x speedup. You can see where the time starts to flatten out at 64 groups, where the parallelism of the GPU starts to win.

Existing (merge aggregation) benchmarks saw some wins too

Old

num_tdigests tdigest_size tdigests_per_group max_centroids Samples CPU Time Noise GPU Time Noise Elem/s
500000 1 1 10000 1072x 472.379 us 3.17% 467.688 us 3.19% 1.069G
500000 1000 1 10000 11x 783.576 ms 0.17% 783.563 ms 0.17% 638.110M
500000 1 1 1000 2528x 463.077 us 0.73% 458.498 us 0.74% 1.091G
500000 1000 1 1000 11x 540.151 ms 0.13% 540.140 ms 0.13% 925.686M

New

num_tdigests tdigest_size tdigests_per_group max_centroids Samples CPU Time Noise GPU Time Noise Elem/s
500000 1 1 10000 1248x 404.618 us 2.50% 400.754 us 2.52% 1.248G
500000 1000 1 10000 11x 765.092 ms 0.12% 765.079 ms 0.12% 653.527M
500000 1 1 1000 2208x 399.257 us 0.92% 395.360 us 0.93% 1.265G
500000 1000 1 1000 11x 532.907 ms 0.24% 532.895 ms 0.24% 938.271M

Leaving as a draft for now to get some specific Spark testing done with it.

nvdbaranec avatar Jun 11 '25 21:06 nvdbaranec

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

copy-pr-bot[bot] avatar Jun 11 '25 21:06 copy-pr-bot[bot]

The changes look good to me. I ran some tests.

val dfs = (0 until 9).map(i => math.pow(10, i).toLong).map{ rows => 
 spark.range(rows).selectExpr(s"rand($rows) as rn").selectExpr("percentile(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as p", "percentile_approx(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as pa").selectExpr("p", "pa", "abs(pa[0] - p[0]) as e0", "abs(pa[1] - p[1]) as e1", "abs(pa[2] - p[2]) as e2", "abs(pa[3] - p[3]) as e3", "abs(pa[4] - p[4]) as e4", "abs(pa[5] - p[5]) as e5", "abs(pa[6] - p[6]) as e6", "abs(pa[7] - p[7]) as e7", "abs(pa[8] - p[8]) as e8","abs(pa[9] - p[9]) as e9","abs(pa[10] - p[10]) as e10", s"$rows as rc")
}
spark.conf.set("spark.rapids.sql.enabled",false)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/CPU_NEW_RESULT")
spark.conf.set("spark.rapids.sql.enabled",true)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/GPU_NEW_RESULT")

val cpu = spark.read.parquet("/data/tmp/CPU_NEW_RESULT")
val gpu = spark.read.parquet("/data/tmp/GPU_NEW_RESULT")

cpu.alias("cpu").join(gpu.alias("gpu"), cpu("rc") === gpu("rc")).select(col("cpu.rc"), (col("cpu.e0") >= col("gpu.e0")).alias("e0"), (col("cpu.e1") >= col("gpu.e1")).alias("e1"), (col("cpu.e2") >= col("gpu.e2")).alias("e2"), (col("cpu.e3") >= col("gpu.e3")).alias("e3"), (col("cpu.e4") >= col("gpu.e4")).alias("e4"), (col("cpu.e5") >= col("gpu.e5")).alias("e5"), (col("cpu.e6") >= col("gpu.e6")).alias("e6"), (col("cpu.e7") >= col("gpu.e7")).alias("e7"), (col("cpu.e8") >= col("gpu.e8")).alias("e8"), (col("cpu.e9") >= col("gpu.e9")).alias("e9"), (col("cpu.e10") >= col("gpu.e10")).alias("e10")).show()

Effectively I am calculating the difference between the actual percentile and the approximate version for both the CPU and the GPU, so that we can see if the GPU is a better, or worse estimate than the CPU is. For all of the tests that I ran the GPU is as good as or better than the CPU both before this change and after it.

+---------+----+----+----+----+----+----+----+----+----+-----+----+
|       rc|  e0|  e1|  e2|  e3|  e4|  e5|  e6|  e7|  e8|   e9| e10|
+---------+----+----+----+----+----+----+----+----+----+-----+----+
|        1|true|true|true|true|true|true|true|true|true| true|true|
|       10|true|true|true|true|true|true|true|true|true| true|true|
|      100|true|true|true|true|true|true|true|true|true| true|true|
|     1000|true|true|true|true|true|true|true|true|true| true|true|
|    10000|true|true|true|true|true|true|true|true|true| true|true|
|   100000|true|true|true|true|true|true|true|true|true| true|true|
|  1000000|true|true|true|true|true|true|true|true|true| true|true|
| 10000000|true|true|true|true|true|true|true|true|true| true|true|
|100000000|true|true|true|true|true|true|true|true|true|false|true|
+---------+----+----+----+----+----+----+----+----+----+-----+----+

That said if I compare the new GPU calculation to the old GPU calculation we differ only on the 100,000,000 rows version. In one case we are slightly better, e0, and it all of the other cases we are slightly worse. But it still beats the CPU for accuracy, so it is good for me.

revans2 avatar Jun 17 '25 19:06 revans2

Instead of showing old vs new benchmark, can you run the comparing script please? Using https://github.com/NVIDIA/nvbench/blob/main/scripts/nvbench_compare.py.

ttnghia avatar Jun 17 '25 20:06 ttnghia

/merge

nvdbaranec avatar Jul 02 '25 00:07 nvdbaranec