RemoteShuffleService
How long will the shuffle data of each ShuffleStage be stored on RSS nodes?

The table below shows the shuffle stages of one application. After stage-1 writes 349GB of shuffle data, the next two shuffle stages have no ShuffleWrite data. Will the 349GB be cleaned up when stage-6 starts its ShuffleWrite, or is it already cleaned when stage-2 starts, even though stage-2 has no data to write?

stage-897 is the final stage, and stage-896 has the latest ShuffleWrite of about 501GB. So will the 501GB be kept for 36 hours according to DEFAULT_APP_FILE_RETENTION_MILLIS (default 36h)?
| StageId | Input (GB) | Output (GB) | ShuffleRead (GB) | ShuffleWrite (GB) |
|---|---|---|---|---|
| 0 | 503.59 | 0.0 | 0.0 | 1723.94 |
| 1 | 0.0 | 0.0 | 1721.11 | 349.34 |
| 2 | 0.0 | 0.0 | 349.05 | 0.0 |
| 5 | 0.0 | 0.0 | 349.05 | 0.0 |
| 6 | 575.31 | 0.0 | 0.0 | 3045.01 |
| ... | ... | ... | ... | ... |
| 889 | 2439.36 | 0.0 | 269.53 | 1885.78 |
| 896 | 0.0 | 0.0 | 922.01 | 501.73 |
| 897 | 0.0 | 2900.98 | 2386.41 | 0.0 |
Hi @Lobo2008, if I remember correctly, yes, the 501GB will be kept for 36 hours according to DEFAULT_APP_FILE_RETENTION_MILLIS (default 36h).
The reason is that the Spark application needs shuffle files from previous stages for stage-level retry, i.e. Spark can re-read shuffle data from a previous stage and continue running.
There could be an optimization if people do not need to keep old stage shuffle files. I did not get time for this feature when I worked at Uber. Feel free to contribute this part!
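To make the retention behavior concrete, below is a minimal sketch of age-based cleanup, assuming a layout where each application gets its own directory under the server's root directory. The class name, method names, and directory layout are illustrative assumptions, not RSS's actual code; only the 36-hour default comes from the discussion above.

```java
import java.io.File;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of age-based shuffle file retention (not RSS's actual implementation).
// Only the 36h default comes from the discussion (DEFAULT_APP_FILE_RETENTION_MILLIS).
public class AppFileRetentionSketch {
    // Mirrors the default mentioned in the thread: 36 hours.
    static final long DEFAULT_APP_FILE_RETENTION_MILLIS = TimeUnit.HOURS.toMillis(36);

    // Delete application directories whose last modification is older than the retention window.
    static void cleanupExpiredAppDirs(File rootDir) {
        long cutoff = System.currentTimeMillis() - DEFAULT_APP_FILE_RETENTION_MILLIS;
        File[] appDirs = rootDir.listFiles(File::isDirectory);
        if (appDirs == null) {
            return;
        }
        for (File appDir : appDirs) {
            if (appDir.lastModified() < cutoff) {
                deleteRecursively(appDir);
            }
        }
    }

    static void deleteRecursively(File file) {
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}
```

In this simplified view, stage-896's 501GB would stay on disk until the application's files age past the 36-hour window, regardless of whether any later stage still needs them.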
Hi @hiboyang, how about the 1885GB of stage-889?
I suppose that while stage-896 is still running, or if some of its tasks fail or the whole stage fails and needs to retry, it still needs the previous data from stage-889. But once stage-896 is finished, its upstream 1885GB is useless, so I suppose the 1885GB will be cleaned and only the 501GB will be kept?
Hi @Lobo2008, you are right. It could track the stage dependency and clean up stage shuffle files selectively. Need someone to work on this :)
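To make the proposed optimization concrete, here is a rough sketch of dependency-aware cleanup, assuming the server is told which stages consume which stage's shuffle output. All names here are hypothetical; none of this exists in RSS today.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of dependency-aware shuffle cleanup (not part of RSS).
// A stage's shuffle files can be deleted once every downstream stage that
// consumes them has completed, since no stage-level retry will need them anymore.
public class StageDependencyCleanupSketch {
    // producer stage id -> consumer stage ids that read its shuffle output
    private final Map<Integer, Set<Integer>> consumersByProducer = new HashMap<>();
    private final Set<Integer> completedStages = new HashSet<>();

    // Record that consumerStage reads the shuffle output of producerStage.
    public void registerDependency(int producerStage, int consumerStage) {
        consumersByProducer
            .computeIfAbsent(producerStage, k -> new HashSet<>())
            .add(consumerStage);
    }

    // Mark a stage completed and return producer stages whose files are now safe to delete.
    public Set<Integer> onStageCompleted(int stageId) {
        completedStages.add(stageId);
        Set<Integer> deletable = new HashSet<>();
        for (Map.Entry<Integer, Set<Integer>> entry : consumersByProducer.entrySet()) {
            if (completedStages.containsAll(entry.getValue())) {
                deletable.add(entry.getKey());
            }
        }
        return deletable;
    }
}
```

Applied to the example above, stage-889's 1885GB could be deleted as soon as every stage that reads its shuffle output has finished.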
Thanks for the reply! One more question:
- 2 nodes, each node mounted with 3 disks, each disk 5TB.
- 6 StreamServers running in total, each launched with `-rootDir` pointing to one disk.

So at any given moment, the total shuffle data of all apps cannot exceed 5TB, rather than 5*6=30TB?
RSS cannot use multiple disks so far, since it can only be configured with one directory. Again, this part could be changed as well; contributions welcome.
If a disk is 5TB, it means all partitions on that disk cannot exceed 5TB. Let's say you have one application evenly distributed on 2 nodes; that application's max shuffle bytes will be 5*2=10TB.
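For anyone picking up the multi-disk contribution, here is a minimal sketch of one possible direction: accept several root directories (one per disk) and spread new partition files across them. The class and method names are hypothetical; only the current single-rootDir limitation comes from the answer above.

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of multi-disk support (not part of RSS today).
// Instead of a single rootDir, the server could accept several directories
// (one per disk) and spread shuffle partition files across them.
public class MultiRootDirSelectorSketch {
    private final List<File> rootDirs;
    private final AtomicLong counter = new AtomicLong();

    public MultiRootDirSelectorSketch(List<File> rootDirs) {
        this.rootDirs = rootDirs;
    }

    // Round-robin selection keeps writes roughly balanced across disks.
    public File dirForNewPartition() {
        int index = (int) (counter.getAndIncrement() % rootDirs.size());
        return rootDirs.get(index);
    }
}
```

In practice the mapping would need to be deterministic (for example, hashing the application and partition identifiers) so that readers can find a partition on the same disk it was written to.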
> RSS cannot use multiple disks so far

Yes, so I simply work around the limitation by running several StreamServers on the same node, with each StreamServer process using only one disk.
I think I understand now: the data of one app will be roughly evenly distributed across all RSS servers, so its upper limit is greater than 5TB. Thanks for your reply!