Clean up job data on both Scheduler and Executor

Open mingmwang opened this issue 3 years ago • 1 comments

Which issue does this PR close?

Closes #9 and #185.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

mingmwang avatar Sep 05 '22 08:09 mingmwang

@thinkharderdev @yahoNanJing @Ted-Jiang @andygrove

mingmwang avatar Sep 05 '22 09:09 mingmwang

I am constantly filling my disk up with shuffle files so would love to see us get this merged before the 0.9.0 release.

@mingmwang Could you rebase when you get a chance and I will test this out and review the PR as well.

andygrove avatar Sep 25 '22 22:09 andygrove

> I am constantly filling my disk up with shuffle files so would love to see us get this merged before the 0.9.0 release.
>
> @mingmwang Could you rebase when you get a chance and I will test this out and review the PR as well.

Sure, working on it.

mingmwang avatar Sep 26 '22 02:09 mingmwang

@andygrove @yahoNanJing @Ted-Jiang

BTW, in this PR, the job data in the state store will also be deleted after 300s. I think we need a follow-up PR to move the completed (Success or Failed) job data from the state store to the ObjectStore for long-term storage, so that the Scheduler UI can read it from the ObjectStore.

Please share your thoughts.

const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;

async fn clean_up_job_data(
    state: Arc<dyn StateBackendClient>,
    active_job_cache: ExecutionGraphCache,
    failed: bool,
    job_id: String,
    executor_manager: Option<ExecutorManager>,
) -> Result<()> {
    // Drop the job's execution graph from the in-memory cache.
    let mut active_graph_cache = active_job_cache.write().await;
    active_graph_cache.remove(&job_id);

    // Finished jobs live in one of two keyspaces, depending on the outcome.
    let keyspace = if failed {
        Keyspace::FailedJobs
    } else {
        Keyspace::CompletedJobs
    };

    // Delete the persisted job state while holding the keyspace lock.
    let lock = state.lock(keyspace.clone(), "").await?;
    with_lock(lock, state.delete(keyspace, &job_id)).await?;

    // Futures are lazy, so the executor cleanup must be awaited to actually run.
    if let Some(em) = executor_manager {
        Self::clean_up_executors_data(job_id.clone(), em).await;
    }
    Ok(())
}
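
For readers who want to see the delayed-deletion idea in isolation, here is a minimal sketch of "remove a finished job's data after a fixed delay". It is not the code from this PR: the scheduler is async, while this sketch uses plain standard-library threads, and `JobStore` and `schedule_cleanup` are hypothetical names introduced only for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the scheduler's job state (job id -> status).
/// This is an assumption for the sketch, not a Ballista type.
type JobStore = Arc<Mutex<HashMap<String, String>>>;

/// Removes `job_id` from `store` once `delay` has elapsed, on a background thread.
/// The returned handle yields `true` if an entry was actually deleted.
fn schedule_cleanup(
    store: JobStore,
    job_id: String,
    delay: Duration,
) -> thread::JoinHandle<bool> {
    thread::spawn(move || {
        // Wait out the retention window before touching the store.
        thread::sleep(delay);
        // `remove` returns the previous value, so `is_some()` reports a real deletion.
        store.lock().unwrap().remove(&job_id).is_some()
    })
}
```

With a delay of `Duration::from_secs(300)` this mirrors the 300s retention window mentioned above. Joining the returned handle is only needed when the caller wants to know that the cleanup has happened, for example in a test.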

mingmwang avatar Sep 26 '22 08:09 mingmwang

@mingmwang could you fix the conflicts here when you have the time so that we can merge this?

andygrove avatar Oct 10 '22 14:10 andygrove

> @mingmwang could you fix the conflicts here when you have the time so that we can merge this?

Sure, I will fix the conflicts tomorrow.

mingmwang avatar Oct 11 '22 10:10 mingmwang

Resolved conflicts.

mingmwang avatar Oct 12 '22 16:10 mingmwang

Thanks again @mingmwang

andygrove avatar Oct 12 '22 17:10 andygrove