[WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files
What changes were proposed in this pull request?
Add logic to avoid sorting shuffle files in Reduce mode when optimizing skew partitions.
Why are the changes needed?
The current logic requires sorting shuffle files when reading Reduce mode skew partition shuffle files; we observed shuffle sorting timeouts and performance issues.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Cluster tests and UTs.
Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?
Hi @wangshengjie123, can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!
Sorry for the late reply, the PR will be updated today or tomorrow.
Codecov Report
Attention: Patch coverage is 1.20482% with 82 lines in your changes missing coverage. Please review.
Project coverage is 48.51%. Comparing base (12c3779) to head (ef81070). Report is 12 commits behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #2373 +/- ##
==========================================
- Coverage 48.77% 48.51% -0.26%
==========================================
Files 209 210 +1
Lines 13109 13186 +77
Branches 1134 1139 +5
==========================================
+ Hits 6393 6396 +3
- Misses 6294 6368 +74
Partials 422 422
Thanks @wangshengjie123, nice PR! Another suggestion: it would be better to add UTs for this feature.
UTs are in progress; cluster testing is planned for this week, and the UTs will be submitted later.
@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.
From my understanding, this PR diverges from the vanilla Spark approach based on mapIndex ranges and instead divides the full partition into multiple sub-partitions based on some heuristics. I'm new to the Celeborn code, so I might be missing something basic, but this PR does not address the issue below. Consider a basic scenario where a partial partition read is happening and we see a FetchFailure:
ShuffleMapStage --> ResultStage
- ShuffleMapStage (attempt 0) generated [P0, P1, P2], and P0 is skewed with partition locations [0, 1, 2, 3, 4, 5].
- AQE asks for three splits, and this PR's logic will create three sub-partitions [0, 1], [2, 3], [4, 5].
- Now consider the reducer has read [0, 1] and [2, 3] and gets a FetchFailure while reading [4, 5]. This will trigger a complete mapper stage retry according to this doc and will clear the map output corresponding to the shuffleID.
- ShuffleMapStage (attempt 1) will again generate data for P0 at different partition locations [a, b, c, d, e, f], and it will get divided like [a, b], [c, d], [e, f].
- Now if the reader stage is a ShuffleMapStage, it will read every sub-partition again, but if the reader is a ResultStage, it will only read the missing partition data, which is [e, f].
The data generated at location 1 and location a would be different because of factors like network delay (the same applies to the other locations). For example, data that was present at the 1st location in the first attempt might be present at the 2nd location, or any other location, in a different attempt, depending on the order in which the mappers generated the data and the order in which the server received it.
This can cause both data loss and data duplication. This might be addressed somewhere else in the codebase that I'm not aware of, but I wanted to point this problem out.
@s0nskar Good point, this should be an issue for ResultStage, even though the ShuffleMapStage's output is deterministic.
IIRC, vanilla Spark also has some limitations on stage retry cases for ResultStage when the ShuffleMapStage's output is indeterministic; for such cases, we need to fail the job, right?
@pan3793 This does not become a problem if we maintain the concept of mapIndex ranges, as Spark will always read deterministic output for each sub-partition.
Since vanilla Spark always reads deterministic output because of the mapIndex range filter, it will not face this issue. In this approach, sub-partition data will be indeterministic across stage attempts. Failing would be the only option for such cases until Spark starts supporting ResultStage rollback.
Also, I think this issue would not be limited to ResultStage; it can happen with a ShuffleMapStage as well in some complex cases. Consider another scenario:
ShuffleMapStage1 -----> ShuffleMapStage2 ----->
- Similar to the above example, let's say skewed partition P0 is generated by ShuffleMapStage1.
- ShuffleMapStage2 gets a FetchFailure while reading sub-partitions of ShuffleMapStage1.
- ShuffleMapStage1 will be recomputed and its shuffle outputs will be cleared.
- Only the missing tasks of ShuffleMapStage2 will be retried, again causing the same issue.
In this case, though, we can roll back the whole lineage up to this point instead of failing the job, similar to what vanilla Spark does, but this would be very expensive.
@s0nskar I see your point. When consuming skew partitions, we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.
Hi @s0nskar , thanks for your point, I think you are correct. Seems this PR conflicts with stage rerun.
we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.
@pan3793 Is it possible to force it to be indeterministic?
Also, I think Spark doesn't correctly set a stage's determinism in some cases, for example a row_number window operator followed by an aggregation keyed by the row_number.
cc @mridulm @ErikFang
@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.
The sort based approach is roughly like this:
- Each sub reducer reads from all partition splits of its partitionId for data within its map range
- The first read request will trigger the partition split file to be sorted based on map ids, so each IO will be sequential
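A minimal sketch of that read path, under assumed in-memory types (Record and SortedSplitReader are illustrative stand-ins, not Celeborn's actual classes):

```scala
// Illustrative sketch of the sort-based approach: the first map-range read of a
// split triggers a one-time sort by mapId; afterwards every sub-reducer's range
// request is served as a sequential scan of a contiguous region.
case class Record(mapId: Int, payload: Array[Byte])

class SortedSplitReader(records: Seq[Record]) {
  // Sorted lazily on the first map-range read, so splits that are never read in
  // map-range fashion pay no sorting cost.
  private lazy val sortedByMapId: Vector[Record] = records.sortBy(_.mapId).toVector

  // Serve map ids in [startMapId, endMapId) as one sequential pass.
  def readMapRange(startMapId: Int, endMapId: Int): Iterator[Record] =
    sortedByMapId.iterator
      .dropWhile(_.mapId < startMapId)
      .takeWhile(_.mapId < endMapId)
}
```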
Thanks a lot @waitinfuture for the sort based approach description.
Is it possible to force it to be indeterministic?
IMO this would be very difficult to do from Celeborn itself, but it can be done by putting a patch in the Spark code. ShuffledRowRDD can set its determinacy level to INDETERMINATE if partial partition reads are happening and Celeborn is being used.
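A hedged sketch of that idea, assuming Spark's DeveloperApi DeterministicLevel enum and the protected getOutputDeterministicLevel hook on RDD; the wrapper class and the partialPartitionRead flag are illustrative only, and the real change would go into ShuffledRowRDD via the Spark patch:

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Illustrative wrapper RDD: report INDETERMINATE output whenever Celeborn
// partial-partition (sub-range) reads are in effect, so a stage retry cannot
// silently reuse sub-partition outputs that may differ between attempts.
class IndeterminateWhenPartialRead[T: ClassTag](
    prev: RDD[T],
    partialPartitionRead: Boolean) extends RDD[T](prev) {

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions

  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    if (partialPartitionRead) DeterministicLevel.INDETERMINATE
    else super.getOutputDeterministicLevel
}
```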
cc: @mridulm for viz
@waitinfuture It seems this PR is getting attention, and some discussions happened offline; we'd better update the PR description (or Google Docs) to summarize the whole design and the known issues so far.
It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so). With that in place, this would not be an issue ... (I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)
the split into subranges is deterministic
The way Celeborn splits partitions is not deterministic across stage reruns; for example, any push failure will cause a split, so I'm afraid this statement does not hold...
Ah, I see what you mean ... PartitionLocation would change between retries.
Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207.
I will need to relook at the PR and how it interacts with Celeborn - but if the scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we can't mitigate them), we should not proceed down this path, given the correctness implications, unfortunately.
+CC @otterc as well.
Maybe we can keep both this optimization and stage rerun, but only allow one to take effect by checking configs for now. The performance issue this PR solves does happen in production.
Codecov Report
Attention: Patch coverage is 1.20482% with 82 lines in your changes missing coverage. Please review.
Project coverage is 48.51%. Comparing base (121395f) to head (ef81070). Report is 101 commits behind head on main.
Current head ef81070 differs from the pull request's most recent head 81947e7. Please upload reports for commit 81947e7 to get more accurate results.
Additional details and impacted files
@@ Coverage Diff @@
## main #2373 +/- ##
==========================================
+ Coverage 40.17% 48.51% +8.34%
==========================================
Files 218 210 -8
Lines 13742 13186 -556
Branches 1214 1139 -75
==========================================
+ Hits 5520 6396 +876
+ Misses 7905 6368 -1537
- Partials 317 422 +105
To reviewers: just wanted to give you an update on my recent validation work on our internal cluster, splitting skewed partitions with the Celeborn split-level approach. I ran a job with the default Celeborn split size of 1GB and a Spark advisoryPartitionSize of 64MB. However, I noticed that only 1/16 of the tasks were fetching shuffle data to run, while the rest were empty.
After discussing this with @waitinfuture, @wangshengjie123, and @pan3793, we decided to leverage chunks to split skewed partitions and obtain more fine-grained sub-partitions by data size. This was implemented in https://github.com/apache/celeborn/pull/2373/commits/dfeb731da692aeef1c513f5ac3837275146009f5 and I tested it on my internal cluster with online tasks. The performance of the shuffle read stage was almost as good as ESS.
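For context, a rough sketch of the chunk-based splitting idea under assumed simplified types (ChunkRef and splitByChunks are illustrative names, not the classes the linked commit actually adds): walk the ordered chunk list of the skewed partition's splits and cut it into size-balanced groups, so each sub-reducer fetches whole chunks and no worker-side sort is needed.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative type: one chunk of one partition split file, with its size.
case class ChunkRef(fileId: String, chunkIndex: Int, bytes: Long)

// Greedily cut the ordered chunk list into at most `numSubPartitions` groups of
// roughly equal total size; each group becomes one sub-reducer's fetch plan.
def splitByChunks(chunks: Seq[ChunkRef], numSubPartitions: Int): Seq[Seq[ChunkRef]] = {
  require(numSubPartitions > 0, "numSubPartitions must be positive")
  val targetBytes = math.max(1L, chunks.map(_.bytes).sum / numSubPartitions)
  val groups = ArrayBuffer(ArrayBuffer.empty[ChunkRef])
  var currentBytes = 0L
  chunks.foreach { chunk =>
    if (currentBytes >= targetBytes && groups.size < numSubPartitions) {
      groups += ArrayBuffer.empty[ChunkRef]
      currentBytes = 0L
    }
    groups.last += chunk
    currentBytes += chunk.bytes
  }
  groups.map(_.toSeq).toSeq
}
```

Cutting at chunk granularity (instead of at whole 1GB split files) is what lets the sub-partitions track a 64MB advisory size much more closely.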
Based on my current read, this does have correctness implications. I would suggest we do one or both of the following:
a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
Thoughts ?
Also, how does this feature interact with celeborn.client.shuffle.rangeReadFilter.enabled?
Currently, if this PR is enabled, the shuffle client won't really apply rangeReadFilter, but we can avoid enabling rangeReadFilter. Maybe we could disable rangeReadFilter and set the shuffle stage to INDETERMINATE at the shuffle level.
a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
Totally agree! As @waitinfuture mentioned above, and after discussing with @pan3793 offline, we can disallow enabling this feature and stage rerun at the same time. I'll fix this later.
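A minimal sketch of such a guard, with assumed config key names (both keys are illustrative, not necessarily Celeborn's actual ones):

```scala
// Hypothetical key names; the real keys would come from CelebornConf.
def validateSkewReadConfig(conf: Map[String, String]): Unit = {
  def enabled(key: String): Boolean = conf.getOrElse(key, "false").toBoolean
  val optimizeSkewRead = enabled("celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled")
  val stageRerun = enabled("celeborn.client.stageRerun.enabled")
  // Fail fast at startup rather than risk data loss/duplication at runtime.
  require(!(optimizeSkewRead && stageRerun),
    "Skewed partition read optimization cannot be enabled together with stage rerun, " +
      "because sub-partition splits are not deterministic across stage attempts.")
}
```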
b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
It seems the replication mechanism is unable to mitigate the risk of data loss in this scenario?
It seems the replication mechanism is unable to mitigate the risk of data loss in this scenario?
When fetch failures are due to worker unavailability (node crash, etc.), replicas should allow the reducer to continue fetching data. This is more of a statistical reduction in failures, not an elimination - agree.
a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss. b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
@mridulm @cfmcgrady As enabling replication is expensive, some users might not want to enable it. Another way to handle this could be to only fail the stage if it has any skew partition reads. This way it will only affect stages where skew is present and will not affect stages or apps where there is no skew, which can increase the overall reliability for a huge percentage of apps. We can make it configurable to give more control to the user. WDYT?
QQ: thinking out loud, instead of this change - do we want to proactively trigger the sort for reducers where we are reading a subset of mapper output (based on ShufflePartitionsUtil)?
This will help if we are trying to mitigate the impact of reducer read timeouts, etc. It won't bring down the overall load (at the worker) though.
On the plus side, it does not suffer from the correctness issues here.
Thoughts ?
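A hedged sketch of what a driver-side trigger could look like, assuming Spark AQE's PartialReducerPartitionSpec is the spec type produced for skew splits; triggerPreSort is hypothetical, as Celeborn has no such RPC today:

```scala
import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ShufflePartitionSpec}

// Collect the reducer partition ids that AQE will read as map-index sub-ranges;
// only the splits of these partitions would need a worker-side sort ahead of time.
def partitionsNeedingPreSort(specs: Seq[ShufflePartitionSpec]): Seq[Int] =
  specs.collect { case p: PartialReducerPartitionSpec => p.reducerIndex }.distinct

// Hypothetical hook: ask the workers to start sorting those splits before any
// reducer task issues its first map-range read.
// triggerPreSort(shuffleId, partitionsNeedingPreSort(specs))
```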
QQ: thinking out loud, instead of this change - do we want to proactively trigger the sort for reducers where we are reading a subset of mapper output (based on ShufflePartitionsUtil)? This will help if we are trying to mitigate the impact of reducer read timeouts, etc. It won't bring down the overall load (at the worker) though.
On the plus side, it does not suffer from the correctness issues here.
Thoughts ?
Unfortunately, we don't know whether a partition split will be read in a map-range fashion until the read request comes :( BTW, even if we figure out a way, sorting some files increases the memory/disk burden on the worker (maybe negligible if the worker is under low load).
Unfortunately, we don't know whether a partition split will be read in a map-range fashion until the read request comes :(
In general, this is going to be very close to 100% - typically it will be less when the reducer stage is also a result stage and it is computing only a subset of partitions (like take, etc.).
Note that things like shuffle reuse should not be an issue - since in those cases, the sort would have already happened anyway.
BTW, even if we figure out a way, sorting some files increases the memory/disk burden on the worker (maybe negligible if the worker is under low load).
Completely agree! The overall cost won't change - but the impact can be spread out over a longer duration and executed at the workers at a lower priority, so that when the request comes in, the data is already materialized.
I have not thought through what the overall impact of this idea is - so it is likely I am missing corner cases; but wanted to surface the option to solicit thoughts :-)
In general, this is going to be very close to 100%
Do you mean that close to 100% of shuffle reads will be in map-range fashion (which means it's the skewed-join case and hits Spark's optimization), or am I misunderstanding something? In my experience the ratio is relatively low.
IMHO, if we can figure out which partitions will be read in map-range fashion ahead of time (which in my understanding is quite difficult, as it currently depends on Spark's re-optimization before each stage), we can apply your proposal to pre-sort them in the current implementation. Additionally, we can still keep working on this PR so users can choose which one to use, because this PR eliminates sorting completely, which is quite attractive to me :)
Do you mean that close to 100% of shuffle reads will be in map-range fashion (which means it's the skewed-join case and hits Spark's optimization), or am I misunderstanding something? In my experience the ratio is relatively low.
No, I meant that if skew is detected at the driver, the chance that the partition will be fetched by a task in the reducer is going to be close to 100% ... and if we trigger this from the driver, by the time the executor fetches shuffle data, the worker will have had a longer period of time to finish preparing the data (sort, etc.).
I say close to 100% because there can always be cases like df1.join(df2, "col").take(5) - and so not all partitions will be computed :-)
Note that because tasks computing skewed partitions are a (very) small fraction overall, the early trigger to compute this (overall) won't be too expensive - while helping us avoid the additional delays when a specific task is fetching data.
Additionally, we can still keep working on this PR so users can choose which one to use, because this PR eliminates sorting completely, which is quite attractive to me :)
Avoiding the sort is always attractive :-) The data correctness issues, unless we can mitigate them, are what make it risky for me.