[#1579][part-1] fix(spark): Adjust reassignment timing to ensure that all previous data is cleared for stage retry
What changes were proposed in this pull request?
Clear out the previous stage attempt's data synchronously when registering the reassigned shuffleIds.
Why are the changes needed?
Fix: #1579
If the previous stage attempt's data is still in the purge queue on the shuffle-server side, writes from the retried stage will hit unknown exceptions, so we'd better clear out all previous stage attempt data before re-registering.
This PR synchronously removes the previous stage attempt's data when the first writer of the retried attempt is initialized.
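To make the intended flow concrete, here is a minimal, hypothetical sketch; the class, interface, and method names below are illustrative only, not Uniffle's actual API. The first writer of a retried attempt blocks while the previous attempt's data is cleared, exactly once per attempt, before re-registering.

```java
// Minimal sketch of the idea only. StageAttemptTracker, ShuffleServerApi and
// unregisterShuffleSync are hypothetical names, not Uniffle's actual API.
import java.util.HashMap;
import java.util.Map;

class StageAttemptTracker {
  // shuffleId -> last stage attempt whose predecessor data was already cleared
  private final Map<Integer, Integer> clearedAttempts = new HashMap<>();

  interface ShuffleServerApi {
    // assumed to block until the shuffle data is actually removed
    void unregisterShuffleSync(int shuffleId);
  }

  /**
   * Called when the first writer of a retried stage attempt is initialized.
   * Blocks until the previous attempt's data is removed, so no stale data is
   * still sitting in the server-side purge queue when the new attempt registers.
   */
  synchronized void clearPreviousAttemptOnce(
      int shuffleId, int stageAttemptNumber, ShuffleServerApi server) {
    if (stageAttemptNumber == 0) {
      return; // first attempt: nothing to clear
    }
    Integer cleared = clearedAttempts.get(shuffleId);
    if (cleared == null || cleared < stageAttemptNumber) {
      server.unregisterShuffleSync(shuffleId); // synchronous delete of old data
      clearedAttempts.put(shuffleId, stageAttemptNumber);
    }
  }
}
```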
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.
cc @dingshun3016 @yl09099 PTAL
Test Results
2 433 files ±0, 2 433 suites ±0, 5h 0m 35s :stopwatch: - 1m 8s
934 tests ±0, 933 :white_check_mark: ±0, 1 :zzz: ±0, 0 :x: ±0
10 828 runs ±0, 10 814 :white_check_mark: ±0, 14 :zzz: ±0, 0 :x: ±0
Results for commit 61e2a9bc. ± Comparison against base commit fd64d9dc.
After rethinking this, I think reassignAllShuffleServersForWholeStage could be invoked by the retry writer rather than the previously failed writer, which would ensure no older data reaches the server after re-registration.
Codecov Report
Attention: Patch coverage is 3.29670% with 352 lines in your changes missing coverage. Please review.
Project coverage is 53.42%. Comparing base (6f6d35a) to head (5c9d9e3). Report is 34 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #1584 +/- ##
============================================
- Coverage 54.86% 53.42% -1.45%
- Complexity 2358 2943 +585
============================================
Files 368 435 +67
Lines 16379 23768 +7389
Branches 1504 2208 +704
============================================
+ Hits 8986 12697 +3711
- Misses 6862 10290 +3428
- Partials 531 781 +250
:umbrella: View full report in Codecov by Sentry.
It's dangerous to delete the failed stage's data when we retry; it's hard to reach a condition under which the data can be safely deleted. We should rely on data skipping to avoid reading the failed data.
Could you describe more?
Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Although we resubmit the stage, some tasks from the last attempt may keep writing; Spark doesn't guarantee that all tasks of the last attempt have ended even though the newest attempt has started.
@EnricoMi If we have a stage retry, the taskId may not be unique. Because we don't have the stage attemptId to distinguish task 1 attempt 0 of stage attempt 0 from task 1 attempt 0 of stage attempt 1, we may read wrong data.
If so, we'd better reject shuffle data from the older stage attempt. This could be implemented by maintaining the latest stage attempt id.
OK, maybe rejecting the legacy data will be the better choice.
Ignore my point above about the taskId not being unique across stage attempts. Maybe rejecting legacy data will be a better choice.
Could you help review this? @EnricoMi @jerqi The spark2 change will be finished after this PR is OK with you.
Several questions:
- How to reject the legacy requests?
- How to delete the legacy shuffle?
- How to reject the legacy requests?
By checking, on the server side, whether a send request carries the latest stage attempt id or belongs to an older attempt; this will be finished in the next PR.
- How to delete the legacy shuffle?
That is handled in this PR.
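For reference, a hedged sketch of what the attempt-id check mentioned above might look like on the server side; all names here (LatestAttemptRegistry, onRegister, accept) are hypothetical, and the real change is deferred to the follow-up PR.

```java
// Hedged sketch only: LatestAttemptRegistry, onRegister and accept are
// hypothetical names, not actual Uniffle server classes.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LatestAttemptRegistry {
  // (appId, shuffleId) -> latest stage attempt number seen at registration
  private final Map<String, Integer> latestAttempt = new ConcurrentHashMap<>();

  /** Record the stage attempt number observed when a shuffle is (re-)registered. */
  void onRegister(String appId, int shuffleId, int stageAttemptNumber) {
    latestAttempt.merge(key(appId, shuffleId), stageAttemptNumber, Math::max);
  }

  /** Reject send-data requests stamped with an attempt older than the latest one. */
  boolean accept(String appId, int shuffleId, int requestAttemptNumber) {
    int latest = latestAttempt.getOrDefault(key(appId, shuffleId), 0);
    return requestAttemptNumber >= latest;
  }

  private static String key(String appId, int shuffleId) {
    return appId + "_" + shuffleId;
  }
}
```

With something like this, writes from a task that still belongs to an older stage attempt would be rejected instead of silently landing in the re-registered shuffle.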
Can we register a shuffle as the tuple (shuffle_id, stage_attempt_id)? This way, we do not need to wait for (shuffle_id, 0) to be deleted synchronously, and can go on registering and writing (shuffle_id, 1). Deletion could take a significant time for large partitions (think TBs).
I think deletion of earlier shuffle data should not be synchronous in the first place! That is flawed by design. Think of TBs of shuffle data: it should be removed quickly, in constant time (e.g. an HDFS move), and cleaned up asynchronously (e.g. an HDFS delete).
Agree with you. I'm concerned about the cost of the refactor.
Spark client can easily come up with a per-stage-attempt shuffle id and feed that to the shuffle server. That should not require any server-side refactoring.
Thanks for your review. Sounds good, but I'm not sure whether the shuffleId can be kept unique. If you have any relevant insight, please let me know.
Given Spark shuffleId and Spark stageAttemptNo, the client can register unique shuffle id shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo.
Disclaimer: this reduces the space of supported shuffle ids to Integer.MAX_VALUE/spark.stage.maxConsecutiveAttempts.
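To illustrate the arithmetic, here is a small self-contained sketch of that encoding and its inverse. The default of 4 for spark.stage.maxConsecutiveAttempts and the class/method names are my own illustration, not code from this PR.

```java
// Illustration of the proposed encoding; AttemptAwareShuffleId is a
// hypothetical name used only for this sketch.
final class AttemptAwareShuffleId {
  static int encode(int sparkShuffleId, int stageAttemptNo, int maxConsecutiveAttempts) {
    // Each Spark shuffleId owns a contiguous block of maxConsecutiveAttempts ids.
    return sparkShuffleId * maxConsecutiveAttempts + stageAttemptNo;
  }

  static int sparkShuffleId(int uniqueId, int maxConsecutiveAttempts) {
    return uniqueId / maxConsecutiveAttempts;
  }

  static int stageAttemptNo(int uniqueId, int maxConsecutiveAttempts) {
    return uniqueId % maxConsecutiveAttempts;
  }

  public static void main(String[] args) {
    int max = 4; // e.g. conf.getInt("spark.stage.maxConsecutiveAttempts", 4)
    int unique = encode(7, 1, max);                  // 7 * 4 + 1 = 29
    System.out.println(unique);                      // 29
    System.out.println(sparkShuffleId(unique, max)); // 7
    System.out.println(stageAttemptNo(unique, max)); // 1
    // Per the disclaimer above: Spark shuffle ids above
    // Integer.MAX_VALUE / maxConsecutiveAttempts would overflow.
  }
}
```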
Spark has some cases which we need to notice:
Spark may compute only a subset of tasks in a new attempt, so the new attempt's data and the old attempt's data together form the complete output. The Limit operation and stage recomputation will trigger similar cases.
Building on @EnricoMi's idea: if the unique shuffleIdWithAttemptNo is generated or converted on the server side, the client side will not change anything and Spark will keep its previous semantics.
Spark may compute partial tasks in a new attempt.
You are saying a stage can be computed partially, let's say only the first task, and then (if the desired number of rows does not materialize) more tasks in a separate stage? Do these two stages have the same stage id (with different attempt numbers) or do they have different stage ids? How can I reproduce this? Where can I read up on this?
If we make the unique shuffleIdWithAttemptNo generated or converted in server side
I presume the server side does not know about the stage attempt number, so that would have to be plumbed through from the client side anyway.
For the partial-computation case, you can see SparkPlan#executeTake.
Maybe we can add a stageAttempt field in the request so that the server side knows about the stage attempt number.
If we could generate the unique new shuffleId on the client side, there would be no need to propagate the stageAttemptNo to the shuffle server side. shuffleId * conf.getInt("spark.stage.maxConsecutiveAttempts") + stageAttemptNo looks fine to me.
Unless a partial stage relies on shuffle data of an earlier (partial) stage attempt (same Spark shuffle id), as outlined by @jerqi.
I am looking into this.
- Second case: Spark has ExchangeReuse. Will it be influenced?
- Third case: Spark's original stage retry can retry partial tasks if a downstream stage failed to read.
@yl09099 will take over this and update PR here.