
[#1579][part-1] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry

Open zuston opened this issue 1 year ago • 31 comments

What changes were proposed in this pull request?

Clear out the previous stage attempt's data synchronously when registering the reassigned shuffleIds.

Why are the changes needed?

Fix: #1579

If the previous stage attempt's data is still sitting in the purge queue on the shuffle-server side, writes from the retried stage can hit unknown exceptions, so we should clear out all of the previous stage attempt's data before re-registering.

This PR synchronously removes the previous stage attempt's data when the first writer of the attempt is initialized.
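The intended flow can be sketched roughly as follows. This is a hedged illustration, not the actual Uniffle code: the class name StageRetryCleaner and the purge callback are hypothetical stand-ins for the real writer-initialization path.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class StageRetryCleaner {
    // ShuffleIds whose previous-attempt data has already been purged.
    private final Set<Integer> cleanedShuffleIds = ConcurrentHashMap.newKeySet();

    /**
     * Invoked when a writer of the retried attempt initializes. Only the first
     * caller runs the purge (synchronously); later writers skip straight to
     * registration. Returns true if this call performed the cleanup.
     */
    boolean cleanOnceBeforeReRegister(int shuffleId, Runnable purgePreviousAttemptData) {
        if (cleanedShuffleIds.add(shuffleId)) {
            purgePreviousAttemptData.run(); // blocks until old data is gone
            return true;
        }
        return false;
    }
}
```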

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

zuston avatar Mar 15 '24 03:03 zuston

cc @dingshun3016 @yl09099 PTAL

zuston avatar Mar 15 '24 03:03 zuston

Test Results

 2 433 files ±0   2 433 suites ±0   5h 0m 35s :stopwatch: -1m 8s
   934 tests ±0     933 :white_check_mark: ±0   1 :zzz: ±0   0 :x: ±0
10 828 runs ±0  10 814 :white_check_mark: ±0  14 :zzz: ±0   0 :x: ±0

Results for commit 61e2a9bc. ± Comparison against base commit fd64d9dc.

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Mar 15 '24 03:03 github-actions[bot]

After rethinking this, I think reassignAllShuffleServersForWholeStage could be invoked by the retry writer rather than the previously failed writer, which would ensure no older data reaches the server after re-registration.

zuston avatar Mar 15 '24 03:03 zuston

Codecov Report

Attention: Patch coverage is 3.29670%, with 352 lines in your changes missing coverage. Please review.

Project coverage is 53.42%. Comparing base (6f6d35a) to head (5c9d9e3). Report is 34 commits behind head on master.

Files Patch % Lines
...uniffle/shuffle/manager/RssShuffleManagerBase.java 0.00% 187 Missing :warning:
.../shuffle/handle/StageAttemptShuffleHandleInfo.java 0.00% 43 Missing :warning:
...pache/uniffle/server/ShuffleServerGrpcService.java 0.00% 32 Missing :warning:
.../apache/spark/shuffle/RssStageResubmitManager.java 0.00% 22 Missing :warning:
...spark/shuffle/handle/MutableShuffleHandleInfo.java 0.00% 22 Missing :warning:
...niffle/server/netty/ShuffleServerNettyHandler.java 0.00% 9 Missing :warning:
...ffle/client/request/RssRegisterShuffleRequest.java 0.00% 7 Missing :warning:
...fle/shuffle/manager/ShuffleManagerGrpcService.java 0.00% 6 Missing :warning:
...ffle/client/impl/grpc/ShuffleServerGrpcClient.java 0.00% 6 Missing :warning:
...ffle/client/request/RssSendShuffleDataRequest.java 0.00% 5 Missing :warning:
... and 6 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1584      +/-   ##
============================================
- Coverage     54.86%   53.42%   -1.45%     
- Complexity     2358     2943     +585     
============================================
  Files           368      435      +67     
  Lines         16379    23768    +7389     
  Branches       1504     2208     +704     
============================================
+ Hits           8986    12697    +3711     
- Misses         6862    10290    +3428     
- Partials        531      781     +250     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov-commenter avatar Mar 22 '24 08:03 codecov-commenter

It's dangerous to delete the failed stage's data when we retry; it's hard to satisfy the conditions for deleting it safely. We should rely on data skipping to avoid reading the failed data.

jerqi avatar Mar 22 '24 10:03 jerqi

It's dangerous to delete the failed stage's data when we retry; it's hard to satisfy the conditions for deleting it safely.

Could you describe more?

zuston avatar Mar 22 '24 10:03 zuston

It's dangerous to delete the failed stage's data when we retry; it's hard to satisfy the conditions for deleting it safely.

Could you describe more?

Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Although we resubmit the stage, some tasks from the last attempt may still write data; Spark doesn't guarantee that all tasks from the last attempt have ended even though the newest attempt has started.

jerqi avatar Mar 23 '24 12:03 jerqi

@EnricoMi If a stage is retried, the taskId may not be unique, because we don't have a stage attemptId to distinguish task 1 attempt 0 in stage attempt 0 from task 1 attempt 0 in stage attempt 1. This may cause us to read wrong data.

jerqi avatar Mar 25 '24 02:03 jerqi

It's dangerous to delete the failed stage's data when we retry; it's hard to satisfy the conditions for deleting it safely.

Could you describe more?

Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Although we resubmit the stage, some tasks from the last attempt may still write data; Spark doesn't guarantee that all tasks from the last attempt have ended even though the newest attempt has started.

If so, we'd better reject shuffle data from older stage attempts. This could be implemented by maintaining the latest stage attempt id.

zuston avatar Mar 25 '24 03:03 zuston

It's dangerous to delete the failed stage's data when we retry; it's hard to satisfy the conditions for deleting it safely.

Could you describe more?

Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Although we resubmit the stage, some tasks from the last attempt may still write data; Spark doesn't guarantee that all tasks from the last attempt have ended even though the newest attempt has started.

If so, we'd better reject shuffle data from older stage attempts. This could be implemented by maintaining the latest stage attempt id.

OK, maybe rejecting the legacy data will be a better choice.

jerqi avatar Mar 25 '24 07:03 jerqi

@EnricoMi If a stage is retried, the taskId may not be unique, because we don't have a stage attemptId to distinguish task 1 attempt 0 in stage attempt 0 from task 1 attempt 0 in stage attempt 1. This may cause us to read wrong data.

Ignore this. Maybe rejecting the legacy data will be a better choice.

jerqi avatar Mar 25 '24 07:03 jerqi

Could you help review this? @EnricoMi @jerqi The spark2 change will be finished once this PR looks OK to you.

zuston avatar Mar 26 '24 03:03 zuston

Could you help review this? @EnricoMi @jerqi The spark2 change will be finished once this PR looks OK to you.

Several questions:

  1. How to reject the legacy requests?
  2. How to delete the legacy shuffle?

jerqi avatar Mar 26 '24 05:03 jerqi

  1. How to reject the legacy requests?

Use the latest attempt id on the server side to check whether a send request is from an older attempt; this will be finished in the next PR.

  1. How to delete the legacy shuffle?

This is already covered in this PR.

zuston avatar Mar 26 '24 06:03 zuston
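The rejection idea described above could look roughly like this sketch. AttemptGate and its methods are hypothetical names, not the actual Uniffle server code: the server remembers the latest stage attempt id per shuffle and rejects writes tagged with an older one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class AttemptGate {
    private final Map<Integer, Integer> latestAttempt = new ConcurrentHashMap<>();

    /** Called on (re-)registration; remembers the highest attempt seen. */
    void register(int shuffleId, int stageAttemptId) {
        latestAttempt.merge(shuffleId, stageAttemptId, Math::max);
    }

    /** A send request is accepted only if it is not older than the latest attempt. */
    boolean accept(int shuffleId, int stageAttemptId) {
        return stageAttemptId >= latestAttempt.getOrDefault(shuffleId, 0);
    }
}
```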

Can we register a shuffle as the tuple (shuffle_id, stage_attempt_id)? This way, we do not need to wait for (shuffle_id, 0) to be deleted synchronously, and can go on registering and writing (shuffle_id, 1). Deletion could take a significant time for large partitions (think TBs).

EnricoMi avatar Mar 26 '24 10:03 EnricoMi

I think deletion of earlier shuffle data should not be synchronous in the first place! That is flawed by design. Think of TBs of shuffle data. It should be evicted quickly, in constant time (e.g. an HDFS move), and cleaned up asynchronously (e.g. an HDFS delete).

EnricoMi avatar Mar 26 '24 10:03 EnricoMi
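The constant-time eviction plus asynchronous cleanup could be sketched like this, using plain java.nio file operations as a stand-in for the real storage layer (AsyncPurger and its methods are hypothetical names, not Uniffle code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

class AsyncPurger {
    private final ExecutorService cleaner = Executors.newSingleThreadExecutor();

    /** Fast path: an atomic rename moves the data out of the live namespace. */
    Path evict(Path shuffleDir) throws IOException {
        Path trash = shuffleDir.resolveSibling(shuffleDir.getFileName() + ".trash");
        Files.move(shuffleDir, trash, StandardCopyOption.ATOMIC_MOVE);
        // Slow path: the recursive delete runs in the background.
        cleaner.submit(() -> {
            try (Stream<Path> paths = Files.walk(trash)) {
                // Delete children before parents.
                paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException ignored) {
                        // Best effort; a real implementation would retry or log.
                    }
                });
            }
            return null;
        });
        return trash;
    }

    /** For tests/shutdown: wait until background cleanup has drained. */
    void awaitQuiesce() throws InterruptedException {
        cleaner.shutdown();
        cleaner.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The writer-visible cost of `evict` is one rename, regardless of how many terabytes sit underneath; the expensive recursive delete never blocks the retry path.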

Can we register a shuffle as the tuple (shuffle_id, stage_attempt_id)? This way, we do not need to wait for (shuffle_id, 0) to be deleted synchronously, and can go on registering and writing (shuffle_id, 1). Deletion could take a significant time for large partitions (think TBs).

Agree with you. I'm concerned about the cost of the refactor.

zuston avatar Mar 26 '24 11:03 zuston

Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.

EnricoMi avatar Mar 26 '24 12:03 EnricoMi

Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.

Thanks for your review. Sounds good, but I'm not sure whether the shuffleId can be kept unique. If you have any relevant insight, please let me know.

zuston avatar Mar 27 '24 08:03 zuston

Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.

Thanks for your review. Sounds good, but I'm not sure whether the shuffleId can be kept unique. If you have any relevant insight, please let me know.

Given Spark shuffleId and Spark stageAttemptNo, the client can register unique shuffle id shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo.

Disclaimer: this reduces the space of supported shuffle ids to Integer.MAX_VALUE/spark.stage.maxConsecutiveAttempts.

EnricoMi avatar Mar 27 '24 13:03 EnricoMi
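As a sketch of that encoding (class and method names are illustrative), with maxAttempts read from spark.stage.maxConsecutiveAttempts, which defaults to 4:

```java
class ShuffleIdEncoding {
    /** Unique per-attempt id: shuffleId * maxAttempts + stageAttemptNo. */
    static int encode(int shuffleId, int stageAttemptNo, int maxAttempts) {
        return shuffleId * maxAttempts + stageAttemptNo;
    }

    /** Recover the original Spark shuffleId from an encoded id. */
    static int originalShuffleId(int encoded, int maxAttempts) {
        return encoded / maxAttempts;
    }

    /** Recover the stage attempt number from an encoded id. */
    static int stageAttemptNo(int encoded, int maxAttempts) {
        return encoded % maxAttempts;
    }
}
```

Because stageAttemptNo < maxAttempts, the mapping is invertible, which is exactly what makes the reduced id space (Integer.MAX_VALUE / maxAttempts) the price of uniqueness.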

Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.

Thanks for your review. Sounds good, but I'm not sure whether the shuffleId can be kept unique. If you have any relevant insight, please let me know.

Given Spark shuffleId and Spark stageAttemptNo, the client can register unique shuffle id shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo.

Disclaimer: this reduces the space of supported shuffle ids to Integer.MAX_VALUE/spark.stage.maxConsecutiveAttempts.

Spark has some cases we need to be aware of: Spark may compute partial tasks in a new attempt, so the new attempt's data and the old attempt's data together form the complete result. The limit operation and stage recomputation will trigger similar cases.

jerqi avatar Mar 28 '24 03:03 jerqi

Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.

Thanks for your review. Sounds good, but I'm not sure whether the shuffleId can be kept unique. If you have any relevant insight, please let me know.

Given Spark shuffleId and Spark stageAttemptNo, the client can register unique shuffle id shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo. Disclaimer: this reduces the space of supported shuffle ids to Integer.MAX_VALUE/spark.stage.maxConsecutiveAttempts.

Spark has some cases we need to be aware of: Spark may compute partial tasks in a new attempt, so the new attempt's data and the old attempt's data together form the complete result. The limit operation and stage recomputation will trigger similar cases.

Following @EnricoMi's opinion: if we make the unique shuffleIdWithAttemptNo generated or converted in server side, the client side will not change anything, and Spark will keep the previous semantics.

zuston avatar Mar 28 '24 03:03 zuston

Spark may compute partial tasks in a new attempt.

You are saying a stage can be computed partially, let's say the first task runs and (if the desired number of rows does not materialize) more tasks run in a separate stage? Do these two stages have the same stage id (with different attempt numbers) or do they have different stage ids? How can I reproduce this? Where can I read up on this?

EnricoMi avatar Mar 28 '24 07:03 EnricoMi

If we make the unique shuffleIdWithAttemptNo generated or converted in server side

I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.

EnricoMi avatar Mar 28 '24 07:03 EnricoMi

Spark may compute partial tasks in a new attempt.

You are saying a stage can be computed partially, let's say the first task runs and (if the desired number of rows does not materialize) more tasks run in a separate stage? Do these two stages have the same stage id (with different attempt numbers) or do they have different stage ids? How can I reproduce this? Where can I read up on this?

You can see SparkPlan#executeTake.

jerqi avatar Mar 28 '24 07:03 jerqi

If we make the unique shuffleIdWithAttemptNo generated or converted in server side

I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.

Maybe we can add a stageAttempt field in the request.

jerqi avatar Mar 28 '24 07:03 jerqi

If we make the unique shuffleIdWithAttemptNo generated or converted in server side

I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.

Maybe we can add a stageAttempt field in the request.

If we could generate a unique new shuffleId on the client side, there would be no need to populate stageAttemptNo to the shuffle server side. shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo looks fine to me.

zuston avatar Mar 28 '24 08:03 zuston

If we make the unique shuffleIdWithAttemptNo generated or converted in server side

I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.

Maybe we can add a stageAttempt field in the request.

If we could generate a unique new shuffleId on the client side, there would be no need to populate stageAttemptNo to the shuffle server side. shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo looks fine to me.

Unless a partial stage relies on shuffle data from an earlier (partial) stage attempt (same Spark shuffle id), as outlined by @jerqi.

I am looking into this.

EnricoMi avatar Mar 28 '24 08:03 EnricoMi

If we make the unique shuffleIdWithAttemptNo generated or converted in server side

I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.

Maybe we can add a stageAttempt field in the request.

If we could generate a unique new shuffleId on the client side, there would be no need to populate stageAttemptNo to the shuffle server side. shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo looks fine to me.

Unless a partial stage relies on shuffle data from an earlier (partial) stage attempt (same Spark shuffle id), as outlined by @jerqi.

I am looking into this.

  1. A second case: Spark has ExchangeReuse. Will it be affected?
  2. A third case: Spark's original stage retry can recompute only partial tasks if the downstream fails to read.

jerqi avatar Mar 28 '24 09:03 jerqi

@yl09099 will take over this and update the PR here.

zuston avatar May 11 '24 03:05 zuston