
[WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

Open wankunde opened this issue 3 years ago • 8 comments

What changes were proposed in this pull request?

Clean up merged shuffle data files after the query finishes.

Why are the changes needed?

Long-running Spark applications otherwise accumulate too many merged shuffle data files.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Local cluster test.

wankunde avatar Sep 18 '22 14:09 wankunde

+CC @akpatnam25, @otterc - since you were planning to work on this already.

mridulm avatar Sep 18 '22 18:09 mridulm

Hi @mridulm, thanks for your review.

In my opinion, we should not add shuffleMergeId to RemoveShuffleMerge. The push-based shuffle service only keeps the latest shuffleMergeId's data for each shuffle, and RemoveShuffleMerge will clean up that data. Just like cleaning up normal shuffle data written by map tasks, these methods only need the shuffleId: def removeShuffle(shuffleId: Int)
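A minimal sketch of the message shape being proposed here (illustrative only; the class and field names are assumptions, not Spark's actual protocol classes):

```java
// Hypothetical message keyed only by appId and shuffleId, mirroring how
// removeShuffle(shuffleId) works for regular map-written shuffle data.
// The shuffle service would resolve the latest shuffleMergeId itself,
// since it only keeps one merge attempt per shuffle.
public class RemoveShuffleMerge {
    public final String appId;
    public final int shuffleId;

    public RemoveShuffleMerge(String appId, int shuffleId) {
        this.appId = appId;
        this.shuffleId = shuffleId;
    }
}
```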

What do you think?

wankunde avatar Sep 19 '22 05:09 wankunde

We should decouple protocol changes from current implementation details, and make the protocol extensible for future evolution.

In this case though, it is much more straightforward: there is an existing use case which requires the shuffle merge id. When retrying an indeterminate stage, we should clean up the merged shuffle data for the previous stage attempt (in submitMissingTasks, before unregisterAllMapAndMergeOutput), and given the potential race conditions there, we don't want RemoveShuffleMerge to clean up data for the next attempt (when we add support for this).
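To make the race concrete, here is an illustrative sketch (class and method names are assumptions, not Spark code) of a merge store that only honors a removal targeting the currently active shuffleMergeId, so a stale RemoveShuffleMerge from attempt N cannot delete attempt N+1's merged data:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MergedShuffleStore {
    // shuffleId -> currently active shuffleMergeId
    private final Map<Integer, Integer> activeMergeId = new ConcurrentHashMap<>();

    void registerMerge(int shuffleId, int shuffleMergeId) {
        activeMergeId.put(shuffleId, shuffleMergeId);
    }

    // Removes merged data only when the request targets the active attempt;
    // returns whether anything was removed.
    boolean removeShuffleMerge(int shuffleId, int shuffleMergeId) {
        return activeMergeId.remove(shuffleId, shuffleMergeId);
    }
}
```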

This specific change can be done in a follow-up PR though - I want to get the basic mechanics working in this PR, and ensure the cleanup use case is handled, before looking at further enhancements.

mridulm avatar Sep 19 '22 05:09 mridulm

> We should decouple protocol changes from current implementation details, and make the protocol extensible for future evolution.
>
> In this case though, it is much more straightforward: there is an existing use case which requires the shuffle merge id. When retrying an indeterminate stage, we should clean up the merged shuffle data for the previous stage attempt (in submitMissingTasks, before unregisterAllMapAndMergeOutput), and given the potential race conditions there, we don't want RemoveShuffleMerge to clean up data for the next attempt (when we add support for this).
>
> This specific change can be done in a follow-up PR though - I want to get the basic mechanics working in this PR, and ensure the cleanup use case is handled, before looking at further enhancements.

The push-based shuffle service will automatically clean up the old shuffle merge data, and the following stage will read the new stage's output, so we don't need to send a RemoveShuffleMerge RPC for a new shuffle merge, do we? The only scenario I can think of where a cleanup RPC is needed is when the Spark job completes. Can we think of other scenarios?

wankunde avatar Sep 19 '22 06:09 wankunde

retest this please

wankunde avatar Sep 19 '22 06:09 wankunde

> The push-based shuffle service will auto clean up the old shuffle merge data

Consider the case I mentioned above: a stage retry for an INDETERMINATE stage. We clean up a previous attempt's data only on the mergers that the new attempt happens to reuse.

Mergers from the previous attempt that are not reused for the next attempt will continue to hold stale data, with nothing cleaning it up, until the application terminates. Note: any merger which happens to be reused in the new stage attempt will clean up; I am referring to those which are not reused: old attempt mergerLocs -- new attempt mergerLocs.
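The set of mergers needing an explicit cleanup RPC is just the difference described above; a small sketch (host names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

class MergerDiff {
    // Mergers used by the previous attempt but not reused by the new attempt:
    // they keep stale merged data unless an explicit cleanup RPC reaches them.
    static Set<String> staleMergers(Set<String> oldMergerLocs, Set<String> newMergerLocs) {
        Set<String> stale = new HashSet<>(oldMergerLocs);
        stale.removeAll(newMergerLocs);
        return stale;
    }
}
```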

mridulm avatar Sep 19 '22 17:09 mridulm

Can one of the admins verify this patch?

AmplabJenkins avatar Sep 19 '22 21:09 AmplabJenkins

Thanks @mridulm, I will add the shuffleMergeId to the protocol later. This PR also has some other issues; I will fix them soon.

wankunde avatar Sep 20 '22 05:09 wankunde

Gentle ping @wankunde. Do you think you can update the PR soon? Please let us know if you need any help.

otterc avatar Oct 06 '22 18:10 otterc

> Gentle ping @wankunde. Do you think you can update the PR soon? Please let us know if you need any help.

Sorry for the late reply, I will update this PR today.

wankunde avatar Oct 09 '22 04:10 wankunde

Hi @otterc @mridulm, if the Spark application stops unexpectedly, some merge files will be leaked. In our production cluster, the YARN NodeManager will clean up these leaked files if the disk is not full, but not if the disk is full. Could we change blockHandler.applicationRemoved(appId, false /* clean up local dirs */) to blockHandler.applicationRemoved(appId, true) in YarnShuffleService to clean them up?

https://github.com/apache/spark/blob/e6699570becadb91695572bca5adc1605dc1b2a8/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java#L433
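A minimal fake (not Spark code; the class name and directory path are illustrative) of the applicationRemoved contract, showing what flipping the second argument to true would change:

```java
import java.util.ArrayList;
import java.util.List;

class FakeBlockHandler {
    final List<String> deletedDirs = new ArrayList<>();

    // The second argument decides whether the application's local dirs,
    // which hold the merged shuffle files, are deleted as well.
    void applicationRemoved(String appId, boolean cleanupLocalDirs) {
        if (cleanupLocalDirs) {
            // a real handler would recursively delete the dirs on disk
            deletedDirs.add("/local/dirs/" + appId);
        }
    }
}
```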

wankunde avatar Oct 09 '22 06:10 wankunde

I will try to get to this later this week; do let me know if you are still working on it or have pending comments to address. Thanks

mridulm avatar Nov 17 '22 08:11 mridulm

Hi, @mridulm , I've been working on some other issues recently. If @yabola can do all or part of this task in https://github.com/apache/spark/pull/38560, please go ahead.

wankunde avatar Nov 21 '22 03:11 wankunde

Hi @mridulm @yabola, I have updated this PR; could you help review it again? Thanks.

wankunde avatar Dec 18 '22 07:12 wankunde

https://github.com/apache/spark/pull/37922#discussion_r1054110064

For an AppShufflePartitionInfo that is processing its first pushed shuffle block, partition.mapTracker.getCardinality() == 0, so its reduceId is not added to reduceIds; those shuffle files are then never cleaned up and leak. I prefer to include this change in this PR.
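A simplified reconstruction of the leak described above (Spark uses a RoaringBitmap map tracker; BitSet stands in here, and the class and method names are illustrative): filtering on cardinality > 0 skips a partition that is still receiving its very first pushed block, so its merge files are never cleaned up.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

class MergeCleanup {
    static class PartitionInfo {
        final int reduceId;
        final BitSet mapTracker = new BitSet(); // maps whose blocks were merged so far

        PartitionInfo(int reduceId) { this.reduceId = reduceId; }
    }

    // includeEmpty == false models the buggy filter: a partition still merging
    // its first block has cardinality 0 and is skipped, leaking its files.
    static List<Integer> reduceIdsToClean(List<PartitionInfo> partitions, boolean includeEmpty) {
        List<Integer> reduceIds = new ArrayList<>();
        for (PartitionInfo p : partitions) {
            if (includeEmpty || p.mapTracker.cardinality() > 0) {
                reduceIds.add(p.reduceId);
            }
        }
        return reduceIds;
    }
}
```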

wankunde avatar Dec 22 '22 12:12 wankunde

Could you add some unit tests for this? I wrote a unit test before; you can have a look or write a new UT.

yabola avatar Dec 23 '22 01:12 yabola

> Could you add some unit tests for this? I wrote a unit test before; you can have a look or write a new UT.

Thanks, I backported the UT.

wankunde avatar Dec 23 '22 03:12 wankunde

+CC @otterc, can you take a look at this PR?

mridulm avatar Dec 27 '22 21:12 mridulm

@wankunde, please ping me when you are done with the updates to the PR. Thanks

mridulm avatar Dec 30 '22 21:12 mridulm

@mridulm I rebased the code; could you help review this PR again? Thanks

wankunde avatar Jan 01 '23 13:01 wankunde

I just merged #39316 to unblock this PR, can you update to the latest please? Thanks

mridulm avatar Jan 01 '23 19:01 mridulm

@otterc Thanks for your review. Formatted the code.

wankunde avatar Jan 13 '23 03:01 wankunde

The tests are failing @wankunde, though I don't think it is due to your PR. Can you please take a look, and retrigger it if it is unrelated? Thanks!

mridulm avatar Jan 14 '23 03:01 mridulm

Merged to master. Thanks for fixing this @wankunde! Thanks for the reviews @otterc, @akpatnam25, @yabola :-)

mridulm avatar Jan 14 '23 10:01 mridulm