RFC: Client request pipelining
Currently, `client_write()` sends the request to openraft via an `mpsc` channel, which spools (potentially many) requests. The openraft core works on batches of requests and produces batches of responses. However, on the `client_write()` side, each task has to explicitly await its own `client_write()` call.
To improve things, one could think about multiple solutions. What immediately comes to mind is to use another `mpsc` channel, where the consumer of replies can consume them at its own pace (also pipelined). However, this would probably require major rework and I don't know whether we could integrate it with the existing API. So I don't think this is the way to go.
What we already have is a semi-abstraction of reply handling: sending the reply via a `oneshot` channel. So my suggestion would be the following: instead of hard-coding a `oneshot` channel to send the reply back, `RaftMsg::ClientWriteRequest` would require a new type `ReplyConsumerType: ReplyConsumer` (or so) on the config for `tx`.
The `ReplyConsumer` would be something like this:

```rust
trait ReplyConsumer<D, R> {
    /// Build the consumer from the request (and any context carried in it).
    fn from_request(request: D) -> (D, Self) where Self: Sized;
    /// Called by openraft to deliver the reply.
    fn request_completed(self, response: R);
}
```
so it can be built from `AppData`. The only additional change in openraft would be to use `tx.request_completed()` instead of `tx.send()` to send a reply to `ClientWriteRequest`.
The default implementation of `ReplyConsumer` would internally create and use a `oneshot` channel and implement `Future` (basically, wrapping the request as is done today). This would require zero changes to the user code (beyond specifying the default type for `ReplyConsumer`).
If the user uses the default implementation for `ReplyConsumerType`, then `client_write()` would simply be enabled by `where ReplyConsumerType: Future`. There would be no change to the client. Of course, the user is also free to create a different implementation which implements `Future`, so `client_write()` can be used with a custom implementation as well.
A second, synchronous API `client_write_ff()` (or so; `ff` for fire-and-forget) would not require implementing `Future`; instead, it would rely on the implementation of `ReplyConsumer` doing the "right thing" to send the reply for further pipelined processing (e.g., by posting to a user-specific `mpsc` channel spooling replies), as in the sketch below.
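As a rough illustration only (a minimal sketch, not part of the concrete proposal): such a pipelining consumer could look roughly like this, assuming the simplified `ReplyConsumer<D, R>` trait above and a hypothetical `HasReplyQueue` helper through which the request exposes its reply queue.

```rust
// Hypothetical sketch of a user-defined ReplyConsumer that spools replies into
// an application-owned mpsc channel instead of waking a per-request task.
use tokio::sync::mpsc;

/// Hypothetical helper trait: the application data knows its reply queue.
trait HasReplyQueue<R> {
    fn reply_queue(&self) -> mpsc::UnboundedSender<R>;
}

struct PipelinedReplyConsumer<R> {
    // The queue that collects completed replies for further pipelined processing.
    reply_tx: mpsc::UnboundedSender<R>,
}

impl<D: HasReplyQueue<R>, R> ReplyConsumer<D, R> for PipelinedReplyConsumer<R> {
    fn from_request(request: D) -> (D, Self) {
        // Peel the reply queue off the request and hand the request back unchanged.
        let reply_tx = request.reply_queue();
        (request, Self { reply_tx })
    }

    fn request_completed(self, response: R) {
        // Fire-and-forget: if the application dropped the queue, the reply is lost.
        let _ = self.reply_tx.send(response);
    }
}
```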
With this fairly small change, we could enable fully pipelined processing in the caller of `client_write()` as well, without resorting to a workaround with a task per request.

Thoughts?
> If the user uses the default implementation for `ReplyConsumerType`,

I have a general understanding of your concept. It seems that `ReplyConsumer` is an internal component of Openraft. Could you explain whether and how a user can decide to utilize the default implementation of `ReplyConsumerType`?

Additionally, would you be able to provide the method signatures for the revised `client_write()` and `client_write_ff()` functions?
No, `ReplyConsumer` is a public trait, which can be implemented by the user as needed (for example, we'd push replies into a reorder buffer, reordering them with read-only requests on the same client connection). But, to ease the implementation, I'm suggesting providing something like `DefaultReplyConsumer`, which implements `ReplyConsumer` and `Future` and internally uses a `oneshot` channel. The type config would specify which reply consumer to use.
As for the signature, it would be:

```rust
pub async fn client_write(
    &self,
    app_data: C::D,
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
where C::ReplyConsumer: Future<Output = Result<
    ClientWriteResponse<C>,
    RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
>>;

pub fn client_write_ff(&self, app_data: C::D);
```
The only suboptimal thing is that you can call `client_write_ff()` also with the default reply consumer. OTOH, it would do exactly what it says: fire and forget. The response and any potential error would be lost, so probably not a big deal.
- I see that Openraft calls `ReplyConsumer::request_completed()` to send back the response. But why does it need a `D` to create a `ReplyConsumer` with `ReplyConsumer::from_request(D)`?
- From the method signature I do not see how a user gets an instance of `ReplyConsumer`; then who will poll the `Future` of `ReplyConsumer`?
> But why does it need a `D` to create a `ReplyConsumer` with `ReplyConsumer::from_request(D)`?
The idea is to pass additional information "hidden" in `D`, which allows us to create the consumer. For example, the queue into which to put the reply. Otherwise, the reply consumer would be stateless, which is not what we want. Alternatively, one could pass a reference to the `SM` or similar, where it could be hidden as well. But some form of context for the reply consumer is needed.
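As a hedged illustration of what "hidden in `D`" could mean in practice (all names here are hypothetical, not openraft types): the application data can simply embed the context, and `from_request()` peels it off.

```rust
use tokio::sync::mpsc;

// Hypothetical application data: the actual payload plus the context needed to
// build the reply consumer (here, the queue that should receive the reply).
struct MyAppData {
    payload: Vec<u8>,
    reply_tx: mpsc::UnboundedSender<MyResponse>,
}

struct MyResponse {
    // ... whatever the state machine produces as a reply
}

// Sketch of what from_request() would do with it: clone the context out and
// hand the request itself back unchanged.
fn split_context(request: MyAppData) -> (MyAppData, mpsc::UnboundedSender<MyResponse>) {
    let reply_tx = request.reply_tx.clone();
    (request, reply_tx)
}
```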
> From the method signature I do not see how a user gets an instance of `ReplyConsumer`; then who will poll the `Future` of `ReplyConsumer`?
The user won't get the instance. The `Future` is polled within `client_write()`, same as today with `oneshot::Receiver`, and `client_write()` would still return `C::R`. I.e., in the default case it will produce exactly the same binary code as we have today.
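For reference, a simplified sketch of the shape of today's default path (hedged; this is not the literal openraft source, just the general pattern of a oneshot-based reply):

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified sketch: client_write() creates a oneshot pair, ships the sender to
// the core together with the request, and awaits the receiver inline -- which is
// what ties up one task per in-flight request.
async fn client_write_today<D, R>(
    core_tx: &mpsc::UnboundedSender<(D, oneshot::Sender<R>)>,
    app_data: D,
) -> Option<R> {
    let (tx, rx) = oneshot::channel();
    core_tx.send((app_data, tx)).ok()?;
    rx.await.ok()
}
```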
In the other case, where the user implements `ReplyConsumer`, say, to post responses to a queue, `ReplyConsumer::request_completed()` will be called with the response, and then the user is free to do with the reply whatever they please (in this case, post the response to some queue, which was noted down when creating the `ReplyConsumer` based on the context).
Makes sense.

The abstraction for queuing replies is an elegant design. 👍
@schreter Isn't there an issue with the `Future` implementation for `ReplyConsumer`?

The instance of `ReplyConsumer` will be sent to `RaftCore` so that `RaftCore` can send back the reply via it. Thus, `Future` should not be implemented for `ReplyConsumer`; instead, it should be implemented for another instance, like a receiver.
The `ReplyConsumer` API would be like the following: `from_request` builds a `ReplyConsumer` and a receiver `Future`.
```rust
pub type AppDataResult<C> = Result<(LogIdOf<C>, ROf<C>), ForwardToLeader<C>>;

#[derive(Debug, Clone, thiserror::Error)]
#[error("The response consumer has been closed.")]
pub struct Closed;

pub trait ReplyConsumer<C>: OptionalSend + 'static
where C: RaftTypeConfig
{
    fn from_request(
        app_data: C::D,
    ) -> (
        C::D,
        Self,
        Option<impl Future<Output = Result<AppDataResult<C>, Closed>>>,
    )
    where Self: Sized;

    // ...
}
```
Make sense?
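To make that concrete, here is a minimal hedged sketch of what a oneshot-backed default could do under this shape of API (names are illustrative, not the proposed openraft types; the `Closed` error is the one sketched above):

```rust
use tokio::sync::oneshot;

// Hypothetical sketch: the consumer half is what RaftCore gets, the receiver
// half is what the caller (or client_write()) awaits.
struct OneshotReplyConsumer<T> {
    tx: oneshot::Sender<T>,
}

impl<T> OneshotReplyConsumer<T> {
    // Roughly what ReplyConsumer::from_request() would do for the default case:
    // build both halves and hand the request back unchanged.
    fn new<D>(app_data: D) -> (D, Self, oneshot::Receiver<T>) {
        let (tx, rx) = oneshot::channel();
        (app_data, Self { tx }, rx)
    }

    // Roughly what RaftCore would call to deliver the reply.
    fn request_completed(self, result: T) {
        let _ = self.tx.send(result);
    }
}

// The caller then awaits the receiver; a RecvError on that await would be
// mapped to the `Closed` error from the sketch above.
```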
> The `ReplyConsumer` API would be like the following: `from_request` builds a `ReplyConsumer` and a receiver `Future`.
You are right, we need two objects: one that we send to consume the reply (e.g., send it over a `oneshot` channel) and one to await in / return back to the caller. That also helps with defining the "other" API:
```rust
pub trait ReplyConsumer<C>: OptionalSend + 'static
where C: RaftTypeConfig
{
    /// The type generated for the send side, which may be a `Future`.
    /// (the name is preliminary, it's suboptimal)
    type SendResult;

    fn from_request(
        app_data: C::D,
    ) -> (
        C::D,
        Self,
        Self::SendResult,
    )
    where Self: Sized;

    // ...
}

impl Raft {
    // ...
    pub async fn client_write(
        &self,
        app_data: C::D,
    ) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
    where C::ReplyConsumer::SendResult: Future<Output = Result<
        ClientWriteResponse<C>,
        RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
    >>;

    pub fn client_write_ff(&self, app_data: C::D) -> C::ReplyConsumer::SendResult;
    // ...
}
```
I.e., the `client_write()` version with a `Future` result would just await the future, but even if you'd use `client_write_ff()` with a `Future`-based `SendResult`, it would be well-formed. You could await the future yourself somewhere else (e.g., send multiple requests and then `join!` them and the like).
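For example (a hedged usage sketch; `MyTypeConfig` and `MyAppData` are hypothetical, and `client_write_ff()` is the API proposed above, not an existing openraft method):

```rust
use futures::future::join_all;

// Hypothetical usage of the proposed API: enqueue several writes without
// awaiting each one individually, then await all of the replies together.
async fn pipelined_writes(raft: &Raft<MyTypeConfig>, requests: Vec<MyAppData>) {
    // Each call submits the request to the core immediately and returns the
    // awaitable SendResult without parking this task on the reply.
    let pending: Vec<_> = requests
        .into_iter()
        .map(|req| raft.client_write_ff(req))
        .collect();

    // The core processes the batch; the replies are awaited in one go.
    for reply in join_all(pending).await {
        // Each `reply` is the ClientWriteResponse / error for one request.
        let _ = reply;
    }
}
```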
Then, `client_write()` would be just a thin wrapper:
```rust
pub async fn client_write(
    &self,
    app_data: C::D,
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
where C::ReplyConsumer::SendResult: Future<Output = Result<
    ClientWriteResponse<C>,
    RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
>> {
    self.client_write_ff(app_data).await
}
```
Make sense?