[FEATURE] Add the integration with existing Spark tests on rust based server

Open zuston opened this issue 1 year ago • 14 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the feature

We should create a mini Rust server that integrates with the existing Spark tests.

Motivation

No response

Describe the solution

No response

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

zuston avatar Sep 27 '23 09:09 zuston

@zuston I am new here, let me give it a try.

guixiaowen avatar Nov 14 '23 01:11 guixiaowen

I think this is a big change. Are you interested in this? @wenlongbrother

zuston avatar Nov 24 '23 07:11 zuston

Hi @zuston, I am interested in this task, and I think it can give me a much deeper understanding of the whole project. Before I start, here is my understanding of the task: our goal is to create a mini Rust server that integrates successfully with the tests under the integration-test directory. By the way, if we create a Rust server, do we need to keep the original Java server? Please correct me if I am wrong. Thanks.

wenlongbrother avatar Nov 24 '23 13:11 wenlongbrother

I hope the mini Rust-based shuffle server can be tested with the existing integration tests. However, this part could live in an independent module rather than in the existing integration-test module.

zuston avatar Nov 27 '23 02:11 zuston

@zuston thanks. Can you assign it to me?

wenlongbrother avatar Nov 27 '23 10:11 wenlongbrother

Do you have time to do this? @guixiaowen

zuston avatar Nov 28 '23 02:11 zuston

Hi @zuston, do we have a default config.toml that contains the basic shuffle server configuration? I will use Java to start the Rust program. If not, I will create a default one and place it in the resource directory.

wenlongbrother avatar Nov 28 '23 13:11 wenlongbrother

No, we don't have one. I recommend drafting an initial implementation and then discussing it based on that.

zuston avatar Nov 28 '23 13:11 zuston

ok

wenlongbrother avatar Nov 29 '23 01:11 wenlongbrother

Still pending. I will finish it in the next few days; I have been taking exams for the past several days.

wenlongbrother avatar Jan 12 '24 02:01 wenlongbrother

Hi @zuston, I am writing sendDataAndRequireBufferTest and have a quick question: why do we set partition_id = 1, as in the code below?

    async fn require_buffer(
        &self,
        request: Request<RequireBufferRequest>,
    ) -> Result<Response<RequireBufferResponse>, Status> {
        let timer = GRPC_BUFFER_REQUIRE_PROCESS_TIME.start_timer();
        let req = request.into_inner();
        let app_id = req.app_id;
        let shuffle_id = req.shuffle_id;

        let app = self.app_manager_ref.get_app(&app_id);
        if app.is_none() {
            return Ok(Response::new(RequireBufferResponse {
                require_buffer_id: 0,
                status: StatusCode::NO_REGISTER.into(),
                ret_msg: "No such app in this shuffle server".to_string(),
            }));
        }

        let partition_id = PartitionedUId {
            app_id,
            shuffle_id,
            // why?
            // ignore this.
            partition_id: 1,
        };
        let app = app
            .unwrap()
            .require_buffer(RequireBufferContext {
                uid: partition_id.clone(),
                size: req.require_size as i64,
            })
            .instrument_await(format!("require buffer. uid: {:?}", &partition_id))
            .await;

        let res = match app {
            Ok(required_buffer_res) => (
                StatusCode::SUCCESS,
                required_buffer_res.ticket_id,
                "".to_string(),
            ),
            Err(err) => (StatusCode::NO_BUFFER, -1i64, format!("{:?}", err)),
        };

        timer.observe_duration();

        Ok(Response::new(RequireBufferResponse {
            require_buffer_id: res.1,
            status: res.0.into(),
            ret_msg: res.2,
        }))
    }

wenlongbrother avatar Jan 14 '24 14:01 wenlongbrother

The partitionId in RequireBufferContext is not used when requiring a buffer; it is only a placeholder. Please ignore it, @wenlongbrother.

zuston avatar Jan 15 '24 03:01 zuston
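
To illustrate the placeholder point, here is a minimal sketch, assuming a simplified memory budget that grants tickets by requested size alone. `BufferBudget`, its fields, the `ctx` helper, and the field types are hypothetical stand-ins, not the project's actual implementation; only the names `PartitionedUId`, `RequireBufferContext`, `uid`, and `size` are taken from the snippet earlier in this thread.

```rust
/// Mirrors the identifier from the snippet in this thread (field types are assumed).
#[allow(dead_code)] // shuffle_id and partition_id are deliberately never read here
#[derive(Debug, Clone)]
struct PartitionedUId {
    app_id: String,
    shuffle_id: i32,
    partition_id: i32,
}

/// Simplified request context; the real struct may carry more fields.
struct RequireBufferContext {
    uid: PartitionedUId,
    size: i64,
}

/// Hypothetical memory budget that hands out ticket ids.
struct BufferBudget {
    capacity: i64,
    used: i64,
    next_ticket_id: i64,
}

impl BufferBudget {
    fn new(capacity: i64) -> Self {
        BufferBudget { capacity, used: 0, next_ticket_id: 0 }
    }

    /// Grants a ticket if enough memory is left.
    /// Only `ctx.size` drives the decision; `ctx.uid.partition_id` is never read.
    fn require_buffer(&mut self, ctx: &RequireBufferContext) -> Result<i64, String> {
        if ctx.size < 0 || self.used + ctx.size > self.capacity {
            return Err(format!("no buffer left for app {}", ctx.uid.app_id));
        }
        self.used += ctx.size;
        let ticket_id = self.next_ticket_id;
        self.next_ticket_id += 1;
        Ok(ticket_id)
    }
}

/// Hypothetical helper that builds a request context for the examples.
fn ctx(app_id: &str, shuffle_id: i32, partition_id: i32, size: i64) -> RequireBufferContext {
    RequireBufferContext {
        uid: PartitionedUId { app_id: app_id.to_string(), shuffle_id, partition_id },
        size,
    }
}

fn main() {
    let mut budget = BufferBudget::new(100);
    // The two requests differ only in partition_id; the outcome depends on size alone.
    let first = budget.require_buffer(&ctx("app-1", 0, 1, 60));
    let second = budget.require_buffer(&ctx("app-1", 0, 999, 60));
    println!("first: {:?}, second: {:?}", first, second);
}
```

In this sketch, changing `partition_id` from 1 to any other value has no effect on whether a ticket is granted, which is the sense in which the field acts as a placeholder.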

Hi @zuston, a follow-up question: I see that the code below logs partition_id. Do you think it will confuse users? (two images attached)

wenlongbrother avatar Jan 22 '24 00:01 wenlongbrother
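
One possible way to keep the placeholder out of log lines, sketched here under assumptions: wrap the identifier in a small view type whose `Display` output omits `partition_id`. `AppShuffleView` is a hypothetical name introduced for illustration, the field types are assumed, and nothing in this thread indicates that the project adopted this approach.

```rust
use std::fmt;

/// Mirrors the identifier from the snippet in this thread (field types are assumed).
#[allow(dead_code)] // partition_id is intentionally left out of the log output
#[derive(Debug, Clone)]
struct PartitionedUId {
    app_id: String,
    shuffle_id: i32,
    partition_id: i32,
}

/// Hypothetical view that renders only the fields meaningful for a buffer request.
struct AppShuffleView<'a>(&'a PartitionedUId);

impl fmt::Display for AppShuffleView<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "app_id={}, shuffle_id={}", self.0.app_id, self.0.shuffle_id)
    }
}

fn main() {
    let uid = PartitionedUId {
        app_id: "app-1".to_string(),
        shuffle_id: 3,
        partition_id: 1, // placeholder value, as in the original snippet
    };
    // Using `{}` with the view instead of `{:?}` on the uid hides the placeholder.
    println!("require buffer. uid: {}", AppShuffleView(&uid));
}
```

The trade-off is an extra type to maintain; a simpler alternative would be to format `app_id` and `shuffle_id` directly at the single call site that logs the request.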

Hi @zuston, could you please help review this draft PR? https://github.com/apache/incubator-uniffle/pull/1481 It is still in progress, and I would like to know whether it meets expectations.

wenlongbrother avatar Jan 24 '24 13:01 wenlongbrother