Use of actor reference on Async Actor handle
I'm trying to implement an asynchronous actor response that uses the actor's self reference on actix v0.9, but I can't get it to work because of a lifetime bounds error. I've isolated a small example that reproduces the error:
use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "Result<usize, ()>")]
struct Msg;

struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
        Box::new(self.reply().into_actor(self))
    }
}

impl MyActor {
    async fn reply(&self) -> Result<usize, ()> {
        Ok(42)
    }
}
But this gives me the following error:
error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
--> src/main.rs:17:23
|
17 | Box::new(self.reply().into_actor(self))
| ^^^^^
|
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 16:5...
--> src/main.rs:16:5
|
16 | / fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
17 | | Box::new(self.reply().into_actor(self))
18 | | }
| |_____^
note: ...so that reference does not outlive borrowed content
--> src/main.rs:17:18
|
17 | Box::new(self.reply().into_actor(self))
| ^^^^
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
--> src/main.rs:17:9
|
17 | Box::new(self.reply().into_actor(self))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
= note: expected `std::boxed::Box<(dyn actix::fut::ActorFuture<Actor = MyActor, Output = std::result::Result<usize, ()>> + 'static)>`
found `std::boxed::Box<dyn actix::fut::ActorFuture<Actor = MyActor, Output = std::result::Result<usize, ()>>>`
I've also tried to implement this using ActorFuture and the then combinator, but I get similar errors. This is what I tried:
fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
    Box::new(
        async {}
            .into_actor(self)
            .then(|_, act, _| actix::fut::wrap_future(act.reply())),
    )
}
The problem seems to be that I can only have static references in the future I return, but since the future will be handled by the actor itself, shouldn't it be able to use its own reference?
Are there examples of handling messages asynchronously? I have only been able to find the doc example on ActorFuture, which is even a little outdated, and it's a somewhat different context since it doesn't use a reference to the actor in the futures it chains.
I think this is happening because there are no explicit lifetime constraints between the actor and the future, so it automatically gets a 'static one. On the other hand, the async fn(&self) automatically gets a shorter lifetime bound to self based on its signature.
I'm not sure if the former can be fixed by tweaking actix types with explicit lifetimes.
The latter however can be adjusted by using an explicit impl Future instead of async fn. For example:
use std::future::Future;

struct MyActor {
    label: String,
}

impl MyActor {
    fn reply(&self) -> impl Future<Output = Result<String, ()>> /* + 'static */ {
        // Clone what the future needs so that it does not borrow `self`.
        let label = self.label.clone();
        async move { Ok(label) }
    }
}
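To make the lifetime difference concrete, here is a minimal sketch that needs only the standard library. `require_static`, `block_on`, `NoopWake`, and `demo` are hypothetical helpers introduced for this illustration (they are not part of actix), and the snippet assumes Rust 1.51 or later for `std::task::Wake`. Cloning the field before building the future is what lets the returned future satisfy a `'static` bound, so the actor can even be dropped before the future runs:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct MyActor {
    pub label: String,
}

impl MyActor {
    // The owned clone is moved into the async block, so the returned future
    // does not borrow `self` and can be declared `'static`.
    pub fn reply(&self) -> impl Future<Output = Result<String, ()>> + 'static {
        let label = self.label.clone();
        async move { Ok(label) }
    }
}

// Hypothetical helper: only accepts futures that hold no short-lived borrows.
pub fn require_static<F: Future + 'static>(fut: F) -> F {
    fut
}

// Hypothetical no-op waker so the future can be polled without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor: polls in a loop until the future completes.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Builds the future, drops the actor, and only then runs the future.
pub fn demo() -> Result<String, ()> {
    let actor = MyActor { label: String::from("hello") };
    let fut = require_static(actor.reply());
    drop(actor); // allowed: the future owns its own copy of the label
    block_on(fut)
}
```

With an `async fn reply(&self)` instead, the call to `require_static` would be rejected, because the returned future would borrow `actor`.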
I think this is happening because there are no explicit lifetime constraints between the actor and the future
Yes, this is what I have been able to understand from compiler messages and experimenting.
I'm not sure if the former can be fixed by tweaking actix types with explicit lifetimes.
I believe ActorFuture, or maybe WrapFuture, should have some sort of lifetime parameter to indicate that the future can have a lifetime associated with the actor and its context. That way, I believe it would be possible to use methods on actors without the need for cloning. I have been trying to tweak the definitions in actix::fut to add such lifetime parameters, but without any real success so far.
What I have been doing so far in the project where I'm using Actix 0.9 is to clone the actor itself and wrap the method call. Since all my actors can be cloned and I currently don't need to update internal state from the message handlers, this is a valid approach for this use case. The code looks something like this:
impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<()>>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        let this = self.clone();
        Box::new(actix::fut::wrap_future(this.async_handle_msg(msg)))
    }
}

impl MyActor {
    async fn async_handle_msg(self, msg: Msg) -> Result<()> {
        todo!("Impl some handling here")
    }
}
The core problem that needs to be addressed is that while the future waits, other messages can be sent to the actor. So if the actor is currently borrowed by an async block or by a future returned from an async function, these other messages cannot mutably borrow the actor.
ActorFuture solved this by not keeping the future borrowed while the future waits - it gets reborrowed every time the future is awoken. Doing something like this with async-await is far from trivial...
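A simplified, standard-library-only sketch of that idea follows. `ActorPoll`, `AddWhenReady`, `Counter`, and `demo` are hypothetical names made up for this illustration; the real ActorFuture trait in actix also receives the actor's context and a task context, so treat this as a model of the borrowing pattern rather than the library's API. The pending operation stores no reference to the actor and is lent one only for the duration of each poll, which leaves the actor free for other handlers in between:

```rust
pub struct Counter {
    pub value: u32,
}

// Hypothetical stand-in for an actor-aware future: the actor is passed in on
// every poll instead of being captured when the operation is created.
pub trait ActorPoll<A> {
    type Output;
    fn poll(&mut self, actor: &mut A) -> Option<Self::Output>;
}

// Adds `amount` to the counter once `polls_left` has reached zero.
pub struct AddWhenReady {
    pub polls_left: u32,
    pub amount: u32,
}

impl ActorPoll<Counter> for AddWhenReady {
    type Output = u32;

    fn poll(&mut self, actor: &mut Counter) -> Option<u32> {
        if self.polls_left > 0 {
            // Still "waiting": return without keeping the borrow.
            self.polls_left -= 1;
            return None;
        }
        actor.value += self.amount;
        Some(actor.value)
    }
}

// Drives one pending operation while also mutating the actor between polls,
// the way another message handler would.
pub fn demo() -> u32 {
    let mut actor = Counter { value: 0 };
    let mut pending = AddWhenReady { polls_left: 2, amount: 10 };
    loop {
        if let Some(result) = pending.poll(&mut actor) {
            return result;
        }
        // The borrow ended when `poll` returned, so this is accepted.
        actor.value += 1;
    }
}
```

Here `demo()` returns 12: the two in-between increments are visible to the pending operation when it finally completes, which is exactly the kind of interleaving an async block holding `&mut self` would forbid.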
I don't think it's possible. As @idanarye pointed out, using an actor reference in an async actor handler leads to multiple mutable references to the actor, which the design never allows.
I want to add that the future has to be 'static, and a reference to self never is. What this bound really means is that a future has to take ownership of every variable used inside an async block. This limitation caused the annoying pattern, back in the futures 0.1 days, where every variable had to be cloned outside the futures chain. So, when a variable was used in multiple map functions, we had to clone it once for each of them.
// futures 0.1
let non_copy_var = String::from("some text");
let non_copy_var1 = non_copy_var.clone();
let non_copy_var2 = non_copy_var.clone();
futures::future::ok(())
    .map(move |_| {
        println!("First usage of non_copy_var: {}", non_copy_var1);
        ()
    })
    .map(move |_| {
        println!("Second usage of non_copy_var: {}", non_copy_var2);
        ()
    });
In futures 0.3 today, this problem is alleviated: we only need to move a variable once. But we still need to move the variable into the async block, which can never happen with a reference to self.
For this reason, I don't think it's possible to use a mutable self reference in an async block. But you can move the required information into the async block and then update the actor's data through act in the map function of the FutureWrap (fut.into_actor(self).map(move |res, act, ctx| {})).
The following code compiles:
use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "Result<usize, ()>")]
struct Msg;

struct MyActor(usize);

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
        let result = self.reply();
        Box::new(
            async move { result }
                .into_actor(self)
                .map(move |res, act, ctx| {
                    // update self this way
                    act.0 = res.unwrap();
                    Ok(act.0)
                }),
        )
    }
}

impl MyActor {
    fn reply(&mut self) -> Result<usize, ()> {
        Ok(42)
    }
}
Would it be an option to make every actor, on creation, an Arc<Mutex<T>>? That way we could have a handle function that receives the locked actor, and an async handle that receives the Arc<Mutex<T>>. We would still need to find a way to prevent a double lock on the same thread (probably not detected by the compiler)... And I don't know how big a performance hit this would be.
Would it be an option to make every actor, on creation, an Arc<Mutex<T>>? That way we could have a handle function that receives the locked actor, and an async handle that receives the Arc<Mutex<T>>.
The system itself already ensures that only one message will be handled or polled at a time. What we need is a way to convince the compiler of that.
Meanwhile, another workaround I've adopted, to be able to update internal state without cloning or disturbing old code too much, is to dereference a raw (unsafe) pointer to the actor to access its async methods. So I have something like this:
impl Handler<Msg> for MyActor {
    type Result = ResponseFuture<Response>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        // Raw pointer to the actor; nothing ties its validity to the future.
        let this: *mut MyActor = self;
        Box::pin(async move {
            // Unsafe: assumes the actor is still alive whenever this is polled.
            unsafe { (*this).handle(msg).await }
        })
    }
}
I think a raw pointer dereference can cause a data race.
Consider two long and complicated async methods that edit fields of the same actor.
use actix::clock::delay_for;
use std::time::Duration;

struct Foo(usize);

impl Foo {
    async fn handle(&mut self) -> Result<usize, ()> {
        self.0 = 30;
        if self.0 > 0 {
            // Yield point: another handler may run while this one sleeps.
            delay_for(Duration::from_secs(10)).await;
            self.0 -= 1;
        }
        Ok(self.0)
    }
}
When two such handlers run at the same time, the result is certainly a buggy disaster.
@guoli-lyu yes, this could indeed lead to a data race. But other than this, and cloning, I don't see any other way to use an actor's internal state in async message handling.
It's also worth pointing out that the data race is not caused by the unsafe deref itself. Consider the following handler:
impl Handler<Msg> for Foo {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
        println!("Received");
        self.0 = 30;
        Box::new(
            async {}
                .into_actor(self)
                .then(|_, this: &mut Self, _| delay_for(Duration::from_secs(10)).into_actor(this))
                .map(|_, this, _| {
                    this.0 -= 1;
                    Ok(this.0)
                }),
        )
    }
}
If Msg is sent twice to Foo, the response could be 28 instead of the expected 29. This happens because the system might handle the second message while the first has not yet resolved.
As Actix's runtime is single-threaded, write operations on the actor's internal state should not lead to any memory hazard, but they can still lead to unexpected behavior if one is not careful.
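The interleaving can be reproduced without actix at all. The sketch below is a model, not the real scheduler: `handle`, `Continuation`, `interleaved`, and `sequential` are hypothetical names, and the ten-second delay is represented simply by the gap between calling `handle` and calling the continuation it returns.

```rust
pub struct Foo(pub usize);

// The part of the handler that runs after the delay has elapsed.
pub type Continuation = Box<dyn FnOnce(&mut Foo) -> usize>;

// Synchronous part of the handler: reset the state, then hand back the
// code that should run once the (modelled) delay is over.
pub fn handle(actor: &mut Foo) -> Continuation {
    actor.0 = 30;
    Box::new(|actor: &mut Foo| {
        actor.0 -= 1;
        actor.0
    })
}

// The second message arrives while the first is still waiting.
pub fn interleaved() -> (usize, usize) {
    let mut foo = Foo(0);
    let first = handle(&mut foo);
    let second = handle(&mut foo);
    (first(&mut foo), second(&mut foo))
}

// The second message is only handled after the first has fully resolved.
pub fn sequential() -> (usize, usize) {
    let mut foo = Foo(0);
    let first = handle(&mut foo);
    let first_result = first(&mut foo);
    let second = handle(&mut foo);
    (first_result, second(&mut foo))
}
```

`interleaved()` yields `(29, 28)` while `sequential()` yields `(29, 29)`, and no unsafe code is involved in either case.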
Maybe it's worthwhile to document this possible data race for code that modifies the actor's internal state in async code.
Well, this data race seems irrelevant to using an actor reference in an async actor handler. My point is that the handle function in Handler is synchronous code, while an async actor handler is async. Moving from sync code to async requires move semantics for every variable used, including self.
What about rewriting the handle() function in the Handler<T> trait as an async function? That way, there would be no need to switch between sync and async code blocks. Though I don't know if this is possible.
One option (based on a now-deleted Reddit comment) is to do something like this:
// Sketch only: with_actor is a proposed method, not an existing actix API.
fn handle(&mut self, msg: Msg, ctx: &mut Context<Self>) -> Self::Result {
    Box::new(async move {
        let first_step_result = do_something_with_msg(msg).await?;
        let mid_result_with_actor = ctx.with_actor(|this, _ctx| {
            this.do_some_computation_with_(first_step_result)
        }).await?;
        final_computation(mid_result_with_actor).await
    })
}
The idea is that ctx.with_actor() gets a closure that gets the actor data and context and runs a synchronous block with them. Because it's synchronous, it doesn't have to keep the borrowed actor data and context while awaiting - because it does not await.
ctx.with_actor() itself returns a future that gets woken when the Actix runtime decides the actor can run, which I assume, since we are single-threaded, happens immediately? So maybe it can even be optimized away?
About the with_actor method
The ctx.with_actor idea looks OK to use, but I don't know how viable it is to implement in the current system. Context by itself wouldn't be able to execute the provided closure because it doesn't hold a reference to the actor, and sending it as a future to be polled later wouldn't allow us to get a result out of it when with_actor returns. If with_actor returned just (), then I think it could be implemented as a method on Context along the lines of:
pub fn with_actor<F>(&mut self, f: F)
where
    F: Fn(&mut A, &mut Self) + 'static,
{
    self.spawn(Box::new(
        crate::fut::wrap_future(async {}).map(move |_, act, ctx| f(act, ctx)),
    ));
}
I've put it on a fork: actix/topic/with_actor.
About the race condition
On a second note, about the race condition: the problem happens because the system might poll other futures while some computation in the ResponseActFuture is pending. What if we enforce exclusivity when handling a ResponseActFuture? I think this could be achieved by sending it to the context with wait instead of spawning it. So the MessageResponse implementation for ResponseActFuture would have:
fn handle<R: ResponseChannel<M>>(self, ctx: &mut A::Context, tx: Option<R>) {
    ctx.wait(self.then(move |res, this, _| {
        if let Some(tx) = tx {
            tx.send(res);
        }
        async {}.into_actor(this)
    }));
}
I have pushed this modification to another branch on my fork: actix/topic/avoid_handle_data_race. I'm not sure this doesn't cause the system to block, but I do intend to test it further when I have some more time.
I just noticed that making with_actor a method of Context would mean we need to move the context to the async block, but ctx is also mutably borrowed... Maybe it can be a method of the address, or some other object that's constructed from the context but does not depend on it? Maybe even a free function or a static method, and always borrow the actor of the future that awaits on it?
Spawning it is not very helpful, not (just) because we can't get a result but because we cannot await its completion, so we can't use it to control the flow.
Can't we make a future out of it? I've managed to get this to work (see the full code in https://gist.github.com/idanarye/46cf23613d1a789c7ebf219a5225e86a)
fn handle(&mut self, _msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
    let addr = ctx.address();
    let original_number = self.num;
    Box::new(async move {
        let current_number = addr.send(with_actor(|this: &mut Self, _| {
            this.num
        })).await.unwrap().unwrap();
        println!("Current number is {}", current_number);
        assert_eq!(current_number, original_number);
        addr.send(Increment).await.unwrap();
        addr.send(Increment).await.unwrap();
        addr.send(Increment).await.unwrap();
        let current_number = addr.send(Increment).await.unwrap();
        println!("Number incremented to {}", current_number);
        assert_eq!(current_number, original_number + 4);
        {
            let original_number = original_number;
            addr.send(with_actor(move |this: &mut Self, _| {
                this.num *= 2;
                println!("Multiplying by 2 to {}", this.num);
                assert_eq!(this.num, (original_number + 4) * 2);
            })).await.unwrap().unwrap();
        }
        let current_number = addr.send(Increment).await.unwrap();
        println!("Number incremented to {}", current_number);
        assert_eq!(current_number, (original_number + 4) * 2 + 1);
        Ok(())
    }.into_actor(self))
}
(I tried to make with_actor accept the addr, but couldn't get it to build. This is just a PoC anyway)
with_actor is a helper function that creates an instance of WithHelper, a message that contains an FnOnce that borrows the actor data and context and returns some value. I made my actor handle it by invoking the function and returning its result as the message result.
Now I just have to move an Addr into the async block, and whenever I need to access the actor's state I just send it a WithHelper with a function that modifies it and/or reads data from it, and await the result of that function.
Ugly, and probably not very efficient, but works and can maybe give you (or someone else who's familiar with the internals of Actix) some inspiration for a better way to do this?
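For readers who want to see the bare mechanism, the following sketch models the same idea using only the standard library: a thread stands in for the actor, a channel for its mailbox, and every message is a closure that is lent `&mut` access to the state. `Counter`, `Job`, `spawn_actor`, `demo`, and this version of `with_actor` are hypothetical names for the illustration; they are not the code from the gist and not an actix API.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

pub struct Counter {
    pub num: i64,
}

// A message is a closure that gets exclusive access to the actor state.
pub type Job = Box<dyn FnOnce(&mut Counter) + Send>;

// Starts the "actor": a thread that owns the state and runs jobs one by one.
pub fn spawn_actor(initial: i64) -> Sender<Job> {
    let (tx, rx) = channel::<Job>();
    thread::spawn(move || {
        let mut state = Counter { num: initial };
        // The loop ends when every sender has been dropped.
        for job in rx {
            job(&mut state);
        }
    });
    tx
}

// Sends a closure to the actor and blocks until its result comes back.
// In async code the blocking `recv` would be replaced by an `.await`.
pub fn with_actor<R, F>(addr: &Sender<Job>, f: F) -> R
where
    R: Send + 'static,
    F: FnOnce(&mut Counter) -> R + Send + 'static,
{
    let (reply_tx, reply_rx) = channel();
    let job: Job = Box::new(move |state: &mut Counter| {
        // Ignore the error if the caller stopped waiting for the reply.
        let _ = reply_tx.send(f(state));
    });
    addr.send(job).expect("actor thread has stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}

pub fn demo() -> (i64, i64) {
    let addr = spawn_actor(1);
    let before = with_actor(&addr, |c| c.num);
    with_actor(&addr, |c| c.num += 4);
    let after = with_actor(&addr, |c| {
        c.num *= 2;
        c.num
    });
    (before, after)
}
```

Each closure runs to completion inside the actor's loop, so the state is never borrowed across a suspension point; the caller only ever holds the address and the returned value.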
Hey everyone, just wanted to chime in here. Obviously, there has already been a lot of discussion along these lines, but I think something which actix needs right now is the following signature for message handlers:
impl Handler<Msg> for Foo {
    type Result = ...;

    async fn handle(&mut self, _: Msg, ...) -> Self::Result {
        //..
    }
}
The major change here is the async fn handle. Now, obviously there has been a lot of discussion above about concurrent (not parallel) mutations of the actor state when a yield point is hit. TO BE SURE this is already the case in actix. If you have an actor which handles multiple concurrent messages, and the handling logic is something like the following:
1. mutate internal state
2. yield (due to network request or the like)
3. use state again
in such cases, you stand to run into problems. In other contexts, this would be referred to as a "critical section", and exclusion is guaranteed by the actor ... as long as you don't yield in that critical section.
So, I don't think we should attempt to "re-solve" that problem. It is the case today, and it needs to be in the future as well, because that is what enables concurrent processing in general.
So, all in all, if we can migrate the actix code base to embrace async/await at its core, and just help to teach users about concurrency and the fact that concurrent (not parallel) mutation is possible, and needs to be in order to support high-throughput, then we should be in good shape. We can teach users how to craft concurrent algorithms which embrace these facts and how to work and succeed within these constraints.
As an example, I wrote the actix-raft crate using actix, and there are plenty of cases where concurrent mutation takes place in the various actors, and these conditions need to be checked and accounted for.
I know there are complexities with the context system and other bits like that. However, actix having async/await support at its core would be such a game changer! Worth some design discussion and such, IMHO. Thoughts?
Cheers!
Essentially, the ActorFuture was a way to be able to gain access to the actor from within a futures chain. Before async/await was around, that was difficult. Now that we have async/await and futures can hold references, we get that easily. What are the other things which ActorFuture is even needed for?
Now that we have async/await and futures can hold references
Futures' ability to hold references was not changed - they could hold a reference before, and it still counts as a borrow: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=89a6b843b87d04ca269f713f47184b6d
Also, Rust doesn't have async trait methods yet, and crates like async-trait merely rewrite them to return a pinned async block.
@idanarye re async-trait: true, true. I'm wondering if that would be viable for now.
As far as references, I should have been more clear with what I said. I was more referring to the limitations around #[await] back in the day. Building the state machine with & or &mut was not allowed back in the day before pinning was around. You had to use smart pointers.
With async/await of today (thanks Pin) we can use & and &mut in async fn declarations and the like. And in this case that directly applies to the &mut self of the actor in the async fn handle(..) mentioned above.
@thedodd #[await] was never stable and stable actix was using callbacks - which do support passing references in the closure. ActorFuture was still necessary because:
- The closure - and therefore the future that contains it - would then borrow the actor, and it would not be usable again until the future is resolved.
- If you need to chain callbacks, only one of them can borrow the actor.
The second problem does get solved by using async - though one could argue it could be solved by nesting the callbacks instead of chaining them. The first problem is not solved by async, and would not be solved even if we had async traits.
#[await] was never stable and stable
Hmm, I never said it was. My point has clearly been missed, but that's ok. Your response about ActorFuture is pertinent, so I'll focus on that.
I agree with your statements about 2.. As far as 1., I'll put together a gist of my thoughts for further discussion.
@idanarye what if we use a signature like the following instead (&self instead of &mut self) :
impl Handler<Msg> for Foo {
    type Result = ...;

    async fn handle(&self, _: Msg, ...) -> Self::Result {
        //..
    }
}
This would mitigate issue 1. as discussed above. We gain ergonomics. Mutations would have to be done by way of interior mutability (tokio has a nice RwLock). I'm not too concerned about that last point because that is what one needs to do in other areas of the Actix stack. Eg, in actix-web, when setting up an HttpServer, if one needs to mutate some shared state, interior mutability needs to be used.
Actix itself isn't 1.0 yet, so perhaps this would be a big win if we are able to implement a pattern like this. Thoughts?
To be sure, in case it isn't clear from the above, doing so would allow us to remove the ActorFuture construct.
- every message handler would only take a shared ref to the actor (so any number of these can be leased out by the compiler).
- mutability would be shifted over to the responsibility of the caller via interior mutability. There is plenty of support code for this already available in the ecosystem (tokio::sync::RwLock &c).
- this may quite likely help to improve the overall throughput of the Actix system, as nothing requires exclusive refs to the actor, allowing for more tasks to be processed concurrently.
- should work beautifully with a new handler signature like
async fn handle(&self, ....
@Jonathas-Conceicao @lucab @fMeow @Malanche @JohnTitor (pinging folks here based on comments on this thread :))
Won't this kind of defeat the whole purpose of having an actor system?
Would it? I'm not sure. I'm definitely just spitballing on design ideas and such.
- state & functionality would still be encapsulated within each actor.
- functionality would still be driven via message passing.
- mutation would still be possible only from within the actor itself, as it is today. This would just require an added step of let state = self.state.write().await? (or the like) before being able to mutate.
Perhaps the fact that mutations would require synchronization is a major deterrent. I'll have to think it over a bit more. However, the ActorFuture is in essence a synchronization primitive. It guarantees exclusive access to the actor. As such, it is semantically a Mutex (reading and writing are excluded while the borrow [the lock in this case] is held). So perhaps this is tantamount to the same thing, but actually an improvement because full mutual exclusion wouldn't be required; users could use RwLocks, which would allow for concurrent execution of messages per actor, only syncing when interior mutability is required by the user.
Thoughts?
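As a rough illustration of what the `&self` variant would ask of users, here is a standard-library sketch. It uses `std::sync::RwLock` in place of the tokio lock mentioned above so that it runs without a runtime, and `Foo`, `Add`, `Get`, `demo`, and the two handler methods are hypothetical names rather than a proposed actix API.

```rust
use std::sync::RwLock;

pub struct Add(pub usize);
pub struct Get;

pub struct Foo {
    // State lives behind a lock because handlers only receive `&self`.
    total: RwLock<usize>,
}

impl Foo {
    pub fn new() -> Self {
        Foo { total: RwLock::new(0) }
    }

    // Mutating handler: takes the write lock only for the update itself.
    pub fn handle_add(&self, msg: Add) -> usize {
        let mut total = self.total.write().expect("lock poisoned");
        *total += msg.0;
        *total
    }

    // Read-only handler: any number of these may hold the read lock at once.
    pub fn handle_get(&self, _msg: Get) -> usize {
        *self.total.read().expect("lock poisoned")
    }
}

pub fn demo() -> (usize, usize, usize) {
    let foo = Foo::new();
    // Two shared borrows coexist, which `&mut self` handlers would not allow.
    let (a, b) = (&foo, &foo);
    let first = a.handle_add(Add(2));
    let second = b.handle_add(Add(3));
    (first, second, a.handle_get(Get))
}
```

In an async version, a guard held across an `.await` would keep other handlers that need the lock waiting, so the critical sections would presumably have to stay between yield points, much like the existing rule for actor state.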
I too believe that having an async method on the Handler trait would make things simpler, but I'm not sure how viable it would be given the current ecosystem.
Changing the handle function to use an immutable reference seems like a step back to me. As @thedodd has said, ActorFuture does work like a synchronization primitive; removing it would require users who want to mutate their actor's internal state to implement synchronization themselves, and sharing state is one of the essences of the actor model.
Hi all! I think the idea that Jonathas proposed earlier, controlled multiple mutable references to the actor and ctx, is the ideal solution. It would bring the same concurrency model to the actor that node.js has, or that ActorFuture already has with its combinators. If I must choose between the node.js model and interior mutability/locking, I prefer the former. Implementing a deadlock is so easy (at least for me), and Rust will not protect you from it. The futures 0.3 system has such weak combinators at the Result/Future transition (compared to futures 0.1), and the async/await system with ?/try! is so powerful, that the changeover to async/await can't be in question, only its direction.
I implemented an unsafe PoC, which has the following signatures:
#[async_trait(?Send)]
pub trait AsyncHandler<Msg>
where
    Self: Actor,
    Msg: Message,
{
    type Result: MessageResponse<Self, Msg>;

    async fn handle(&mut self, msg: Msg, ctx: &mut Self::Context) -> Self::Result;
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;

    async fn handle(&mut self, _msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        Ok(())
    }
}

impl_handler!(MyMessage, MyActor);
@balsa02 What happens if I have something like this:
struct MyActor {
    data: std::collections::HashMap<Foo, Bar>,
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;

    async fn handle(&mut self, msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        let mut bar = self.data.entry(&msg.foo).or_default();
        *bar = bar.something_something().await?;
        Ok(())
    }
}
and I send two MyMessages to the same MyActor? Wouldn't it invalidate the hashmap?
The first message will create an entry, run something_something(), and await it. Then the second message will create another entry, which can potentially cause a rehash. When the first message returns from the await, it'll store the data in the wrong place, because it doesn't know there was a rehash. HashMap does not handle this scenario because it's impossible to do this in safe Rust.
(I don't really know if HashMap works like that - but it's allowed to, and even if it doesn't surely many other things do rely on these safety guarantees)
Based on a quick look through the code, it looks like per actor:
- all wait futures are polled to completion first.
- then the mailbox is drained.
- then any other non-wait futures are polled to completion, one at a time. If one of those futures spawns a new wait future then that will be driven to completion first.
So, @idanarye it doesn't look like there will be a problem in your above code example. Also, please correct me if I am wrong. My statement above is just based on tracing through the code based on sending a message and then following the process of execution, which boils down to some code in the contextimpl.rs module, and the impl<A, C> Future for ContextFut<A, C> bit.
[...] does not handle this scenario because its impossible to do this in safe Rust.
You can do this with one mutable reference too.
let mut bar1 = self.data.entry(&msg1.foo).or_default();
let mut bar2 = self.data.entry(&msg2.foo).or_default();
*bar1 = bar1.something_something();
If a struct were vulnerable to two single-threaded mutable references, it would be vulnerable to one as well.