[WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished
What changes were proposed in this pull request?
Clean up merged shuffle data files after a query finishes.
Why are the changes needed?
There will be too many merged shuffle data files for long-running Spark applications.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Local cluster test.
+CC @akpatnam25, @otterc - since you were planning to work on this already.
Hi, @mridulm Thanks for your review.
In my opinion, we should not add shuffleMergeId to RemoveShuffleMerge.
The push-based shuffle service will only keep the latest shuffleMergeId data for each shuffle, and RemoveShuffleMerge will cleanup its data.
Just like cleaning up the normal shuffle data written by map tasks, these methods only need the shuffleId: def removeShuffle(shuffleId: Int)
What do you think?
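To make the point above concrete, here is a toy model (hypothetical names, not Spark's actual ExternalBlockHandler) of a shuffle service that keeps only the latest shuffleMergeId per shuffle, which is why removal keyed by shuffleId alone would be enough under this assumption:

```scala
import scala.collection.mutable

// Toy model, not the real shuffle service: only the latest shuffleMergeId
// is retained per shuffleId, so a remove keyed by shuffleId alone drops
// whatever attempt's merged data is currently held.
class ToyMergeStore {
  private val latest = mutable.Map.empty[Int, Int] // shuffleId -> shuffleMergeId

  def registerMerge(shuffleId: Int, shuffleMergeId: Int): Unit =
    if (latest.getOrElse(shuffleId, -1) < shuffleMergeId) latest(shuffleId) = shuffleMergeId

  def removeMerge(shuffleId: Int): Option[Int] = latest.remove(shuffleId)
}

val store = new ToyMergeStore
store.registerMerge(shuffleId = 0, shuffleMergeId = 1)
store.registerMerge(shuffleId = 0, shuffleMergeId = 2) // newer attempt replaces the older one
println(store.removeMerge(0)) // Some(2)
```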
We should decouple current implementation details when making protocol changes, and make it extensible for future evolution.
In this case though, it is much more straightforward - there is an existing use case which requires the shuffle merge id.
When retrying an indeterminate stage, we should clean up merged shuffle data for the previous stage attempt (in submitMissingTasks, before unregisterAllMapAndMergeOutput) - and given the potential race conditions there, we don't want RemoveShuffleMerge to clean up for the next attempt (when we add support for this).
This specific change can be done in a follow-up PR though - I want to get the basic mechanics working in this PR, and ensure the cleanup use case is handled - before looking at further enhancements.
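A sketch of why carrying the merge id in the remove message guards against that race (class and method names here are illustrative, not Spark's actual shuffle-service code): a remove request only applies if it targets the attempt currently registered.

```scala
import scala.collection.mutable

// Illustrative sketch, not Spark's real protocol handler.
class ToyService {
  private val current = mutable.Map.empty[Int, Int] // shuffleId -> shuffleMergeId

  def register(shuffleId: Int, mergeId: Int): Unit = current(shuffleId) = mergeId

  // Remove only when the stored mergeId matches the one the request targets:
  // a stale remove for attempt 0 must not delete attempt 1's merged data.
  def removeMerge(shuffleId: Int, mergeId: Int): Boolean =
    current.get(shuffleId) match {
      case Some(m) if m == mergeId => current.remove(shuffleId); true
      case _ => false
    }
}

val svc = new ToyService
svc.register(5, 0)             // attempt 0 registers its merge
svc.register(5, 1)             // indeterminate-stage retry registers attempt 1
println(svc.removeMerge(5, 0)) // false: the stale remove is a no-op
println(svc.removeMerge(5, 1)) // true: attempt 1 is removed explicitly
```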
The push-based shuffle service will auto clean up the old shuffle merge data, and the following stage will read the new stage's output, so we don't need to send a RemoveShuffleMerge RPC for a new shuffle merge? The only scenario I can think of now where a cleanup RPC is needed is when the Spark job completes. Can we think of other scenarios?
retest this please
The push-based shuffle service will auto clean up the old shuffle merge data
Consider the case I mentioned above - stage retry for an INDETERMINATE stage.
We clean up previous attempts only if the new attempt happens to use the same merger as well.
The previous attempt's mergers are not reused for the next attempt - so those mergers will continue to hold stale data, without cleaning it up, until the application terminates.
Note - any merger which happens to be reused in the new stage attempt will clean up - I am referring to those which are not used: old attempt mergerLocs -- new attempt's mergerLocs.
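The set of mergers needing explicit cleanup can be sketched as a set difference (merger locations are simplified to host:port strings here for illustration; Spark actually tracks them as BlockManagerIds):

```scala
// Mergers used by the old stage attempt vs. the new attempt (illustrative values).
val oldAttemptMergerLocs = Set("host1:7337", "host2:7337", "host3:7337")
val newAttemptMergerLocs = Set("host2:7337", "host4:7337")

// Mergers in the old attempt but not the new one keep stale merged data until
// the application exits, unless we send them an explicit RemoveShuffleMerge.
val staleMergers = oldAttemptMergerLocs -- newAttemptMergerLocs
println(staleMergers) // host1 and host3 need explicit cleanup
```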
Can one of the admins verify this patch?
Thanks @mridulm , I will add the shuffleMergeId to the protocol later.
This PR also has some other issues, I will fix them soon.
Gentle ping @wankunde. Do you think you can update the PR soon? Please let us know if you need any help.
Sorry for the late reply, I will update this PR today.
Hi, @otterc @mridulm If the Spark application stops unexpectedly, there will be some leaked merge files. In our production cluster, the YARN NodeManager will clean up these leaked files if the disk is not full, but not if the disk is full.
Could we change blockHandler.applicationRemoved(appId, false /* clean up local dirs */) to blockHandler.applicationRemoved(appId, true) in YarnShuffleService to clean them up?
https://github.com/apache/spark/blob/e6699570becadb91695572bca5adc1605dc1b2a8/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java#L433
I will try to get to this later this week, do let me know if you are still working on it/have pending comments to address. Thanks
Hi, @mridulm , I've been working on some other issues recently. If @yabola can do all or part of this task in https://github.com/apache/spark/pull/38560, please go ahead.
Hi, @mridulm @yabola I have updated this PR, could you help review this PR again? Thanks.
https://github.com/apache/spark/pull/37922#discussion_r1054110064
For an AppShufflePartitionInfo that is still processing its first pushed shuffle block, partition.mapTracker.getCardinality() == 0, so its reduceId is not added to reduceIds; the shuffle files for that partition are then never cleaned up and leak.
I prefer to add this change in this PR.
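A toy model of the leak described above (hypothetical function names; Spark's real mapTracker is a RoaringBitmap, modeled here as a Set[Int] of map ids):

```scala
// Buggy selection: a partition still processing its first pushed block has
// tracker cardinality 0, so it is skipped and its merged files leak.
def reduceIdsToClean(partitions: Map[Int, Set[Int]]): Set[Int] =
  partitions.collect { case (reduceId, tracker) if tracker.nonEmpty => reduceId }.toSet

// Sketch of the fix discussed: clean every registered partition, even when
// its tracker cardinality is still 0, so no merged file is left behind.
def reduceIdsToCleanFixed(partitions: Map[Int, Set[Int]]): Set[Int] =
  partitions.keySet

val parts = Map(0 -> Set(1, 2), 1 -> Set.empty[Int]) // partition 1: first block in flight
println(reduceIdsToClean(parts))      // misses partition 1, whose files would leak
println(reduceIdsToCleanFixed(parts)) // includes partition 1
```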
Could you add some unit tests for this? I wrote a unit test before. You can have a look or write a new UT.
Thanks, I backported the UT.
+CC @otterc, can you take a look at this PR ?
@wankunde, please ping me when you are done with the updates to the PR. Thanks
@mridulm I rebased the code, could you help review this PR again? Thanks
I just merged #39316 to unblock this PR, can you update to the latest please? Thx
@otterc Thanks for your review. I formatted the code.
The tests are failing @wankunde, though I don't think it is due to your PR. Can you please take a look? And retrigger it if it is unrelated? Thanks!
Merged to master. Thanks for fixing this @wankunde ! Thanks for the reviews @otterc, @akpatnam25, @yabola :-)