incubator-uniffle icon indicating copy to clipboard operation
incubator-uniffle copied to clipboard

[FEATURE] support generating larger block size during shuffle map task by spill partial partitions data

Open leslizhang opened this issue 2 years ago • 3 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the feature

At least three situations can trigger a large number of small blocks in shuffle map task:

  1. Data skew: When the data processed by the map task mostly belongs to a few reduce tasks, the sum of the data for the few reduce tasks exceeds the spill threshold, causing the remaining long-tail reduce tasks to spill together passively.
  2. The number of reduce tasks is extremely large (>10k): When each reduce partition has very little data, it will also cause the buffer of the ShuffleBufferManager of the shuffle map task to be occupied beyond the spill threshold, thereby producing a large number of small blocks.
  3. When an executor can run multiple tasks at the same time, there is memory competition between tasks. At this time, some tasks may not get enough memory because they start later than other tasks, but the overall memory of the executor is insufficient to cause ShuffleBufferManager to spill as a MemoryConsumer. In fact, ShuffleBufferManager may only store a small amount of data for each reduce partition.

Problems caused by small blocks:

Unnecessary network overhead: The amount of data sent in a single transmission is reduced, increasing the number of network interactions between the executor and the shuffle server. The number of interactions can expand up to 10 times or even 100 times. Increase the size of the index file when the shuffle server stores shuffle data persistently.

Motivation

No response

Describe the solution

when spilling shuffle data, we just spill part of the reduce partition datas which hold the major space. so, in each spilling process, the WriteBufferManager.clear() method should implement one more logic: sort the to-be spilled buffers by their size and select the top-N buffers to spill.

Additional context

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

leslizhang avatar Mar 20 '24 14:03 leslizhang

+1.

zuston avatar Mar 22 '24 01:03 zuston

Sorting the partition data by the reduce partition id should maximize block sizes. Is that an option?

EnricoMi avatar Mar 22 '24 15:03 EnricoMi

With this feature, the buffer size of a single partition(spark.rss.writer.buffer.size) can be increased to a larger value, for example, from the current default configuration of 3MB to 10MB or more.

leslizhang avatar Apr 28 '24 07:04 leslizhang