riker icon indicating copy to clipboard operation
riker copied to clipboard

Spawning a future inside Actor::receive

Open alanhoff opened this issue 5 years ago • 7 comments

Hello there, I'm trying to spawn futures inside my actor but I'm not sure how to get access to the inner ThreadPool. This is as far as I got:

#![feature(async_await, await_macro, futures_api, rustc_private)]
extern crate riker;
#[macro_use]
extern crate log;

use futures::executor::block_on;
use riker::actors::*;
use riker_default::*;
use riker_patterns::ask::ask;

struct MyActor;

impl Actor for MyActor {
    type Msg = String;

    fn receive(
        &mut self,
        ctx: &Context<Self::Msg>,
        msg: Self::Msg,
        sender: Option<ActorRef<Self::Msg>>,
    ) {
        let myself = ctx.myself();
        let sender = sender.unwrap();

        ctx.execute(async move {
          debug!("Got message {:?}", msg);
          sender.tell("This is your answer", Some(myself));
        });
    }
}

impl MyActor {
    fn actor() -> BoxActor<String> {
        Box::new(MyActor)
    }

    fn props() -> BoxActorProd<String> {
        Props::new(Box::new(MyActor::actor))
    }
}

// start the system and create an actor
fn main() {
    let model: DefaultModel<String> = DefaultModel::new();
    let sys = ActorSystem::new(&model).unwrap();

    let props = MyActor::props();
    let actor = sys.actor_of(props, "my-actor").unwrap();

    let res = ask(&sys, &actor, "Hello world");
    debug!("Got response {:?}", block_on(res));
}

I understand that Context::execute returns a RemoteHandle but I'm not sure where I should spawn it.

alanhoff avatar Apr 04 '19 21:04 alanhoff

Yeah I'm stuck on this too. Did you make any progress?

pietgeursen avatar May 07 '19 21:05 pietgeursen

With your example, I thought that maybe we could return the handle in the reply message and then block on that too. But now I'm stuck and have to stop. Here's what I tried:

#![feature(async_await, await_macro, futures_api, rustc_private)]
extern crate riker;
#[macro_use]
extern crate log;

use futures::future::{Future, RemoteHandle };
use futures::executor::block_on;
use riker::actors::*;
use riker_default::*;
use riker_patterns::ask::ask;

struct MyActor;

#[derive(Debug, Clone)]
enum Protocol{
    Hello(String),
    FutureMsg(RemoteHandle<String>) // RemoteHandle isn't cloneable.
}

impl Into<ActorMsg<Protocol>> for Protocol {
    fn into(self) -> ActorMsg<Protocol> {
        ActorMsg::User(self)
    }
}

impl Actor for MyActor {
    type Msg = Protocol;

    fn receive(
        &mut self,
        ctx: &Context<Self::Msg>,
        msg: Self::Msg,
        sender: Option<ActorRef<Self::Msg>>,
    ) {

        if let Protocol::Hello(inner) = msg {

            let myself = ctx.myself();
            let sender = sender.unwrap();

            let handle = ctx.execute(async move {
                debug!("Got message {:?}", inner);
                inner
            });
            let reply = Protocol::FutureMsg(handle);
            sender.tell(reply, Some(myself));
        };
    }
}

impl MyActor {
    fn actor() -> BoxActor<Protocol> {
        Box::new(MyActor)
    }

    fn props() -> BoxActorProd<Protocol> {
        Props::new(Box::new(MyActor::actor))
    }
}

// start the system and create an actor
fn main() {
    let model: DefaultModel<Protocol> = DefaultModel::new();
    let sys = ActorSystem::new(&model).unwrap();

    let props = MyActor::props();
    let actor = sys.actor_of(props, "my-actor").unwrap();

    let res = ask(&sys, &actor, Protocol::Hello("Hello world".to_string()));
    debug!("Got response {:?}", block_on(res));
}

But:

the trait bound `futures_util::future::remote_handle::RemoteHandle<std::string::String>: std::clone::Clone` is not satisfied

pietgeursen avatar May 07 '19 23:05 pietgeursen

If the RemoteHandle is dropped, the Remote future will be canceled. I think we should not use remote_handle in the implementation of ctx.execute.

harryfei avatar May 31 '19 04:05 harryfei

Hi @alanhoff, @pietgeursen and @harryfei,

This issue has been open for a while now and just to let you know Riker 0.3 was recently released. The ask in riker-patterns is being update now to use version 0.3. Once that's done I'll answer the question on how to execute futures.

leenozara avatar Jul 16 '19 13:07 leenozara

@pietgeursen have you tried https://riker.rs/futures/ ?

We are using combination of tokio 0.2 and riker in tezos-rs and it works really well together.

mrjoe7 avatar Sep 05 '19 15:09 mrjoe7

Context.run allows spawning a future, but I have no access to the self and Context inside the future (it requires 'static lifetime and &self cannot be borrowed like this). Is there any workaround?

s-panferov avatar Sep 08 '19 04:09 s-panferov

If you need to call &self from inside of async block then I would suggest to use one of the following options:

  1. if you need to modify something in Self then wrap it into Arc<Mutex<..>> and move it into the async block.

  2. pass ctx.myself() into async block and then call myself.tell(..) and pass data as a part of the message.

There is a reason you cannot call &self from the inside of async block - future can be moved between threads in executor and &self is not guarenteed to live long enough. That is the reason rust compiler will not allow you to access it.

mrjoe7 avatar Sep 09 '19 08:09 mrjoe7