
Using a channel to asynchronously receive a reply from StateMachine::apply().

drmingdrmer opened this issue 3 years ago · 6 comments

The reply should happen asynchronously, somehow. Make this method synchronous and, instead of using the return value, pass in a channel to which the completion is posted.

The Raft core can then collect completions on this channel and update the client with the result once all the preceding operations have been applied to the state machine. This way we'll reach operation pipelining w/o the need to wait for the completion of each operation inline.

https://github.com/datafuselabs/openraft/blob/46644c8409c7dff627c991cc65849507ad5265b3/openraft/src/storage.rs#L324-L329

drmingdrmer avatar Feb 25 '22 07:02 drmingdrmer


Just yesterday, my colleagues and I discussed what this means in detail (also for the snapshot, which is similar).

In fact, we need a two-stage process. First, the state machine update or snapshot is triggered on the &mut self to prepare the operation. This may complete the operation fully, but it may also (sometimes) need to synchronize, so it still has to be async. As a result, we can return a 3-state value: an immediate result, an immediate error, or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background). For each such future, we can start a background task that reports back to the main loop of the Raft core. The started tasks (and also the immediately completed ones, if no future was reported) can be kept in a list, deque, or similar structure. Since they are produced in order, the structure grows to the right.

When a task completes, as mentioned, it sends the completion message. If it is at the head of the queue, we can report the completion back to the client and repeatedly check the new head. I.e., if tasks 1, 2, 3, 4, 5 complete out of order as 2, 3, 5, 1, 4, then the completions of 2, 3 and 5 do nothing; the completion of 1 (the head of the outstanding requests) sends the completion of 1 to the client and then checks the remainder of the queue, i.e., also sends completions for the already-completed requests 2 and 3. 4 is incomplete, so processing stops there. Following the same logic, when 4 completes, it sends completions for 4 and 5 to the client.
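The in-order release logic described above can be sketched synchronously. The `CompletionQueue` type and its fields below are hypothetical, invented for illustration; openraft's actual bookkeeping may look different:

```rust
use std::collections::BTreeMap;

// Hypothetical helper: tracks apply-results that may complete out of
// order and releases them to the client strictly in log order.
struct CompletionQueue {
    next: u64,                         // oldest outstanding log index
    done: BTreeMap<u64, &'static str>, // completed but not yet released
}

impl CompletionQueue {
    fn new(first: u64) -> Self {
        Self { next: first, done: BTreeMap::new() }
    }

    /// Record completion of `index` and return the run of results that
    /// can now be reported to the client, in order.
    fn complete(&mut self, index: u64, result: &'static str) -> Vec<(u64, &'static str)> {
        self.done.insert(index, result);
        let mut released = Vec::new();
        // Drain the contiguous run starting at the head of the queue.
        while let Some(r) = self.done.remove(&self.next) {
            released.push((self.next, r));
            self.next += 1;
        }
        released
    }
}
```

Replaying the example: completions 2, 3, 5 release nothing; completing 1 releases 1, 2, 3; completing 4 releases 4 and 5.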

schreter avatar Feb 25 '22 08:02 schreter

As a result, we can return a 3-state value with immediate result, immediate error or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background).

So this is what you expected the API to be, right?

// where R: AppResponse
type ApplyResult = Result<R, Error>;

enum Either<T, U> {
    Left(T),
    Right(U),
}

fn apply_to_state_machine() -> Either<ApplyResult, Box<dyn Future<Output = ApplyResult>>>;

If it is, what about making the return value just a Future?

  • An implementation could just return a Future that is already ready for poll() if the apply can be done at once.
  • And it could also return a Future in which the apply will actually be done.

With GAT, will it be as efficient as returning an immediate result?

drmingdrmer avatar Feb 25 '22 12:02 drmingdrmer

Yes, something like that. A simple Future won't work, since the Future that needs to be deferred to a background task must be 'static. Further, the implementation may need to await something. Concretely, we have exactly this situation in our code: the modification preparation needs write access (via &mut self), but also needs to await the allocation of resources (which is typically immediate, but not always). Then, after resources are acquired, based on other factors, it can either execute the request immediately or build the 'static Future to be awaited in a task.

schreter avatar Feb 25 '22 15:02 schreter

Then, after resources are acquired, based on other factors, it can either execute the request immediately or build the 'static Future to be awaited in a task.

Is it all right to return an already-ready Future if the request can be executed immediately? This would make the API simpler, unless it affects performance.

drmingdrmer avatar Feb 25 '22 15:02 drmingdrmer

Is it all right to return an already-ready Future if the request can be executed immediately? This would make the API simpler, unless it affects performance.

That's true, but not sufficient.

The problem is as follows:

  • The internal implementation needs to run alone on the state machine until all resources are acquired. This is typically immediate, but it is async. This is the future returned from the apply_to_state_machine() call. Due to a technical constraint, this future is not 'static, but rather bound to '_ of the caller.
  • After acquiring resources, there is a potentially long-running task, which doesn't need write access to the state machine and can run concurrently with other operations on it. This task must be 'static (otherwise you can't dispatch it and can't "unborrow" &mut self to return from apply_to_state_machine()). Eventually this task finishes; at that point, the result of the operation is known and can be sent to the client (provided all previous operations have also finished).
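The two stages above can be sketched in miniature. All names here are hypothetical, and the 'static background task is modelled as a closure rather than a Future to keep the borrow-ending point visible:

```rust
// Hypothetical two-stage shape: stage 1 borrows the state machine
// exclusively; stage 2 is a 'static task that no longer borrows it.
struct StateMachine {
    resources: u32,
}

impl StateMachine {
    /// Stage 1: runs under `&mut self` to acquire resources, then returns
    /// a 'static task (a closure here; a boxed Future in practice) that
    /// can run in the background without borrowing `self`.
    fn prepare_apply(&mut self, payload: u64) -> impl 'static + FnOnce() -> Result<u64, String> {
        self.resources += 1;    // exclusive preparation phase
        move || Ok(payload + 1) // background phase owns everything it needs
    }
}
```

Because the returned task captures only owned data, `&mut self` is "unborrowed" as soon as `prepare_apply` returns, and the state machine is free for the next operation while the task runs.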

Instead of returning a double-future, it's also possible to call the API with a result receiver (which needs to be 'static). Then, instead of the returned Future yielding the result directly, it would only prepare the state machine (or possibly compute the result right away), and the actual result would be sent later via the result receiver provided to the call.

This way, the API would be fairly simple: "simple" implementations could just set the result on the receiver before returning, while more complex ones would do it in a background task.

The receiver would effectively schedule a callback to the main loop via the mpsc channel (or whatever other primitive is used, I'm not sure).

I.e., something like:

async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>);

where ResultReceiver would have something like:

fn send_result(&self, index: u64, result: Result<R, StorageError>);

A simple implementation would implement it then like this:

async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>)
{
    for entry in entries {
        result_receiver.send_result(entry.log_id.index, internal_state.apply(entry).await);
    }
}
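The receiver hand-off could be sketched with a plain std channel. The concrete types below are assumptions for illustration (std::sync::mpsc standing in for whatever async channel the Runtime abstraction would actually use, and String standing in for StorageError):

```rust
use std::sync::mpsc;

// Hypothetical receiver backed by std::sync::mpsc. `send_result` just
// enqueues a callback message for the Raft core's main loop.
struct ResultReceiver<R> {
    tx: mpsc::Sender<(u64, Result<R, String>)>,
}

impl<R> ResultReceiver<R> {
    fn send_result(&self, index: u64, result: Result<R, String>) {
        // A real implementation would decide what a closed channel means;
        // this sketch just drops the result in that case.
        let _ = self.tx.send((index, result));
    }
}
```

The main loop would hold the receiving end and, combined with the in-order queue discussed earlier, forward completions to clients as the head of the queue resolves.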

I hope it is clearer now.

schreter avatar Feb 25 '22 16:02 schreter