celeborn icon indicating copy to clipboard operation
celeborn copied to clipboard

[WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files

Open wangshengjie123 opened this issue 11 months ago • 41 comments

What changes were proposed in this pull request?

Add logic to support avoid sorting shuffle files for Reduce mode when optimize skew partitions

Why are the changes needed?

Current logic need sorting shuffle files when read Reduce mode skew partition shuffle files, we found some shuffle sorting timeout and performance issue

Does this PR introduce any user-facing change?

No

How was this patch tested?

Cluster test and uts

wangshengjie123 avatar Mar 11 '24 02:03 wangshengjie123

Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?

HI, @wangshengjie123 Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!

cfmcgrady avatar Mar 14 '24 07:03 cfmcgrady

Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?

HI, @wangshengjie123 Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!

Sorry for late reply, the pr will be updated today or tomorrow

wangshengjie123 avatar Mar 16 '24 07:03 wangshengjie123

Codecov Report

Attention: Patch coverage is 1.20482% with 82 lines in your changes are missing coverage. Please review.

Project coverage is 48.51%. Comparing base (12c3779) to head (ef81070). Report is 12 commits behind head on main.

Files Patch % Lines
...born/common/protocol/message/ControlMessages.scala 0.00% 38 Missing :warning:
.../apache/celeborn/common/write/PushFailedBatch.java 0.00% 24 Missing :warning:
...org/apache/celeborn/common/util/PbSerDeUtils.scala 0.00% 9 Missing :warning:
...g/apache/celeborn/common/protocol/StorageInfo.java 0.00% 6 Missing :warning:
...va/org/apache/celeborn/common/write/PushState.java 16.67% 5 Missing :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2373      +/-   ##
==========================================
- Coverage   48.77%   48.51%   -0.26%     
==========================================
  Files         209      210       +1     
  Lines       13109    13186      +77     
  Branches     1134     1139       +5     
==========================================
+ Hits         6393     6396       +3     
- Misses       6294     6368      +74     
  Partials      422      422              

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov[bot] avatar Mar 16 '24 08:03 codecov[bot]

Thanks @wangshengjie123 nice pr! Another suggestion is better to add UT for this feature.

UTs is doing, test in cluster this week, uts will be submit later

wangshengjie123 avatar Mar 24 '24 09:03 wangshengjie123

@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.

s0nskar avatar Apr 04 '24 08:04 s0nskar

From my understanding, in this PR we're diverting from vanilla spark approach based on mapIndex and just dividing the full partition into multiple sub-partition based on some heuristics. I'm new to Celeborn code, so might be missing something basic but in this PR we're not addressing below issue. If we consider a basic scenario where a partial partition read is happening and we see a FetchFailure.

ShuffleMapStage --> ResultStage

  • ShuffleMapStage (attempt 0) generated [P0, P1, P2] and P0 is skewed with partition location [0,1,2,3,4,5].
  • AQE asks for three splits and this PR logic will create three partitions [0, 1], [2, 3], [4, 5]
  • Now consider is reducer read [0, 1] and [2, 3] and gets FetchFailure while reading [4, 5]
  • This will trigger a complete mapper stage retry a/c to this doc and will clear the map output corresponding the shuffleID
  • ShuffleMapStage (attempt 0) will again generate data for P0 at different partition location [a, b, c, d, e, f] and it will get divided like [a, b], [c, d], [e, f]
  • Now if reader stage is ShuffleMapStage then it will read every sub-partition again but if the reader is ResultStage then it will only read missing partition data which [e, f].

The data generated on location 1 and location a would be different because of other factors like network delay (same thing applies for other locations). Ex – The data that might be present in 1st location in first attempt might be present in 2nd location or any location in different attempt because of the order mapper generated the data and in order server received that data.

This can cause both Data loss and Data duplication, this might be getting addressed in some other place in the codebase that i'm not aware of but i wanted point this problem out.

s0nskar avatar Apr 04 '24 13:04 s0nskar

@s0nskar Good point, this should be an issue for ResultStage, even though the ShuffleMapStage's output is deterministic.

IIRC, vanilla Spark also has some limitations on stage retry cases for ResultStage when ShuffleMapStage's output is indeterministic, for such cases, we need to fail the job, right?

pan3793 avatar Apr 04 '24 14:04 pan3793

@pan3793 This does not become problem if we are maintaining the concept of mapIndex ranges as spark will always read deterministic output for each sub-partition.

As vanilla spark always read deterministic output because of mapIndex range filter, it will not face this issue. In this approach sub-partitions data will be indeterministic across stage attempts. Failing would be only option for such cases until spark start supporting ResultStage rollback.

s0nskar avatar Apr 04 '24 14:04 s0nskar

Also, I think this issue would not be only limited to ResultStage, this can happen with ShuffleMapStage as well in some complex cases. Consider another scenario –

ShuffleMapStage1 -----> ShuffleMapStage2 ----->

  • Similar to above example, let's say partition skew P0 generated by ShuffleMapStage1.
  • ShuffleMapStage2 gets FetchFailure while reading sub-partitions of ShuffleMapStage1.
  • ShuffleMapStage1 will be recomputed and shuffle outputs will be cleared.
  • Only missing task of ShuffleMapStage2 will be retries, again causing the same issue.

This is case though, we can rollback the whole lineage till this point instead of failing this job. Similar to what vanilla spark does, what this will be very expensive.

s0nskar avatar Apr 04 '24 14:04 s0nskar

@s0nskar I see your point. When consuming skew partitions, we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.

pan3793 avatar Apr 04 '24 16:04 pan3793

Hi @s0nskar , thanks for your point, I think you are correct. Seems this PR conflicts with stage rerun.

we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.

@pan3793 Is it possible to force make it as indeterministic?

Also, I think Spark doesn't correctly set stage's determinism for some cases, for example a row_number window operator followed by aggregation keyed by the row_number.

cc @mridulm @ErikFang

waitinfuture avatar Apr 04 '24 16:04 waitinfuture

@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.

The sort based approach is roughly like this:

  1. Each sub reducer reads from all partition splits of its partitionId for data within its map range
  2. The first read request will trigger the partition split file to be sorted based on map ids, so each IO will be sequential

image image

waitinfuture avatar Apr 04 '24 16:04 waitinfuture

Thanks a lot @waitinfuture for the sort based approach description.

Is it possible to force make it as indeterministic?

IMO this would be very difficult to do it from Celeborn itself but it can be done by putting a patch in the Spark code. ShuffledRowRDD can set Determinacy Level to INDETEMINATE if partial partition reads are happening and Celeborn is getting is used.

cc: @mridulm for viz

s0nskar avatar Apr 04 '24 17:04 s0nskar

@waitinfuture It seems this PR is getting attention, some discussions happened offline, we'd better update the PR description(or Google Docs) to summarize the whole design and known issues so far

pan3793 avatar Apr 04 '24 18:04 pan3793

It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so). With that in place, this would not be an issue ... (I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)

mridulm avatar Apr 06 '24 01:04 mridulm

It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so). With that in place, this would not be an issue ... (I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)

the split into subranges is deterministic

The way Celeborn splits partition is not deterministic with stage rerun, for example any push failure will cause split, so I'm afraid this statement does not hold...

waitinfuture avatar Apr 06 '24 03:04 waitinfuture

Ah, I see what you mean ... PartitionLocation would change between retries. Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207

I will need to relook at the PR, and how it interact with Celeborn - but if scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we cant mitigate it), we should not proceed down this path given the correctness implications unfortunately.

mridulm avatar Apr 06 '24 05:04 mridulm

+CC @otterc as well.

mridulm avatar Apr 06 '24 05:04 mridulm

Ah, I see what you mean ... PartitionLocation would change between retries. Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207

I will need to relook at the PR, and how it interact with Celeborn - but if scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we cant mitigate it), we should not proceed down this path given the correctness implications unfortunately.

Maybe we can remain both this optimization and stage rerun, but only allows one to take effect by checking configs for now. The performance issue this PR solves does happen in production.

waitinfuture avatar Apr 06 '24 11:04 waitinfuture

Codecov Report

Attention: Patch coverage is 1.20482% with 82 lines in your changes missing coverage. Please review.

Project coverage is 48.51%. Comparing base (121395f) to head (ef81070). Report is 101 commits behind head on main.

:exclamation: Current head ef81070 differs from pull request most recent head 81947e7

Please upload reports for the commit 81947e7 to get more accurate results.

Files Patch % Lines
...born/common/protocol/message/ControlMessages.scala 0.00% 38 Missing :warning:
.../apache/celeborn/common/write/PushFailedBatch.java 0.00% 24 Missing :warning:
...org/apache/celeborn/common/util/PbSerDeUtils.scala 0.00% 9 Missing :warning:
...g/apache/celeborn/common/protocol/StorageInfo.java 0.00% 6 Missing :warning:
...va/org/apache/celeborn/common/write/PushState.java 16.67% 5 Missing :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2373      +/-   ##
==========================================
+ Coverage   40.17%   48.51%   +8.34%     
==========================================
  Files         218      210       -8     
  Lines       13742    13186     -556     
  Branches     1214     1139      -75     
==========================================
+ Hits         5520     6396     +876     
+ Misses       7905     6368    -1537     
- Partials      317      422     +105     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov-commenter avatar Apr 07 '24 03:04 codecov-commenter

To reviewer. Just wanted to give you an update on my recent validation work on our internal cluster using split skewed partition with Celeborn Split level approach. I ran a job with the default Celeborn Split size of 1GB and spark advisoryPartitionSize of 64MB. However, I noticed that only 1/16 tasks were fetching the shuffle data to run, while the rest were empty.

After discussing this with @waitinfuture , @wangshengjie123 , and @pan3793 , we decided to leverage chunks to split skewed partitions and gain more fine-grained data size sub-partitions. This was implemented in https://github.com/apache/celeborn/pull/2373/commits/dfeb731da692aeef1c513f5ac3837275146009f5 and I tested it on my internal cluster with online tasks. The performance of the Shuffle Read stage was almost as good as ESS.

cfmcgrady avatar Apr 07 '24 15:04 cfmcgrady

Based on my current read, this does have correctness implications. I would suggest we should do either or all of the following:

a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.

b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.

Thoughts ?

Also, how does this feature interact with celeborn.client.shuffle.rangeReadFilter.enabled ?

Current if this pr is enabled, shuffle client won`t really apply rangeReadFilter, but we can avoid enable rangeReadFilter. Maybe we could close rangeReadFilter and set shuffle stage INDETEMINATE at shuffle level

wangshengjie123 avatar Apr 12 '24 09:04 wangshengjie123

a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.

Totally agree! As @waitinfuture mentioned above, and after discussing with @pan3793 offline, we can disable this feature and the use of stage rerun at the same time. I'll fix this later.

b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.

seem the replication mechanism is unable to mitigate the risk of data loss in this scenario?

cfmcgrady avatar Apr 12 '24 10:04 cfmcgrady

seem the replication mechanism is unable to mitigate the risk of data loss in this scenario?

When fetch failures are due to worker unavailability (node crash, etc) - replicas should allow the reducer to continuing fetching data. This is more of a statistical reduction in failures, not elimination - agree.

mridulm avatar Apr 12 '24 20:04 mridulm

a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss. b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.

@mridulm @cfmcgrady As enabling replication is expensive and some uses might not want to enable it. Another way to handle could be to only fail the stage if stage has any skew partition read. This way it will only affect the stages where skew and will not affect the stages or apps where there is no skew, this can increase the overall reliability for huge percentage of apps. We can make it configurable to give more control to the user. WDYT?

s0nskar avatar Apr 16 '24 07:04 s0nskar

QQ: thinking out aloud, instead of this change - do we want to proactively trigger sort for reducers where we are reading a subset of mapper output (based on ShufflePartitionsUtil) ?

This will help if we are trying to mitigate the impact of reducer read timeouts, etc. It wont bring down the overall load (at worker) though.

On plus side, it does not suffer from the correctness issues here.

Thoughts ?

mridulm avatar Apr 25 '24 03:04 mridulm

QQ: thinking out aloud, instead of this change - do we want to proactively trigger sort for reducers where we are reading a subset of mapper output (based on ShufflePartitionsUtil) ?

This will help if we are trying to mitigate the impact of reducer read timeouts, etc. It wont bring down the overall load (at worker) though.

On plus side, it does not suffer from the correctness issues here.

Thoughts ?

Unfortunately we don't know whether a partition split will be read in a map-range fashion until the read request comes : ( BTW, even though we figure out a way, sorting some files increases the memory/disk burden on worker (maybe negligible if worker is under low load)

waitinfuture avatar Apr 25 '24 03:04 waitinfuture

Unfortunately we don't know whether a partition split will be read in a map-range fashion until the read request comes : (

In general, this is going to be very close to 100% - typically it will be less when reducer stage is also a result stage + it is computing only a subset of partitions (like take, etc). Note that things like shuffle reuse should not be an issue - since in those cases, the sort would have already happened anyway.

BTW, even though we figure out a way, sorting some files increases the memory/disk burden on worker (maybe negligible if worker is under low load)

Completely agree ! The overall cost wont change - but the impact can be spread out over a larger duration, and executed at the workers at a lower priority: so that when request comes in, it is already materialized.

I have not thought through what the overall impact of this idea is - so it is likely I am missing corner cases; but wanted to surface the option to solicit thoughts :-)

mridulm avatar Apr 25 '24 04:04 mridulm

In general, this is going to be very close to 100%

You mean close to 100% of shuffle read will be in map-range fashion (which means it's the case of skewed join and hits Spark's optimization), or am I misunderstand something? In my experience the ratio is relatively low.

IMHO, if we can figure out which partitions will be read in map-range fashion ahead of time (which in my understanding is quite difficult, currently depend on Spark's reoptimization before each stage), we can apply your proposal to pre-sort them in current implementation. Additionally, we can still keep working on this PR so users can choose which one to use, because this PR eliminates sorting completely which is quite attractive to me :)

waitinfuture avatar Apr 25 '24 06:04 waitinfuture

You mean close to 100% of shuffle read will be in map-range fashion (which means it's the case of skewed join and hits Spark's optimization), or am I misunderstand something? In my experience the ratio is relatively low.

No, I meant if skew is detected at driver, the chances that the partition will be fetched by a task in reducer is going to be close to 100% ... and if we trigger this from driver, by the time the executor fetches shuffle data, it would give the worker a longer period of time to finish preparing the data (sort, etc).

I say close to 100% because there can always be cases where df1.join(df2, "col").take(5) - and so not all partitions will be computed :-)

Note that because tasks which are computing skewed partitions are a (very) small fraction overall, the early trigger to compute this (overall) wont be too expensive - while helping us avoid the additional delays when a specific task is fetching data.

Additionally, we can still keep working on this PR so users can choose which one to use, because this PR eliminates sorting completely which is quite attractive to me :)

Avoiding sort if always attractive :-) The data correctness issues, unless we can mitigate it, is what makes it risky for me.

mridulm avatar Apr 25 '24 15:04 mridulm