Clean up job data on both Scheduler and Executor
Which issue does this PR close?
Closes #9 and #185.
Rationale for this change
Shuffle files and job state currently accumulate after jobs finish, which can fill up the disk (see the discussion below). This change cleans up finished job data on both the Scheduler and the Executor.
What changes are included in this PR?
Once a job completes (successfully or with failure), and after a delay of CLEANUP_FINISHED_JOB_DELAY_SECS (300 seconds), the scheduler removes the job from its active execution graph cache, deletes the job's entry from the state store, and asks the executors to clean up the job's shuffle data.
Are there any user-facing changes?
@thinkharderdev @yahoNanJing @Ted-Jiang @andygrove
I am constantly filling my disk up with shuffle files, so I would love to see us get this merged before the 0.9.0 release.
@mingmwang Could you rebase when you get a chance, and I will test this out and review the PR as well.
Sure, working on it.
@andygrove @yahoNanJing @Ted-Jiang
BTW, in this PR the job data in the state store will also be deleted after 300 seconds. I think we need a follow-up PR to move completed (successful or failed) job data from the state store to an ObjectStore for long-term storage, so the Scheduler UI can read it from the ObjectStore.
Please share your thoughts.
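To make the proposal concrete, here is a minimal sketch of what that archival step could look like, assuming the object_store crate; archive_job_status, the completed_jobs/ path layout, and encoded_status are illustrative names, not part of this PR:

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Hypothetical follow-up (not in this PR): persist a finished job's status
// to an ObjectStore before its state-store entry is deleted, so the
// Scheduler UI can still read it afterwards.
async fn archive_job_status(
    store: &dyn ObjectStore,
    job_id: &str,
    encoded_status: Bytes,
) -> object_store::Result<()> {
    // Illustrative layout: completed_jobs/<job_id>
    let location = Path::from(format!("completed_jobs/{job_id}"));
    store.put(&location, encoded_status.into()).await?;
    Ok(())
}

For reference, the cleanup function added by this PR: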
const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;

async fn clean_up_job_data(
    state: Arc<dyn StateBackendClient>,
    active_job_cache: ExecutionGraphCache,
    failed: bool,
    job_id: String,
    executor_manager: Option<ExecutorManager>,
) -> Result<()> {
    // Remove the job from the in-memory execution graph cache.
    let mut active_graph_cache = active_job_cache.write().await;
    active_graph_cache.remove(&job_id);

    // Delete the job's entry from the state store, under the keyspace that
    // matches its final status.
    let keyspace = if failed {
        Keyspace::FailedJobs
    } else {
        Keyspace::CompletedJobs
    };
    let lock = state.lock(keyspace.clone(), "").await?;
    with_lock(lock, state.delete(keyspace, &job_id)).await?;

    // If an executor manager is available, tell the executors to delete the
    // job's shuffle data; the future must be awaited for the cleanup to run.
    if let Some(em) = executor_manager {
        Self::clean_up_executors_data(job_id, em).await;
    }

    Ok(())
}
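And a minimal sketch of how the 300-second delay could be wired up, assuming tokio and the log crate's warn! macro; the spawn site and the argument plumbing are illustrative assumptions rather than the exact scheduling in this PR:

use std::time::Duration;

// Illustrative only: run the cleanup on a background task after the
// configured delay, so finishing a job never blocks on cleanup.
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(CLEANUP_FINISHED_JOB_DELAY_SECS)).await;
    if let Err(e) =
        Self::clean_up_job_data(state, active_job_cache, failed, job_id, executor_manager).await
    {
        warn!("Failed to clean up job data: {e:?}");
    }
});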
@mingmwang could you fix the conflicts here when you have the time so that we can merge this?
Sure, I will fix the conflicts tomorrow.
Resolved conflicts.
Thanks again @mingmwang