[FEA] Optimize Shuffle coalesce performance when handling very small sized partitioned batches
**Is your feature request related to a problem? Please describe.**
Currently, shuffle partitions each columnar batch into per-partition batches. https://github.com/NVIDIA/spark-rapids/blob/65ec25d85503484874d3b157d4e2775159187436/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala#L35-L47
Those partitioned batches then go through GpuColumnarBatchSerializer in GpuShuffleExchangeExec to be serialized.
https://github.com/NVIDIA/spark-rapids/blob/65ec25d85503484874d3b157d4e2775159187436/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala#L341-L344
https://github.com/NVIDIA/spark-rapids/blob/branch-24.04/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala#L170-L174
On the deserialization path, using shuffle hash join as an example, data first goes through: deserialization -> concat+load -> filter (e.g. null filtering/masking or RequireSingleBatch) to meet the CoalesceSizeGoal.
https://github.com/NVIDIA/spark-rapids/blob/65ec25d85503484874d3b157d4e2775159187436/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala#L213-L218
https://github.com/NVIDIA/spark-rapids/blob/65ec25d85503484874d3b157d4e2775159187436/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala#L295
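A minimal sketch of the read-side accumulate-then-concatenate loop described above (byte arrays stand in for serialized batches; the class and method names are illustrative, not the actual spark-rapids API):

```java
import java.io.ByteArrayOutputStream;
import java.util.Iterator;

// Sketch only: byte arrays stand in for serialized shuffle batches. The real
// path (GpuShuffleCoalesceExec) deserializes, concatenates, and loads
// columnar batches until a CoalesceSizeGoal is met.
public class ReadSideCoalesce {
    public static byte[] nextCoalesced(Iterator<byte[]> batches, long targetBytes) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        // Pull small serialized batches until the size goal is reached,
        // then "concat + load" them as one larger batch.
        while (batches.hasNext() && buf.size() < targetBytes) {
            byte[] b = batches.next();
            buf.write(b, 0, b.length);
        }
        return buf.toByteArray();
    }
}
```

With many tiny input batches, each output batch requires many loop iterations, which is where the read-header + read-batch time noted below is spent.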
If the shuffle partition count and the number of output batches from the preceding operator are both large, each partitioned batch will be very small. For example, with a 1 GB batch size and 2000 shuffle partitions, each serialized batch is only about 1 GB / 2000 = ~500 KB. We should introduce a coalescing shuffle writer that buffers those small batches.
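The arithmetic above can be restated as a back-of-envelope helper (the name is illustrative, not a spark-rapids API):

```java
// Back-of-envelope sizing: splitting one batch evenly across shuffle
// partitions. Helper name is illustrative, not a spark-rapids API.
public class ShuffleBatchSizing {
    public static long perPartitionBytes(long batchBytes, int numPartitions) {
        return batchBytes / numPartitions;
    }
}
```

For a 1 GB batch across 2000 partitions this gives roughly 512 KB per serialized batch, in line with the ~500 KB figure above.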
Also, if the shuffle write total size is small, say 30 MB across 8 batches with 2000 partitions, the shuffle write side will generate 8 * 2000 = 16000 batches that all go through the process above, with each partition receiving only ~30 MB / 2000 = 15.36 KB of data. From the nsys traces below, there are clear bubbles on both the shuffle write and read sides.
The shuffle write does overlap with GPU computation to some extent, but serializing those small batches leaves a long tail after the GPU has finished all of its work.
The trace below also shows that serializing each batch writes both offsets and data, but with a lot of overhead (the yellow "serialize batch" box beside the green "write sliced" box). With many small batches, the serialization overhead becomes significant while the actual data written is small. We should figure out a way to improve this.
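The effect can be captured with a rough cost model (the numbers and helper are illustrative assumptions, not measurements from the traces):

```java
// Illustrative cost model with assumed (not measured) numbers: each serialized
// batch pays a fixed per-batch overhead on top of writing its data, so with
// many tiny batches the overhead dominates total write time.
public class SerializationOverhead {
    public static double overheadFraction(double perBatchOverheadUs, double perBatchDataUs) {
        return perBatchOverheadUs / (perBatchOverheadUs + perBatchDataUs);
    }
}
```

As the data portion of each batch shrinks toward zero, the overhead fraction approaches 1, which matches the "serialize batch" boxes dwarfing "write sliced" in the trace.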
On the deserialization side, the first stream batch is blocked until the build goal is met, and a lot of time is spent in read-header + read-batch across many iterations.
**Describe the solution you'd like**
There are two general ideas that could help:
- Introduce a write-side coalesce mechanism before flushing out those small batches. To achieve this, we need to maintain a buffer that thresholds batch serialization rather than immediately flushing batches out.
- For serializer optimization, one possible option is to leverage Arrow's serializer to see whether we can serialize batches directly, though some extra work remains around encryption and compression. https://github.com/apache/arrow/blob/e52017a72735d502c3ac3323d9d1fc61a15a6ae0/cpp/src/arrow/ipc/writer.cc#L1597
- Optimize shuffle coalesce performance by moving some of the CPU work onto the GPU. Tracked in https://github.com/NVIDIA/spark-rapids/issues/10402.
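The write-side coalesce idea could be sketched roughly as follows (all names are hypothetical, not the actual spark-rapids API; byte arrays stand in for partitioned batches):

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Minimal sketch of a write-side coalescing buffer. Small per-partition
// payloads accumulate until a size threshold is reached, then are flushed
// with one serialization call, amortizing the per-batch header/offset
// overhead described above.
public class CoalescingPartitionWriter {
    private final long targetBytes;
    private final Consumer<byte[]> flushFn;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public CoalescingPartitionWriter(long targetBytes, Consumer<byte[]> flushFn) {
        this.targetBytes = targetBytes;
        this.flushFn = flushFn;
    }

    public void write(byte[] payload) {
        pending.write(payload, 0, payload.length);
        // Only serialize once enough bytes have accumulated.
        if (pending.size() >= targetBytes) {
            flushPending();
        }
    }

    public void close() {
        // Flush whatever remains at end of stream.
        if (pending.size() > 0) {
            flushPending();
        }
    }

    private void flushPending() {
        flushFn.accept(pending.toByteArray());
        pending.reset();
    }
}
```

In the real implementation the buffer would need to respect memory limits per partition and interact with compression/encryption in the shuffle writer, which this sketch ignores.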
**Describe alternatives you've considered**
AQE may not help much here: the write side still produces those small batches at the same shuffle partition count, so both shuffle write and read would still need to be optimized.
The GPU coalesce on the read side is tracked by #10402.