Optimizations for tdigest generation.
This PR implements optimizations for speeding up a specific part of the tdigest generation code. Tdigest generation happens in two parts:
- Based on the input size and the user requested fidelity, a set of segments that represent "buckets" of input values to be merged together into centroids is generated.
- The input values are then merged together via a segmented reduction into these buckets.
The issue lies with the first step. The process of choosing the buckets is inherently linear per input group. It uses a nonlinear 'scale function' to determine where the next bucket cutoff point should be based on where you currently are. The current implementation uses 1 thread per input group. The use case here involves only 1 group, with a large number of values. Broadly, ~5,000,000 input values, using a max of 10,000 segments, resulting in ~5000 actual segments being generated. The computation of those 5000 segments is the `issue.
As implemented today, it takes roughly 6.5ms to generate that on an A5000 GPU.
There were 3 optimizations pursued:
- First optimization: Use the CPU to do this particular step for small numbers of groups. The CPU is considerably faster here per group. A single core of a Threadripper taking roughly
250usto do the same work. This optimization works as follows: If we have up to 64 groups of work, run 32 of them on the CPU and the remainder on the GPU. 32 is the rough break-even point where they take about the same amount of time. Above 64 groups, run it all on the GPU because the inherent parallelism starts to dominate, and we don't have to do a device->pinned copy stop to make the data available to the CPU. Since our specific customer case involves one group, this drops the processing time for the section from6.5msto230us. Below we can see the case (highlighted in green) where the CPU and GPU are overlapping the same amount of work (32 groups CPU and 32 groups GPU).
-
Second optimization. As with many GPU algorithms, we have to run this computation twice. Once to determine the size the outputs, and once to actually fill them in. So we're doubling our cost. However, we can make some reasonable assumptions about how big the worst case size will be. It is tied to the cluster limit (
10,000specified by the user). So instead of exactly computing the number of clusters per group in a separate pass, we can just allocate the worst case and do the whole process in one. This number can in theory be large though. For example, 2000 groups * 10000 clusters * 8 bytes : 160MB. So I chose a somewhat arbitrary cap of256 MB. If it is going to require more than that, we just do the two passes. It might be worth making this configurable via en environment variable. -
Third optimization. A big part of the slowness is related to using doubles with some trig functions (sin and asin). Switching to floats brings the time down by about 40%. However, it caused some accuracy errors that I couldn't easily fix by just adjusting our error tolerance without getting unreasonable (in
ulps). But, an easy reformulation of the math was pointed out to me which allows us to remove the sin and asin altogether, instead just costing a (double) sqrt per call. This results in about a 50% speedup for the function.
I added a benchmark specific to tdigest reductions (we needed it anyway), with emphasis on this specific case: small numbers of large groups. Speedups over the existing code are significant:
Old:
| num_groups | rows_per_group | max_centroids | Samples | CPU Time | Noise | GPU Time | Noise |
|---|---|---|---|---|---|---|---|
| 1 | 5000000 | 10000 | 37x | 13.609 ms | 0.02% | 13.604 ms | 0.02% |
| 16 | 5000000 | 10000 | 20x | 25.281 ms | 0.03% | 25.276 ms | 0.03% |
| 64 | 5000000 | 10000 | 11x | 63.460 ms | 0.03% | 63.455 ms | 0.03% |
| 1 | 1000000 | 10000 | 39x | 13.018 ms | 0.03% | 13.013 ms | 0.03% |
| 16 | 1000000 | 10000 | 34x | 15.139 ms | 0.04% | 15.134 ms | 0.04% |
| 64 | 1000000 | 10000 | 23x | 22.695 ms | 0.04% | 22.690 ms | 0.04% |
| 1 | 5000000 | 1000 | 233x | 2.158 ms | 0.09% | 2.154 ms | 0.09% |
| 16 | 5000000 | 1000 | 36x | 14.238 ms | 0.03% | 14.233 ms | 0.03% |
| 64 | 5000000 | 1000 | 11x | 52.956 ms | 0.01% | 52.950 ms | 0.01% |
| 1 | 1000000 | 1000 | 335x | 1.500 ms | 0.46% | 1.496 ms | 0.13% |
| 16 | 1000000 | 1000 | 133x | 3.791 ms | 0.09% | 3.786 ms | 0.09% |
| 64 | 1000000 | 1000 | 45x | 11.170 ms | 0.04% | 11.165 ms | 0.04% |
New:
| num_groups | rows_per_group | max_centroids | Samples | CPU Time | Noise | GPU Time | Noise |
|---|---|---|---|---|---|---|---|
| 1 | 5000000 | 10000 | 464x | 1.083 ms | 2.64% | 1.079 ms | 2.65% |
| 16 | 5000000 | 10000 | 560x | 15.280 ms | 1.35% | 15.276 ms | 1.35% |
| 64 | 5000000 | 10000 | 11x | 58.353 ms | 0.12% | 58.349 ms | 0.12% |
| 1 | 1000000 | 10000 | 1392x | 365.365 us | 1.09% | 361.544 us | 1.11% |
| 16 | 1000000 | 10000 | 122x | 4.120 ms | 0.44% | 4.117 ms | 0.44% |
| 64 | 1000000 | 10000 | 37x | 13.577 ms | 0.09% | 13.573 ms | 0.09% |
| 1 | 5000000 | 1000 | 528x | 1.014 ms | 5.87% | 1.010 ms | 5.90% |
| 16 | 5000000 | 1000 | 832x | 14.139 ms | 5.03% | 14.135 ms | 5.03% |
| 64 | 5000000 | 1000 | 11x | 54.931 ms | 0.01% | 54.926 ms | 0.01% |
| 1 | 1000000 | 1000 | 2960x | 296.354 us | 2.36% | 292.453 us | 2.25% |
| 16 | 1000000 | 1000 | 560x | 2.929 ms | 3.19% | 2.925 ms | 3.20% |
| 64 | 1000000 | 1000 | 45x | 11.134 ms | 0.11% | 11.130 ms | 0.11% |
This yields as much as a 10x speedup. You can see where the time starts to flatten out at 64 groups, where the parallelism of the GPU starts to win.
Existing (merge aggregation) benchmarks saw some wins too
Old
| num_tdigests | tdigest_size | tdigests_per_group | max_centroids | Samples | CPU Time | Noise | GPU Time | Noise | Elem/s |
|---|---|---|---|---|---|---|---|---|---|
| 500000 | 1 | 1 | 10000 | 1072x | 472.379 us | 3.17% | 467.688 us | 3.19% | 1.069G |
| 500000 | 1000 | 1 | 10000 | 11x | 783.576 ms | 0.17% | 783.563 ms | 0.17% | 638.110M |
| 500000 | 1 | 1 | 1000 | 2528x | 463.077 us | 0.73% | 458.498 us | 0.74% | 1.091G |
| 500000 | 1000 | 1 | 1000 | 11x | 540.151 ms | 0.13% | 540.140 ms | 0.13% | 925.686M |
New
| num_tdigests | tdigest_size | tdigests_per_group | max_centroids | Samples | CPU Time | Noise | GPU Time | Noise | Elem/s |
|---|---|---|---|---|---|---|---|---|---|
| 500000 | 1 | 1 | 10000 | 1248x | 404.618 us | 2.50% | 400.754 us | 2.52% | 1.248G |
| 500000 | 1000 | 1 | 10000 | 11x | 765.092 ms | 0.12% | 765.079 ms | 0.12% | 653.527M |
| 500000 | 1 | 1 | 1000 | 2208x | 399.257 us | 0.92% | 395.360 us | 0.93% | 1.265G |
| 500000 | 1000 | 1 | 1000 | 11x | 532.907 ms | 0.24% | 532.895 ms | 0.24% | 938.271M |
Leaving as a draft for now to get some specific Spark testing done with it.
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
Contributors can view more details about this message here.
The changes look good to me. I ran some tests.
val dfs = (0 until 9).map(i => math.pow(10, i).toLong).map{ rows =>
spark.range(rows).selectExpr(s"rand($rows) as rn").selectExpr("percentile(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as p", "percentile_approx(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as pa").selectExpr("p", "pa", "abs(pa[0] - p[0]) as e0", "abs(pa[1] - p[1]) as e1", "abs(pa[2] - p[2]) as e2", "abs(pa[3] - p[3]) as e3", "abs(pa[4] - p[4]) as e4", "abs(pa[5] - p[5]) as e5", "abs(pa[6] - p[6]) as e6", "abs(pa[7] - p[7]) as e7", "abs(pa[8] - p[8]) as e8","abs(pa[9] - p[9]) as e9","abs(pa[10] - p[10]) as e10", s"$rows as rc")
}
spark.conf.set("spark.rapids.sql.enabled",false)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/CPU_NEW_RESULT")
spark.conf.set("spark.rapids.sql.enabled",true)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/GPU_NEW_RESULT")
val cpu = spark.read.parquet("/data/tmp/CPU_NEW_RESULT")
val gpu = spark.read.parquet("/data/tmp/GPU_NEW_RESULT")
cpu.alias("cpu").join(gpu.alias("gpu"), cpu("rc") === gpu("rc")).select(col("cpu.rc"), (col("cpu.e0") >= col("gpu.e0")).alias("e0"), (col("cpu.e1") >= col("gpu.e1")).alias("e1"), (col("cpu.e2") >= col("gpu.e2")).alias("e2"), (col("cpu.e3") >= col("gpu.e3")).alias("e3"), (col("cpu.e4") >= col("gpu.e4")).alias("e4"), (col("cpu.e5") >= col("gpu.e5")).alias("e5"), (col("cpu.e6") >= col("gpu.e6")).alias("e6"), (col("cpu.e7") >= col("gpu.e7")).alias("e7"), (col("cpu.e8") >= col("gpu.e8")).alias("e8"), (col("cpu.e9") >= col("gpu.e9")).alias("e9"), (col("cpu.e10") >= col("gpu.e10")).alias("e10")).show()
Effectively I am calculating the difference between the actual percentile and the approximate version for both the CPU and the GPU, so that we can see if the GPU is a better, or worse estimate than the CPU is. For all of the tests that I ran the GPU is as good as or better than the CPU both before this change and after it.
+---------+----+----+----+----+----+----+----+----+----+-----+----+
| rc| e0| e1| e2| e3| e4| e5| e6| e7| e8| e9| e10|
+---------+----+----+----+----+----+----+----+----+----+-----+----+
| 1|true|true|true|true|true|true|true|true|true| true|true|
| 10|true|true|true|true|true|true|true|true|true| true|true|
| 100|true|true|true|true|true|true|true|true|true| true|true|
| 1000|true|true|true|true|true|true|true|true|true| true|true|
| 10000|true|true|true|true|true|true|true|true|true| true|true|
| 100000|true|true|true|true|true|true|true|true|true| true|true|
| 1000000|true|true|true|true|true|true|true|true|true| true|true|
| 10000000|true|true|true|true|true|true|true|true|true| true|true|
|100000000|true|true|true|true|true|true|true|true|true|false|true|
+---------+----+----+----+----+----+----+----+----+----+-----+----+
That said if I compare the new GPU calculation to the old GPU calculation we differ only on the 100,000,000 rows version. In one case we are slightly better, e0, and it all of the other cases we are slightly worse. But it still beats the CPU for accuracy, so it is good for me.
Instead of showing old vs new benchmark, can you run the comparing script please? Using https://github.com/NVIDIA/nvbench/blob/main/scripts/nvbench_compare.py.
/merge