heph
Virtual Actors
Orleans, an actor framework for .NET, has the concept of "Virtual Actors". A virtual actor handles only a single message and doesn't keep any internal state between invocations (beyond what is required to handle the message). With these limitations it's possible to start a single actor per message, which makes the actor easy to scale.
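To make the idea concrete, here is a minimal sketch of "one handler per message". It is not Orleans or heph code: OS threads stand in for actors, and the `Greet` message, `handle` and `dispatch` are made-up names for illustration only.

```rust
use std::thread;

/// Hypothetical message type, for illustration only.
struct Greet {
    name: String,
}

/// Handles exactly one message and keeps no state between invocations.
fn handle(msg: Greet) -> String {
    format!("Hello {}", msg.name)
}

/// Starts one short-lived handler per message (an OS thread here, as an
/// assumption; a real runtime would schedule a lightweight actor instead).
fn dispatch(messages: Vec<Greet>) -> Vec<String> {
    let handles: Vec<_> = messages
        .into_iter()
        .map(|msg| thread::spawn(move || handle(msg)))
        .collect();
    // Join in spawn order so the results line up with the input messages.
    handles
        .into_iter()
        .map(|h| h.join().expect("handler panicked"))
        .collect()
}
```

Because no handler shares state with another, they can run in any order or in parallel, which is where the easy scaling comes from.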
Some initial design ideas:
The `VirtualActor` trait that only handles a single message.
trait VirtualActor {
    type Message;
    // A bound (`:`), not a default (`=`): any future with this output.
    type Future: Future<Output = Result<(), Self::Error>>;
    type Error;
    fn handle_message(&self, ctx: actor::Context<Self::Message>, msg: Self::Message) -> Self::Future;
}
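As a rough illustration of how this first design could be implemented, the sketch below uses stand-ins rather than heph's real types: `ActorContext` is an empty placeholder for `actor::Context`, `Validate` is a made-up actor, and `poll_once` is a tiny helper that polls a future a single time so the example runs without a runtime.

```rust
use std::future::{ready, Future, Ready};
use std::marker::PhantomData;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Empty stand-in for heph's `actor::Context` (an assumption of this sketch).
struct ActorContext<M>(PhantomData<M>);

trait VirtualActor {
    type Message;
    type Error;
    type Future: Future<Output = Result<(), Self::Error>>;

    fn handle_message(&self, ctx: ActorContext<Self::Message>, msg: Self::Message) -> Self::Future;
}

/// Hypothetical virtual actor: checks that its one message is a number.
struct Validate;

impl VirtualActor for Validate {
    type Message = String;
    type Error = ParseIntError;
    type Future = Ready<Result<(), ParseIntError>>;

    fn handle_message(&self, _ctx: ActorContext<String>, msg: String) -> Self::Future {
        ready(msg.trim().parse::<u32>().map(|_| ()))
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` once, returning `None` if it is still pending.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}

/// Handles a single message and reports whether the actor succeeded.
fn handle_one(msg: &str) -> bool {
    let fut = Validate.handle_message(ActorContext(PhantomData), msg.to_owned());
    matches!(poll_once(fut), Some(Ok(())))
}
```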
Alternatively, we could skip passing the initial message and leave it in the actor's inbox instead.
trait VirtualActor {
    type Message;
    // A bound (`:`), not a default (`=`): any future with this output.
    type Future: Future<Output = Result<(), Self::Error>>;
    type Error;
    /// `ctx` is guaranteed to contain at least one message.
    fn start(&self, ctx: actor::Context<Self::Message>) -> Self::Future;
}
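A sketch of the inbox variant, under a few assumptions: `ActorContext` is a stand-in that only models the inbox, `try_receive` and `with_message` are names made up for this example, and `start` returns a plain `Result` instead of a future to keep the sketch short.

```rust
use std::collections::VecDeque;

/// Stand-in for heph's `actor::Context`; it only models the inbox.
struct ActorContext<M> {
    inbox: VecDeque<M>,
}

impl<M> ActorContext<M> {
    /// Builds a context whose inbox already holds the triggering message,
    /// which is the guarantee `start` relies on.
    fn with_message(msg: M) -> Self {
        let mut inbox = VecDeque::new();
        inbox.push_back(msg);
        ActorContext { inbox }
    }

    /// Takes the next message from the inbox, if any (hypothetical method).
    fn try_receive(&mut self) -> Option<M> {
        self.inbox.pop_front()
    }
}

trait VirtualActor {
    type Message;
    type Error;

    /// Simplification for this sketch: synchronous instead of a future.
    fn start(&self, ctx: ActorContext<Self::Message>) -> Result<(), Self::Error>;
}

/// Hypothetical virtual actor that rejects empty messages.
struct NonEmpty;

impl VirtualActor for NonEmpty {
    type Message = String;
    type Error = &'static str;

    fn start(&self, mut ctx: ActorContext<String>) -> Result<(), &'static str> {
        // The runtime guarantees one message, but the type system does not,
        // so the actor still has to handle the empty case somehow.
        let msg = ctx.try_receive().ok_or("inbox was empty")?;
        if msg.is_empty() {
            Err("empty message")
        } else {
            Ok(())
        }
    }
}
```

The trade-off compared to passing `msg` directly is visible in `start`: the "at least one message" guarantee lives in documentation rather than in the signature.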
Open questions:
- How to do supervision. It makes little sense to restart a `VirtualActor` as it only handles a single message, which is now gone.
Resources:
- https://docs.microsoft.com/en-us/azure/service-fabric/service-fabric-reliable-actors-introduction
- https://www.microsoft.com/en-us/research/project/orleans-virtual-actors/?from=https%3A%2F%2Fresearch.microsoft.com%2Fen-us%2Fprojects%2Forleans%2F
- Also Orbit: https://github.com/orbit/orbit
Some more design ideas.
An updated version of the trait:
pub trait VirtualActor {
    type Message;
    type Actor: Actor;
    type Error;
    type RuntimeAccess;
    /// Create a new actor that handles `msg`.
    fn new(rt: Self::RuntimeAccess, msg: Self::Message) -> Result<Self::Actor, Self::Error>;
}
It's similar to the `NewActor` trait, but it doesn't give the actor an `actor::Context`. Instead it only gets access to the runtime through `RuntimeAccess` (the `RT` part of the `actor::Context`).
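For illustration, an implementation of this updated trait might look roughly like the sketch below. The `Actor` trait here is a simplified, synchronous stand-in (not heph's `Actor`), and `Rt`, `Greeter` and `GreetFactory` are hypothetical names. `Infallible` is used in place of the never type so the sketch compiles on stable Rust.

```rust
use std::convert::Infallible;

/// Simplified stand-in for heph's `Actor` trait (an assumption of this
/// sketch): it runs to completion in one call.
trait Actor {
    type Error;
    fn run(self) -> Result<(), Self::Error>;
}

trait VirtualActor {
    type Message;
    type Actor: Actor;
    type Error;
    type RuntimeAccess;

    /// Create a new actor that handles `msg`.
    fn new(rt: Self::RuntimeAccess, msg: Self::Message) -> Result<Self::Actor, Self::Error>;
}

/// Hypothetical runtime handle; stands in for `RuntimeAccess`.
struct Rt {
    node: &'static str,
}

/// The actor created for a single message. It owns the message and the
/// runtime access, but has no inbox and no `actor::Context`.
struct Greeter {
    rt: Rt,
    name: String,
}

impl Actor for Greeter {
    type Error = Infallible;

    fn run(self) -> Result<(), Infallible> {
        println!("[{}] Hello {}", self.rt.node, self.name);
        Ok(())
    }
}

/// Hypothetical factory type implementing `VirtualActor`.
struct GreetFactory;

impl VirtualActor for GreetFactory {
    type Message = String;
    type Actor = Greeter;
    type Error = &'static str;
    type RuntimeAccess = Rt;

    fn new(rt: Rt, msg: String) -> Result<Greeter, &'static str> {
        // Creation can fail before any actor exists.
        if msg.is_empty() {
            return Err("empty name");
        }
        Ok(Greeter { rt, name: msg })
    }
}
```

Note the two separate error types: `VirtualActor::Error` covers failures while creating the actor, `Actor::Error` covers failures while it runs.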
Spawning would look something like the following.
impl RuntimeRef {
    /// Spawn a new virtual actor.
    ///
    /// For each message sent to the returned actor reference a new virtual
    /// actor is created to handle the message.
    fn spawn_virtual_actor<S, VA>(&mut self, supervisor: S, options: ActorOptions) -> ActorRef<VA::Message>
    where
        S: Supervisor<VA>,
        VA: VirtualActor,
    {
        // ...
    }
}
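What "a new virtual actor per message" could mean in practice is sketched below. This is not how heph spawns actors: a channel plays the role of the actor reference, a dispatcher thread plays the runtime, the `Actor` trait is a synchronous stand-in, and the supervisor and `ActorOptions` arguments are left out entirely. `CheckEven` and `EvenFactory` are hypothetical.

```rust
use std::convert::Infallible;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Simplified, synchronous stand-in for heph's `Actor` trait.
trait Actor {
    type Error;
    fn run(self) -> Result<(), Self::Error>;
}

trait VirtualActor {
    type Message;
    type Actor: Actor;
    type Error;
    type RuntimeAccess;
    fn new(rt: Self::RuntimeAccess, msg: Self::Message) -> Result<Self::Actor, Self::Error>;
}

/// Stand-in actor reference: sending pushes the message onto a channel.
struct ActorRef<M>(Sender<M>);

impl<M> ActorRef<M> {
    fn try_send(&self, msg: M) -> bool {
        self.0.send(msg).is_ok()
    }
}

/// Spawns a dispatcher that creates one fresh actor per received message.
/// The join handle yields how many messages were handled successfully.
fn spawn_virtual_actor<VA>(rt: VA::RuntimeAccess) -> (ActorRef<VA::Message>, JoinHandle<usize>)
where
    VA: VirtualActor + 'static,
    VA::Message: Send + 'static,
    VA::RuntimeAccess: Clone + Send + 'static,
{
    let (sender, receiver) = channel::<VA::Message>();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // The loop ends once every `ActorRef` has been dropped.
        for msg in receiver {
            // No restart on failure: the message is already consumed.
            if let Ok(actor) = VA::new(rt.clone(), msg) {
                if actor.run().is_ok() {
                    handled += 1;
                }
            }
        }
        handled
    });
    (ActorRef(sender), handle)
}

/// Hypothetical actor that fails on odd numbers.
struct CheckEven(u32);

impl Actor for CheckEven {
    type Error = &'static str;
    fn run(self) -> Result<(), &'static str> {
        if self.0 % 2 == 0 { Ok(()) } else { Err("odd number") }
    }
}

struct EvenFactory;

impl VirtualActor for EvenFactory {
    type Message = u32;
    type Actor = CheckEven;
    type Error = Infallible;
    type RuntimeAccess = ();
    fn new(_rt: (), msg: u32) -> Result<CheckEven, Infallible> {
        Ok(CheckEven(msg))
    }
}
```

The failure branch in the dispatcher is where the supervision question from earlier shows up: the only options left are logging the error or dropping it, since there is nothing to restart with.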
diff --git a/src/actor/mod.rs b/src/actor/mod.rs
index 464e722..73931e0 100644
--- a/src/actor/mod.rs
+++ b/src/actor/mod.rs
@@ -144,9 +144,12 @@
mod sync;
#[cfg(test)]
mod tests;
+mod r#virtual;
#[doc(inline)]
pub use context::{Context, NoMessages, ReceiveMessage, RecvError};
+#[doc(inline)]
+pub use r#virtual::VirtualActor;
#[cfg(any(test, feature = "test"))]
pub(crate) use sync::SyncWaker;
#[doc(inline)]
diff --git a/src/actor/virtual.rs b/src/actor/virtual.rs
new file mode 100644
index 0000000..6b01bbe
--- /dev/null
+++ b/src/actor/virtual.rs
@@ -0,0 +1,186 @@
+use crate::actor::{name, Actor};
+
+pub trait VirtualActor {
+ /// The type of messages the actor can receive.
+ ///
+ /// See [`NewActor::Message`] for more information and examples.
+ ///
+ /// [`NewActor::Message`]: crate::actor::NewActor::Message
+ type Message;
+
+ /// The type of the actor.
+ ///
+ /// See [`Actor`](Actor) for more.
+ type Actor: Actor;
+
+ /// The type of error.
+ ///
+ /// Note that if creating an actor is always successful the never type (`!`)
+ /// can be used. Asynchronous functions for example use the never type as
+ /// error.
+ type Error;
+
+ /// The kind of runtime access needed by the actor.
+ type RuntimeAccess;
+
+ /// Create a new actor that handles a single `msg`.
+ fn new(
+ &mut self,
+ rt: Self::RuntimeAccess,
+ msg: Self::Message,
+ ) -> Result<Self::Actor, Self::Error>;
+
+ /* TODO: maybe add arguments?
+ /// Wrap the `NewActor` to change the arguments its accepts.
+ ///
+ /// This can be used when additional arguments are needed to be passed to an
+ /// actor, where another function requires a certain argument list. For
+ /// example when using [`TcpServer`].
+ ///
+ /// [`TcpServer`]: crate::net::TcpServer
+ ///
+ /// # Examples
+ ///
+ /// Using [`TcpServer`] requires a `NewActor` that accepts `(TcpStream,
+ /// SocketAddr)` as arguments, but we need to pass the actor additional
+ /// arguments.
+ ///
+ /// ```
+ /// #![feature(never_type)]
+ ///
+ /// use std::io;
+ /// use std::net::SocketAddr;
+ ///
+ /// use heph::actor::{self, NewActor};
+ /// # use heph::actor::messages::Terminate;
+ /// # use heph::net::tcp::server;
+ /// use heph::net::{TcpServer, TcpStream};
+ /// use heph::rt::{self, Runtime, RuntimeRef, ThreadLocal};
+ /// use heph::spawn::ActorOptions;
+ /// # use heph::supervisor::{Supervisor, SupervisorStrategy};
+ /// # use log::error;
+ ///
+ /// fn main() -> Result<(), rt::Error> {
+ /// // Create and run runtime
+ /// let mut runtime = Runtime::new()?;
+ /// runtime.run_on_workers(setup)?;
+ /// runtime.start()
+ /// }
+ ///
+ /// /// In this setup function we'll spawn the `TcpServer` actor.
+ /// fn setup(mut runtime_ref: RuntimeRef) -> io::Result<()> {
+ /// // Prepare for humans' expand to Mars.
+ /// let greet_mars = true;
+ ///
+ /// // Our actor that accepts three arguments.
+ /// let new_actor = (conn_actor as fn(_, _, _, _) -> _)
+ /// .map_arg(move |(stream, address)| (stream, address, greet_mars));
+ ///
+ /// // For more information about the remainder of this example see
+ /// // `TcpServer`.
+ /// let address = "127.0.0.1:7890".parse().unwrap();
+ /// let server = TcpServer::setup(address, conn_supervisor, new_actor, ActorOptions::default())?;
+ /// # let actor_ref =
+ /// runtime_ref.try_spawn_local(ServerSupervisor, server, (), ActorOptions::default())?;
+ /// # actor_ref.try_send(Terminate).unwrap();
+ /// Ok(())
+ /// }
+ ///
+ /// # #[derive(Copy, Clone, Debug)]
+ /// # struct ServerSupervisor;
+ /// #
+ /// # impl<S, NA> Supervisor<server::Setup<S, NA>> for ServerSupervisor
+ /// # where
+ /// # S: Supervisor<NA> + Clone + 'static,
+ /// # NA: NewActor<Argument = (TcpStream, SocketAddr), Error = !, RuntimeAccess = ThreadLocal> + Clone + 'static,
+ /// # {
+ /// # fn decide(&mut self, err: server::Error<!>) -> SupervisorStrategy<()> {
+ /// # use server::Error::*;
+ /// # match err {
+ /// # Accept(err) => {
+ /// # error!("error accepting new connection: {}", err);
+ /// # SupervisorStrategy::Restart(())
+ /// # }
+ /// # NewActor(_) => unreachable!(),
+ /// # }
+ /// # }
+ /// #
+ /// # fn decide_on_restart_error(&mut self, err: io::Error) -> SupervisorStrategy<()> {
+ /// # error!("error restarting the TCP server: {}", err);
+ /// # SupervisorStrategy::Stop
+ /// # }
+ /// #
+ /// # fn second_restart_error(&mut self, _: io::Error) {
+ /// # // We don't restart a second time, so this will never be called.
+ /// # unreachable!();
+ /// # }
+ /// # }
+ /// #
+ /// # fn conn_supervisor(err: io::Error) -> SupervisorStrategy<(TcpStream, SocketAddr)> {
+ /// # error!("error handling connection: {}", err);
+ /// # SupervisorStrategy::Stop
+ /// # }
+ /// #
+ /// // Actor that handles a connection.
+ /// async fn conn_actor(
+ /// _: actor::Context<!, ThreadLocal>,
+ /// mut stream: TcpStream,
+ /// address: SocketAddr,
+ /// greet_mars: bool
+ /// ) -> io::Result<()> {
+ /// # drop(address); // Silence dead code warnings.
+ /// if greet_mars {
+ /// // In case this example ever reaches Mars.
+ /// stream.send_all(b"Hello Mars").await
+ /// } else {
+ /// stream.send_all(b"Hello World").await
+ /// }
+ /// }
+ /// ```
+ fn map_arg<F, Arg>(self, f: F) -> ArgMap<Self, F, Arg>
+ where
+ Self: Sized,
+ F: FnMut(Arg) -> Self::Argument,
+ {
+ ArgMap {
+ new_actor: self,
+ map: f,
+ _phantom: PhantomData,
+ }
+ }
+ */
+
+ /// Returns the name of the virtual actor.
+ ///
+ /// The default implementation creates the name based on the type name of
+ /// the actor.
+ ///
+ /// # Notes
+ ///
+ /// This uses [`type_name`] under the hood which does not have a stable
+ /// output. Like the `type_name` function the default implementation is
+ /// provided on a best effort basis.
+ ///
+ /// [`type_name`]: std::any::type_name
+ fn name(&self) -> &'static str {
+ name::<Self::Actor>()
+ }
+}
+
+impl<RT, M, A> VirtualActor for fn(rt: RT, msg: M) -> A
+where
+ A: Actor,
+{
+ type Message = M;
+ type Actor = A;
+ type Error = !;
+ type RuntimeAccess = RT;
+
+ fn new(
+ &mut self,
+ rt: Self::RuntimeAccess,
+ msg: Self::Message,
+ ) -> Result<Self::Actor, Self::Error> {
+ Ok((self)(rt, msg))
+ }
+}
Any progress on virtual actors? I like its design concept very much
I'm afraid not.