datafusion-ballista icon indicating copy to clipboard operation
datafusion-ballista copied to clipboard

Task level retry and Stage level retry

Open mingmwang opened this issue 3 years ago • 2 comments

Which issue does this PR close?

Closes #140.

Rationale for this change

What changes are included in this PR?

1. Task level retry, a) Add task attempt num, use a job level unique task_id to represent the Task b) Define couple of failure reasons for FailedTask

pub enum FailedReason {
        #[prost(message, tag="4")]
        ExecutionError(super::ExecutionError),
        #[prost(message, tag="5")]
        FetchPartitionError(super::FetchPartitionError),
        #[prost(message, tag="6")]
        IoError(super::IoError),
        #[prost(message, tag="7")]
        ExecutorLost(super::ExecutorLost),
        /// A successful task's result is lost due to executor lost
        #[prost(message, tag="8")]
        ResultLost(super::ResultLost),
        #[prost(message, tag="9")]
        TaskKilled(super::TaskKilled),
    }

c) Reasoning the real task failure reason when return the TaskStatus back to Scheduler d) Based on the failure reason, scheduler decide to reschedule the task and bump task attempt number of failed task.

2. Stage level retry and shuffle read failure handling a) Add stage attempt num. b) When there are shuffle partition fetch failures, the current running stage will be rolled back and the map stages will be resubmit. c) handling of delayed fetch failure task updates d) Cancel the running tasks if Scheduler decide to fail the stage/job.

And below two items are not covered in this PR and we may need to revisit the related logic in future:

  1. If the plans/expressions in the Stage is not deterministic, need to revisit the resubmit logic.
  2. If we have a map stage whose shuffle output can be reused by multiple reduce stages, need to revisit the stage retry logic.

Are there any user-facing changes?

mingmwang avatar Sep 21 '22 16:09 mingmwang

@thinkharderdev @andygrove @yahoNanJing Please help to review my PR.

mingmwang avatar Sep 25 '22 14:09 mingmwang

Added 10+ UTs to cover different cases.

mingmwang avatar Sep 25 '22 14:09 mingmwang

With this PR, the state machine for the stage becomes as follows: StageStateMachine

The transitions marked in red are newly added ones to deal with error task recovery.

yahoNanJing avatar Sep 28 '22 07:09 yahoNanJing

For example, one SQL stage graph snapshot is as follows:

                                      Stage 4(Resolved)
                         ↗                                                 ↘
 Stage 1(Successful)                			                        Stage 5(Unresolved)
                         ↘                                                 ↗
           		      Stage 2(Successful)  ->  Stage 3(Running) 

Both of Stage1 and Stage2 has output shuffle data resides on Executor1. Unluckily, the Executor1 gets lost. Then tasks for the running Stage3 will fail due to not able to fetch shuffle data. For this kind of scenario and error, by this PR, the ExecutionGraph will be able to continue running with proper task reset.

  1. Firstly, Stage3 will be converted from Running to Unresolved. And Stage3 will ask its dependent Stage2 to rerun related tasks to prepare its input data.
  2. Then Stage2 will be converted from Successful to Running. Unluckily, the related rerunning tasks for Stage2 will also fail due to not able to fetch shuffle data. Then Stage2 will be converted from Running to Unresolved and will ask its dependent Stage1 to rerun related tasks to prepare its input data.
  3. Then Stage1 will be converted from Successful to Running.
  4. Then Stage4 who dependent on Stage1 will be converted from Resolved to Unresolved.

The stage graph snapshot will become as follows:

                                      Stage 4(Unresolved)
                         ↗                                                 ↘
 Stage 1(Running)                			                        Stage 5(Unresolved)
                         ↘                                                 ↗
           		      Stage 2(Unresolved)  ->  Stage 3(Unresolved) 

Once Stage1 finishes successfully, the graph snapshot will become as follows:

                                      Stage 4(Resolved)
                         ↗                                                 ↘
 Stage 1(Successful)                			                        Stage 5(Unresolved)
                         ↘                                                 ↗
           		      Stage 2(Resolved)  ->  Stage 3(Unresolved) 

...

The purpose of introducing task attempt and stage attempt is as follows:

  • Stage attempt: For a running stage, all of the task status stored should belong to the same stage attempt. The update of task statuses with oldest attempt number will be ignored.
  • Task attempt: For some errors, like shuffle write IO error, they are retryable. It's feasible to reschedule this task to another executor to have a new attempt running. The attempt number will be reset to 0 if the stage starts a new attempt.

yahoNanJing avatar Sep 28 '22 08:09 yahoNanJing

@mingmwang I'm pretty swamped this week so won't have time to review this until this weekend.

thinkharderdev avatar Sep 28 '22 13:09 thinkharderdev

Later, maybe we can make this error task recovery feature configurable.

yahoNanJing avatar Sep 28 '22 14:09 yahoNanJing

The overall design of this PR is good for me and it will be really useful to deal with executor lost issue and executor bad disk issue. @thinkharderdev, @andygrove, @avantgardnerio, could you help review this PR?

yahoNanJing avatar Sep 28 '22 14:09 yahoNanJing

Thanks, @mingmwang. I plan on starting to review this tomorrow.

andygrove avatar Sep 29 '22 04:09 andygrove

@mingmwang I have taken a first pass through this PR and I think it looks good. I am going to spend some time testing out the PR locally. It would be good to wait for Dan to review as well before we merge this.

andygrove avatar Sep 29 '22 21:09 andygrove

I tested this locally, and it worked really well!

I ran one scheduler, and two executors and I could kill one executor and still see a query complete successfully. This is not the case in the master branch.

I am happy to approve this once feedback has been addressed.

andygrove avatar Sep 29 '22 22:09 andygrove

I tested this locally, and it worked really well!

I ran one scheduler, and two executors and I could kill one executor and still see a query complete successfully. This is not the case in the master branch.

I am happy to approve this once feedback has been addressed.

Thank you. We will also start the chaos-monkey testing in next month to verify the recent changes.

mingmwang avatar Sep 30 '22 02:09 mingmwang

Thanks @mingmwang for this huge step of error recovering. Thanks @andygrove and @thinkharderdev for reviewing this PR. Since we all approved this PR, I'll merge it first.

yahoNanJing avatar Oct 02 '22 09:10 yahoNanJing