rustyscript icon indicating copy to clipboard operation
rustyscript copied to clipboard

Call with async server

Open janderadutra opened this issue 11 months ago • 6 comments

#[web::get("/hello")]
async fn hello() -> impl web::Responder {

What is the best way to run the scripts?

I only got it with the

let worker = DefaultWorker::new(DefaultWorkerOptions {
        default_entrypoint: None,
        timeout: Duration::from_millis(1000),
        startup_snapshot: Some(SNAPSHOT),
        shared_array_buffer_store: None,
    }).unwrap();

Can it be trait (Clone + Send) to pass to the server?

let server = web::server(move || {
        App::new()
            .state(worker.clone())

Thanks

janderadutra avatar Jan 20 '25 00:01 janderadutra

Since DefaultWorker is already send, putting the worker in an Rc or Arc, could give you clone

Plus refcell if you need mutability

rscarson avatar Jan 20 '25 01:01 rscarson

A Tokyo version for the channels would be good in my case.

impl<W> Worker<W> where W: InnerWorker, { /// Create a new worker instance /// /// # Errors /// Can fail if the runtime cannot be initialized (usually due to extension issues) pub fn new(options: W::RuntimeOptions) -> Result<Self, Error> { let (qtx, qrx) = channel(); let (rtx, rrx) = channel(); let (init_tx, init_rx) = channel::<Option<Error>>();

janderadutra avatar Jan 20 '25 12:01 janderadutra

I could probably switch to Tokio channels

I'm open to a pr for it

rscarson avatar Jan 20 '25 13:01 rscarson

I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app:


pub struct AsyncWorker {
    handle: Option<thread::JoinHandle<()>>,
    tx: Option<Sender<WorkerQuery>>,
    rx: Receiver<WorkerResponse>,
}

impl AsyncWorker {
    pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> {
        let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024);
        let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024);

        let (init_tx, init_rx) = tokio::sync::oneshot::channel::<Option<ScriptingError>>();

        let handle = thread::spawn(move || {
          // .... very similar to the DefaultWorker
    }

    pub fn shutdown(&mut self) {
        if let (Some(tx), Some(hnd)) = (self.tx.take(), self.handle.take()) {
            drop(tx);
            hnd.join().ok();
        }
    }

    pub fn join(mut self) -> LabirootResult<()> {
        self.shutdown();
        match self.handle {
            Some(hnd) => hnd
                .join()
                .map_err(|_| LabirootError::InternalError("Worker thread panicked".to_string())),
            None => Ok(()),
        }
    }



    pub async fn receive(&mut self) -> ScriptingResult<WorkerResponse> {
        self.rx.recv().await.ok_or(ScriptingError::Worker(
            "Error receiving from worker".to_string(),
        ))
    }

    pub async fn send(&self, query: WorkerQuery) -> ScriptingResult<()> {
        match &self.tx {
            None => {
                // TODO this deserves an own enum error variant instead of using InternalError all over
                return Err(ScriptingError::Worker("Worker has stopped".to_string()));
            }
            Some(tx) => tx,
        }
        .send(query)
        .await
        .map_err(|e| ScriptingError::Worker(format!("Error sending to worker: {}", e)))
    }

    pub async fn request_response(
        &mut self,
        query: WorkerQuery,
    ) -> ScriptingResult<WorkerResponse> {
        self.send(query).await?;
        let r = self.receive().await?;
        debug!("Received response from worker: {:?}", r);
        match r {
            WorkerResponse::Error(e) => Err(e),
            _ => Ok(r),
        }
    }



    pub async fn call_function(
        &mut self,
        module_id: ModuleId,
        name: String,
        args: Vec<Value>,
    ) -> ScriptingResult<Value> {
        self.request_response(WorkerQuery::Sync(SyncWorkerQuery::CallFunction(
            module_id, name, args,
        )))
        .await?
        .value()
    }

    pub async fn call_function_async(
        &mut self,
        module_id: ModuleId,
        name: String,
        args: Vec<Value>,
    ) -> ScriptingResult<AsyncComputationHandle> {
        self.request_response(WorkerQuery::CallFunctionAsync(module_id, name, args))
            .await?
            .async_value()
    }

}

Darkskald avatar Feb 10 '25 14:02 Darkskald

I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app:


pub struct AsyncWorker {
    handle: Option<thread::JoinHandle<()>>,
    tx: Option<Sender<WorkerQuery>>,
    rx: Receiver<WorkerResponse>,
}

impl AsyncWorker {
    pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> {
        let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024);
        let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024);

        let (init_tx, init_rx) = tokio::sync::oneshot::channel::<Option<ScriptingError>>();

        let handle = thread::spawn(move || {
          // .... very similar to the DefaultWorker
    }

    pub fn shutdown(&mut self) {
        if let (Some(tx), Some(hnd)) = (self.tx.take(), self.handle.take()) {
            drop(tx);
            hnd.join().ok();
        }
    }

    pub fn join(mut self) -> LabirootResult<()> {
        self.shutdown();
        match self.handle {
            Some(hnd) => hnd
                .join()
                .map_err(|_| LabirootError::InternalError("Worker thread panicked".to_string())),
            None => Ok(()),
        }
    }



    pub async fn receive(&mut self) -> ScriptingResult<WorkerResponse> {
        self.rx.recv().await.ok_or(ScriptingError::Worker(
            "Error receiving from worker".to_string(),
        ))
    }

    pub async fn send(&self, query: WorkerQuery) -> ScriptingResult<()> {
        match &self.tx {
            None => {
                // TODO this deserves an own enum error variant instead of using InternalError all over
                return Err(ScriptingError::Worker("Worker has stopped".to_string()));
            }
            Some(tx) => tx,
        }
        .send(query)
        .await
        .map_err(|e| ScriptingError::Worker(format!("Error sending to worker: {}", e)))
    }

    pub async fn request_response(
        &mut self,
        query: WorkerQuery,
    ) -> ScriptingResult<WorkerResponse> {
        self.send(query).await?;
        let r = self.receive().await?;
        debug!("Received response from worker: {:?}", r);
        match r {
            WorkerResponse::Error(e) => Err(e),
            _ => Ok(r),
        }
    }



    pub async fn call_function(
        &mut self,
        module_id: ModuleId,
        name: String,
        args: Vec<Value>,
    ) -> ScriptingResult<Value> {
        self.request_response(WorkerQuery::Sync(SyncWorkerQuery::CallFunction(
            module_id, name, args,
        )))
        .await?
        .value()
    }

    pub async fn call_function_async(
        &mut self,
        module_id: ModuleId,
        name: String,
        args: Vec<Value>,
    ) -> ScriptingResult<AsyncComputationHandle> {
        self.request_response(WorkerQuery::CallFunctionAsync(module_id, name, args))
            .await?
            .async_value()
    }

}

Out of curiosity why did the built in worker not fit your use case?

rscarson avatar Feb 10 '25 22:02 rscarson

I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app: pub struct AsyncWorker { handle: Option<thread::JoinHandle<()>>, tx: Option<Sender<WorkerQuery>>, rx: Receiver<WorkerResponse>, }

impl AsyncWorker { pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> { let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024); let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024);

Out of curiosity why did the built in worker not fit your use case?

I need a Tokio channel and async top-level signatures because the scripts I run typically take a long time to complete, blocking the worker wile waiting for a response, and I just don't wanted to go with run_blocking or somethinglike this.

Darkskald avatar Feb 13 '25 08:02 Darkskald