spark-rapids icon indicating copy to clipboard operation
spark-rapids copied to clipboard

[BUG] Shuffle performance scales badly with number of columns and number of partitions

Open revans2 opened this issue 1 year ago • 1 comments

Describe the bug We have seen that the shuffle performance does not scale well. Looking at the code theoretically the worst case complexity is O(num_buffers * num_partitions). This is in the JCudfSerialization code, and really just around the way that Spark serializes out batches/rows of data. But we need to spend some more time to truly understand more deeply. I have spent some time trying to do this and even though my analysis is not complete I think it is clear enough that there is a problem that I decided to file an issue for it.

Steps/Code to reproduce bug Take a data table with lots of columns and try to shuffle it with a really large number of partitions (like 20,000). We see the performance of this shuffle be truly horrible. Especially compared to smaller numbers of shuffle partitions. I did a test where I generated 16 GiB of long value. The data was random values, but with a range of 0 to 10,000,000,000, and the number of rows depended on the number of columns. I did this for 1, 8, 64, 512, and 1024 columns. I also ran tests where I did a count for each of these columns. I did it with a single thread (I need to retest with more threads) I then did the same thing, but I inserted in a shuffle followed by a coalesce. The goal was to try and measure just the amount of time it took for the shuffle separate from the amount of time it took to read the parquet data and do the counts. In the first go at this the second stage got really horrible performance and it looked like it was related to the GPU getting lots and lots of really tiny batches. Because of that and with some help for @jlowe I changed how coalesce works to make it more efficient. The time needed to concat these batches is still non-trivial and is counted in the time for the shuffle.

https://github.com/NVIDIA/spark-rapids/pull/11126

The script I used to run the tests looks like.


def run_test(num_cols: Int, num_parts: Int, num_iters: Int): Unit = {
    (0 until num_iters).foreach { iter =>
      spark.time(println(spark.read.parquet(s"/data/tmp/SHUFFLE_SCALE/${num_cols}_*").repartition(num_parts, col("id_0")).coalesce(1).selectExpr((0 until num_cols).map(i => s"COUNT(id_$i)") :_*).collect().size))
    }
} 

Seq(1, 10, 100, 200, 201, 1000, 10000, 20000, 100000).foreach { num_parts =>
  Seq(1, 8, 64, 512, 1024).foreach { num_cols =>
    run_test(num_cols, num_parts, 3)
  }
}

The results produced the following...

Shuffle Times 1 10 200 201 1,000 10,000 20,000 100,000
1 85,568 84,799 80,243 67,541 67,745 77,077 94,404 545,825
8 86,087 85,180 78,498 67,318 67,158 78,400 97,338 556,409
64 85,139 83,259 78,011 66,264 67,743 84,263 110,955 608,564
512 86,674 86,971 81,794 68,702 75,104 146,210 225,374 1,143,235
1,024 86,116 86,297 83,618 70,798 83,868 210,154 359,376 1,511,276

Oddly we appear to do our best when we have 200 to 1000 shuffle partitions. Note that 200 vs 201 is there because that is the switch off point between two different spark algorithms for shuffle. This is using the spark default shuffle and does not include the rapids shuffle manager at all. For this our worst case is 22x slower than our best case for shuffling the same amount of data.

I plan on spending some more time to try and understand exactly what is happening here, and trying to understand if there is anything we can do to improve it.

There are a few things to note.

Because of the format that JCudfSerialization uses (I wrote it you can blame me for it) there is a self describing header for each batch of data. That header gets to be a larger and larger percentage of the data as the number of partitions increases. When there are few partitions it is very small. But as it gets larger the size can increase until is is larger than the actual data to be shuffled (101 GiB vs 16 GiB). The compression codecs appear to remove most of this extra size. But at the same time it is not something we want.

Another thing I noticed is that when doing some simple hprof profiling that the NVTX ranges show up as a significant part of the time spent. This is likely because we have ranges for each buffer output (which ends up being num partitions * num buffers in the number of ranges. This is not great. We also have some issues with us creating and destroying an object for each range. We probably also want to fix that. I still need to do more profiling and testing to see if disabling them is enough.

I personally think that the long term solution to this is to have a new format for shuffle. And possibly for most data movement to/from the GPU.

There are a few key things that we need for this format.

  1. It needs to NOT be self describing. We know what the schema is for the data and we don't need to include it in the data being shuffled.
  2. It needs to be something that we can process on the CPU as well as the GPU. We expect that there will be situations in which the CPU will be better than the GPU for processing this data (even if those are just because we have free CPU resources) so we need the ability to be able to process on either the CPU or the GPU.
  3. It need to be on the GPU so we can take advantage of the crazy parallelism. Inherently this problem is O(num_buffers * num_partitions). There is no way for us to slice the data and move it without slicing each buffer into num_partition pieces. The GPU is very good at ridiculously parallel operations so we need to explore using it for this problem.
  4. It needs to be super efficient at not just splitting the data but also concatenating the data back together again.
  5. We need to be sure that transferring the data to and from the GPU is efficient also. If each batch is only a few rows in size we don't want to do 100,000 small transfers to get a single batch worth of data. We probably want to be able to do it in a few transfers instead.

There are also a lot of nice to have features that might turn into required ones as we progress.

  1. It needs to be a little self describing. There are cases, like with variable length data and validity where some buffers are just not needed. We need good ways to be able to be self describing enough to say if the those buffer are included or not using as few bytes as possible
  2. It needs to be cascaded friendly. nvcomp has some really great data type specific compression called cascaded. It would be really nice if we could use some of these to help reduce the size of the data better than a general purpose codec can. But it probably only would be worth it if we are shuffling large buffers of data. When we get closer to just having a few rows in a buffer we probably would need a way to disable this.
  3. It needs to be very compact. It might be nice to explore using variable bit with formats for any metadata that we do need to include. We might also want to look at not worrying about alignment. But we would have to see what the performance costs are for that as the GPU and CPU can run into problems if things are not aligned.

We also need to do a lot of testing so that we know when to do what. The tests I have done so far are just for longs. we need to explore variable length data as I expect it to have a larger impact on this.

We also need to look at other parts of the processing. We have done some optimized versions of shuffle for smaller numbers of hash partitions. Are there things we can/should improve there? How do things change when we are working with lots of threads, not just 1.

revans2 avatar Jul 02 '24 14:07 revans2

As a side note I was able to crash a spark query locally by trying to partition 512 columns into 100,000 partitions. It looks like the write produces so much garbage that GC goes crazy and it times out. I am not 100% sure that it is GC, but it looks like it is.

revans2 avatar Jul 03 '24 15:07 revans2

Any reason to keep this open, now that kud0 is in place?

nvdbaranec avatar Nov 06 '25 16:11 nvdbaranec