openraft
Using a channel to asynchronously receive a reply from StateMachine::apply().
The reply should be delivered asynchronously. One option: make this method synchronous and, instead of using its return value, pass in a channel on which to post the completion.
The Raft core can then collect completions on this channel and update the client with the result once all the preceding operations have been applied to the state machine. This way we get operation pipelining without having to wait inline for each operation to complete.
https://github.com/datafuselabs/openraft/blob/46644c8409c7dff627c991cc65849507ad5265b3/openraft/src/storage.rs#L324-L329
Just yesterday, we discussed with colleagues what this means in detail (the same applies to snapshots, which are similar).
In fact, we need a two-stage process: first, the state machine update or snapshot is triggered on the &mut self to prepare the operation. This may even complete the operation fully, but on the other hand, it may (sometimes) need to synchronize, so it still needs to be async. As a result, we can return a 3-state value with immediate result, immediate error or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background). Then, for all such futures, we can start background tasks that report back to the main loop of the Raft core. The started tasks (and also the immediately completed ones, if no future was returned) can be kept in a List or Deque or similar structure. Since they are produced in order, the structure grows to the right.
When the task completes, as mentioned, it will send the completion message. If it is at the head of the queue, then we can report the completion back to the client and repeatedly check the new head. I.e., if we have tasks 1,2,3,4,5 completed out-of-order 2,3,5,1,4, then the completion of 2, 3 and 5 will do nothing, completion of 1 (head of the outstanding requests) will send the completion of 1 to the client and then check the remainder of the queue, i.e., also send completions for already-completed requests 2 and 3. 4 is incomplete, so the processing stops there. Following the same logic, when 4 completes, it will send completions for 4 and 5 to the client.
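The head-of-queue rule described above can be sketched with a VecDeque. This is only an illustration under assumptions: the names CompletionQueue, push_pending, complete and replay are hypothetical and not part of openraft, and results are plain values rather than messages sent to a client.

```rust
use std::collections::VecDeque;

/// Hypothetical tracker for outstanding apply operations, ordered by log index.
struct CompletionQueue<R> {
    /// Log index of the operation at the front of `slots`.
    head_index: u64,
    /// `None` = still running, `Some(r)` = finished but not yet reported.
    slots: VecDeque<Option<R>>,
}

impl<R> CompletionQueue<R> {
    fn new(first_index: u64) -> Self {
        Self { head_index: first_index, slots: VecDeque::new() }
    }

    /// Register the next operation; operations are produced in order,
    /// so the queue only grows at the back. Returns the assigned index.
    fn push_pending(&mut self) -> u64 {
        self.slots.push_back(None);
        self.head_index + self.slots.len() as u64 - 1
    }

    /// Record one completion and return every result that may now be
    /// reported to the client, in log order.
    fn complete(&mut self, index: u64, result: R) -> Vec<(u64, R)> {
        let mut ready = Vec::new();
        // Indexes outside the outstanding window are ignored in this sketch.
        let Some(offset) = index.checked_sub(self.head_index) else { return ready };
        let Some(slot) = self.slots.get_mut(offset as usize) else { return ready };
        *slot = Some(result);
        // Drain the completed prefix; stop at the first operation still running.
        while matches!(self.slots.front(), Some(Some(_))) {
            let finished = self.slots.pop_front().flatten().unwrap();
            ready.push((self.head_index, finished));
            self.head_index += 1;
        }
        ready
    }
}

/// Replays a completion order for operations 1..=5 and returns, per
/// completion, the indexes that become reportable at that moment.
fn replay(order: &[u64]) -> Vec<Vec<u64>> {
    let mut queue = CompletionQueue::new(1);
    for _ in 0..5 {
        queue.push_pending();
    }
    order
        .iter()
        .map(|&i| queue.complete(i, ()).into_iter().map(|(idx, _)| idx).collect())
        .collect()
}
```

Calling replay(&[2, 3, 5, 1, 4]) reproduces the example: the first three completions report nothing, the completion of 1 releases 1, 2 and 3, and the completion of 4 releases 4 and 5.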
> As a result, we can return a 3-state value with immediate result, immediate error or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background).
So this is what you expected the API to be, right?
// where R: AppResponse
type ApplyResult = Result<R, Error>;
enum Either<T, U> {
    Left(T),
    Right(U),
}
fn apply_to_state_machine() -> Either<ApplyResult, Box<dyn Future<Output = ApplyResult>>>
If it is, what about making the return value just a Future?
- An implementation could just return a Future that is already ready for poll() if the apply can be done at once.
- And it could also return a Future in which the apply will actually be done.
With GAT, will it be as efficient as returning an immediate result?
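To make the "already ready" option concrete, here is a minimal sketch using only the standard library. The names ApplyResult, BoxedApply, apply_immediately, apply_deferred and poll_once are hypothetical and not openraft API, and the result type is simplified to Result<u64, String>. It only shows that a future built with std::future::ready resolves on its first poll; it does not measure whether boxing is as cheap as returning a plain value.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-ins for the types in the discussion (assumptions).
type ApplyResult = Result<u64, String>;
type BoxedApply = Pin<Box<dyn Future<Output = ApplyResult> + Send + 'static>>;

/// The apply can be done at once: wrap the finished result in a ready future.
fn apply_immediately(value: u64) -> BoxedApply {
    Box::pin(future::ready::<ApplyResult>(Ok(value + 1)))
}

/// The apply happens inside the future, when it is eventually polled.
fn apply_deferred(value: u64) -> BoxedApply {
    Box::pin(async move { Ok::<u64, String>(value + 1) })
}

/// A waker that does nothing; enough for polling a future a single time.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future exactly once and returns whatever state it is in.
fn poll_once(mut fut: BoxedApply) -> Poll<ApplyResult> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

Both variants present the same type to the caller. In this sketch the visible cost of the "immediate" path is the Box allocation plus one poll.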
Yes, something like that. A simple Future won't work, since the Future which needs to be deferred to a background task must be 'static. Further, the implementation may need to await something. Concretely, we have exactly this situation in our code: the modification preparation needs write access (via &mut self), but also needs to await allocation of resources (which is typically immediate, but not always). Then, after resources are acquired, based on other factors, it can either execute the request immediately or build the 'static Future to be awaited in a task.
> Then, after resources are acquired, based on other factor, it can either execute the request immediately or build the 'static Future to be awaited in a task.
Is it all right to return a ready Future if it can execute the request immediately? This would make the API simpler. Unless in this way it affects the performance.
That's true, but not sufficient.
The problem is as follows:
- The internal implementation needs to run alone on the state machine until all resources are acquired. This is typically immediate, but it is async. This is the future returned from the apply_to_state_machine() call. Due to a technical constraint, this future is not 'static, but rather bound to the '_ lifetime of the caller.
- After acquiring resources, there is a potentially long-running task, which doesn't need write access to the state machine and can run concurrently with other operations on the state machine. This task must be 'static (otherwise you can't dispatch it and can't "unborrow" &mut self to return from apply_to_state_machine()). Ultimately, this task finishes, and at that time the result of the operation is known and can be sent to the client (provided all previous operations have also finished).
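The two stages can be sketched as a "double future": an outer future that borrows the state machine and an inner 'static future that does not. Everything here is an assumption made for illustration: StateMachine, prepare, Background, the String result type and the tiny block_on helper are hypothetical, and a real implementation would spawn the inner future on its async runtime instead of blocking on it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Stage two: owns everything it needs, so it can be 'static.
type Background = Pin<Box<dyn Future<Output = String> + Send + 'static>>;

struct StateMachine {
    applied: u64,
}

impl StateMachine {
    /// Stage one: needs `&mut self`, so the future returned by this async fn
    /// is bound to the borrow of the state machine and is not 'static.
    async fn prepare(&mut self, payload: &str) -> Background {
        self.applied += 1; // exclusive access happens only here
        let index = self.applied;
        let owned = payload.to_owned();
        // The long-running part captures owned data only.
        Box::pin(async move { format!("{index}:{owned}") })
    }
}

/// Minimal single-threaded executor so the sketch needs no runtime crate.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Prepares two operations in order, then finishes them out of order.
fn two_stage_demo() -> (String, String, u64) {
    let mut sm = StateMachine { applied: 0 };
    // Each `prepare` borrow ends as soon as the outer future completes.
    let first = block_on(sm.prepare("a"));
    let second = block_on(sm.prepare("b"));
    // The background parts no longer touch `sm` and may run in any order.
    let b = block_on(second);
    let a = block_on(first);
    (a, b, sm.applied)
}
```

Because the inner future captures only owned data, the &mut borrow ends when the outer future completes, which is what allows the second prepare call before the first background part has run.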
Instead of returning a double-future, it's also possible to call the API with a result receiver (which needs to be 'static). Then, instead of returning the result directly by the Future, the returned Future would be the one just preparing the state machine (or not - it could've computed the result directly) and the actual result will be sent later via the result receiver provided to the call.
This way, the API would be fairly simple and the "simple" implementations could just set the result on the receiver before returning, more complex ones would do it in a background task.
The receiver would effectively schedule a callback to the main loop via the mpsc channel (or whatever other primitive is used, I'm not sure).
I.e., something like:
async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>);
where ResultReceiver would have something like:
fn send_result(&self, index: u64, result: Result<R, StorageError>);
A simple implementation would implement it then like this:
async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>)
{
    // Simple case: apply each entry inline and report its result right away.
    for entry in entries {
        result_receiver.send_result(entry.log_id.index, internal_state.apply(entry).await);
    }
}
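For illustration, a result receiver backed by a standard-library mpsc channel might look like the sketch below. It is deliberately synchronous and uses threads instead of async tasks so that it runs without a runtime. ResultReceiver and send_result follow the proposal above, while the Entry fields, the StorageError stand-in, the u64 running total and collect_results are assumptions made up for this example.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Stand-in for the real error type (assumption).
#[derive(Debug, PartialEq)]
struct StorageError(String);

/// Hypothetical 'static handle that posts results back to the main loop.
#[derive(Clone)]
struct ResultReceiver<R> {
    tx: Sender<(u64, Result<R, StorageError>)>,
}

impl<R> ResultReceiver<R> {
    fn send_result(&self, index: u64, result: Result<R, StorageError>) {
        // If the main loop is gone there is nobody left to notify.
        let _ = self.tx.send((index, result));
    }
}

/// Simplified log entry (assumption): an index and a number to add.
struct Entry {
    index: u64,
    payload: u64,
}

/// Applies entries to a running total. Even payloads are reported inline;
/// odd payloads are reported from a background thread, standing in for a
/// long-running task that no longer needs the state machine.
fn apply_to_state_machine(total: &mut u64, entries: &[Entry], receiver: ResultReceiver<u64>) {
    for entry in entries {
        *total += entry.payload; // the part that needs exclusive access
        let snapshot = *total;
        if entry.payload % 2 == 0 {
            receiver.send_result(entry.index, Ok(snapshot));
        } else {
            let background = receiver.clone();
            let index = entry.index;
            thread::spawn(move || background.send_result(index, Ok(snapshot)));
        }
    }
}

/// Plays the role of the main loop: collects every result, then sorts by index.
fn collect_results() -> Vec<(u64, Result<u64, StorageError>)> {
    let (tx, rx) = mpsc::channel();
    let mut total = 0;
    let entries = [
        Entry { index: 1, payload: 1 },
        Entry { index: 2, payload: 2 },
        Entry { index: 3, payload: 3 },
    ];
    apply_to_state_machine(&mut total, &entries, ResultReceiver { tx });
    // The iterator ends once every clone of the sender has been dropped.
    let mut results: Vec<_> = rx.iter().collect();
    results.sort_by_key(|item| item.0);
    results
}
```

Results may arrive on the channel in any order, so a real main loop would feed them into an in-order structure before answering clients; the sort here only makes the outcome easy to inspect.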
I hope it is clearer now.