
[Feature]: Client should retry immediately after receiving certain errors

Open bsbds opened this issue 3 months ago • 5 comments

Description about the feature

The current retry logic in the curp client always waits for a period of time after a propose has returned an error. However, this wait is not necessary for certain errors like CurpError::WrongClusterVersion or CurpError::Redirect. These errors indicate that the client's local information is outdated, and after fetching the latest information from the cluster, the retry could proceed immediately.

https://github.com/xline-kv/Xline/blob/296105fe01aafbd473d47fde088f32094892c5f3/crates/curp/src/client/retry.rs#L117-L183
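For illustration, a minimal sketch of the distinction (a hypothetical helper, not code from retry.rs; it assumes CurpError is in scope as it is in that file):

    // Hypothetical helper, not part of retry.rs: errors that only signal
    // stale client-side state can be retried immediately once the local
    // view is refreshed; everything else keeps the existing backoff.
    fn needs_backoff(err: &CurpError) -> bool {
        !matches!(
            err,
            CurpError::WrongClusterVersion(_) | CurpError::Redirect(_)
        )
    }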

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

bsbds avatar Mar 05 '24 08:03 bsbds

Hello there @bsbds ! I'd like to work on this issue if that's fine by you. Thank you!

Harsh1s avatar Mar 05 '24 20:03 Harsh1s

> Hello there @bsbds ! I'd like to work on this issue if that's fine by you. Thank you!

Sure, assigned.

bsbds avatar Mar 06 '24 07:03 bsbds

Hey @bsbds ! I have made the necessary changes to the code. Wanted to ask if there are any more error-codes other than CurpError::WrongClusterVersion and CurpError::Redirect that need immediate retrying? Thanks for the help!

Harsh1s avatar Mar 08 '24 04:03 Harsh1s

> Hey @bsbds ! I have made the necessary changes to the code. Wanted to ask if there are any more error-codes other than CurpError::WrongClusterVersion and CurpError::Redirect that need immediate retrying? Thanks for the help!

From what I see now, these two are the only kinds of errors that do not require waiting before retrying.

I think the (retriable) errors can be divided into two types: one type indicates that the server is currently unable to process the request, while the other indicates that the server's current state is fine and it's the client's own state that is outdated. What do you think?
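To make that split concrete, here is a hypothetical sketch of the grouping (names like RetryClass and classify are illustrative only and not from the codebase; CurpError is assumed to be in scope as in retry.rs):

    /// Illustrative only: how the retriable errors could be grouped.
    enum RetryClass {
        /// The server cannot serve the request right now: back off, then retry.
        ServerBusy,
        /// The client's own view (cluster version / leader) is stale:
        /// refresh local state and retry immediately.
        ClientStale,
    }

    fn classify(err: &CurpError) -> Option<RetryClass> {
        match err {
            // server-side, transient: wait before retrying
            CurpError::ExpiredClientId(_)
            | CurpError::KeyConflict(_)
            | CurpError::Internal(_)
            | CurpError::LeaderTransfer(_)
            | CurpError::RpcTransport(_) => Some(RetryClass::ServerBusy),
            // client-side, stale view: retry right after refreshing
            CurpError::WrongClusterVersion(_) | CurpError::Redirect(_) => {
                Some(RetryClass::ClientStale)
            }
            // everything else (Duplicated, ShuttingDown, InvalidConfig, ...)
            // is treated as non-retriable
            _ => None,
        }
    }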

bsbds avatar Mar 08 '24 12:03 bsbds

Hey @bsbds ! I'm sorry for the delay on this PR, I had my university exams. I have made some changes to the required function, but I'm getting a "process didn't exit successfully: (signal: 6, SIGABRT: process abort signal)" error when I run cargo test. Here are the changes I made:

/// Takes a function f and run retry.
    async fn retry<'a, R, F>(&'a self, f: impl Fn(&'a Api) -> F) -> Result<R, tonic::Status>
    where
        F: Future<Output = Result<R, CurpError>>,
    {
        let mut backoff = self.config.init_backoff();
        let mut last_err = None;
        let mut consecutive_client_error_count = 0;
        const MAX_CONSECUTIVE_CLIENT_ERROR_COUNT: usize = 5;

        loop {
            let err = match f(&self.inner).await {
                Ok(res) => return Ok(res),
                Err(err) => err,
            };

            match err {
                // Errors that should not retry
                CurpError::Duplicated(_)
                | CurpError::ShuttingDown(_)
                | CurpError::InvalidConfig(_)
                | CurpError::NodeNotExists(_)
                | CurpError::NodeAlreadyExists(_)
                | CurpError::LearnerNotCatchUp(_) => {
                    return Err(tonic::Status::from(err));
                }

                // Server-side errors that should retry after a delay
                CurpError::ExpiredClientId(_)
                | CurpError::KeyConflict(_)
                | CurpError::Internal(_)
                | CurpError::LeaderTransfer(_)
                | CurpError::RpcTransport(_) => {
                    consecutive_client_error_count = 0;
                    let delay = match backoff.next_delay() {
                        Some(delay) => delay,
                        None => break,
                    };
                    if let CurpError::RpcTransport(_) = &err {
                        // update leader state if we got an RPC transport error
                        if let Err(e) = self.inner.fetch_leader_id(true).await {
                            warn!("fetch leader failed, error {e:?}");
                        }
                    }

                    #[cfg(feature = "client-metrics")]
                    super::metrics::get().client_retry_count.add(1, &[]);

                    warn!(
                        "got error: {err:?}, retry on {} seconds later",
                        delay.as_secs_f32()
                    );
                    last_err = Some(err);
                    tokio::time::sleep(delay).await;
                }

                // Client-side errors that should retry immediately
                CurpError::WrongClusterVersion(_) | CurpError::Redirect(_) => {
                    if let CurpError::WrongClusterVersion(_) = &err {
                        // update the cluster state if got WrongClusterVersion
                        if let Err(e) = self.inner.fetch_cluster(true).await {
                            warn!("fetch cluster failed, error {e:?}");
                        }
                    } else if let CurpError::Redirect(Redirect { leader_id, term }) = &err {
                        // update the leader state if got Redirect
                        let _ig = self.inner.update_leader(*leader_id, *term).await;
                    }

                    warn!("got error: {err:?}, retrying immediately", err = err);

                    consecutive_client_error_count += 1;
                    if consecutive_client_error_count >= MAX_CONSECUTIVE_CLIENT_ERROR_COUNT {
                        warn!(
                            "Maximum consecutive client error count reached, not retrying anymore"
                        );
                        last_err = Some(err);
                        break;
                    }
                }
            }
        }

        Err(tonic::Status::deadline_exceeded(format!(
            "request timeout, last error: {:?}",
            last_err.unwrap_or_else(|| unreachable!("last error must be set"))
        )))
    }

I have made the changes such that the function is basically divided into two parts: one handles client-side errors and the other handles server-side errors. With client-side errors, we retry immediately without any delay or backoff. But with this comes the danger of an infinite loop, hence I introduced a threshold for this as you can see. Can you help me figure out where I'm going wrong?
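As an aside, the bounded immediate-retry idea can be seen in isolation in this small, self-contained sketch (synchronous and hypothetical, not tied to retry.rs): retry at once on failure, but give up after a fixed number of consecutive attempts.

    // Standalone illustration of the pattern described above.
    fn bounded_immediate_retry<T, E>(
        mut op: impl FnMut() -> Result<T, E>,
        max_attempts: usize,
    ) -> Result<T, E> {
        let mut last_err = None;
        for _ in 0..max_attempts {
            match op() {
                Ok(v) => return Ok(v),
                // record the error and retry immediately, no sleep/backoff
                Err(e) => last_err = Some(e),
            }
        }
        // give up once the consecutive-attempt threshold is reached
        Err(last_err.expect("max_attempts must be greater than zero"))
    }

    fn main() {
        let mut calls = 0;
        let result = bounded_immediate_retry(
            || {
                calls += 1;
                if calls < 3 { Err("stale view") } else { Ok(calls) }
            },
            5,
        );
        // succeeds on the third immediate attempt
        assert_eq!(result, Ok(3));
    }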

I'm sorry for the delay and so many doubts; I'm new to async Rust.

Harsh1s avatar Mar 11 '24 16:03 Harsh1s