Call with async server
#[web::get("/hello")]
async fn hello() -> impl web::Responder {
What is the best way to run the scripts?
I only got it with the
let worker = DefaultWorker::new(DefaultWorkerOptions {
default_entrypoint: None,
timeout: Duration::from_millis(1000),
startup_snapshot: Some(SNAPSHOT),
shared_array_buffer_store: None,
}).unwrap();
Can it be trait (Clone + Send) to pass to the server?
let server = web::server(move || {
App::new()
.state(worker.clone())
Thanks
Since DefaultWorker is already send, putting the worker in an Rc or Arc, could give you clone
Plus refcell if you need mutability
A Tokyo version for the channels would be good in my case.
impl<W> Worker<W> where W: InnerWorker, { /// Create a new worker instance /// /// # Errors /// Can fail if the runtime cannot be initialized (usually due to extension issues) pub fn new(options: W::RuntimeOptions) -> Result<Self, Error> { let (qtx, qrx) = channel(); let (rtx, rrx) = channel(); let (init_tx, init_rx) = channel::<Option<Error>>();
I could probably switch to Tokio channels
I'm open to a pr for it
I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app:
pub struct AsyncWorker {
handle: Option<thread::JoinHandle<()>>,
tx: Option<Sender<WorkerQuery>>,
rx: Receiver<WorkerResponse>,
}
impl AsyncWorker {
pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> {
let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024);
let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024);
let (init_tx, init_rx) = tokio::sync::oneshot::channel::<Option<ScriptingError>>();
let handle = thread::spawn(move || {
// .... very similar to the DefaultWorker
}
pub fn shutdown(&mut self) {
if let (Some(tx), Some(hnd)) = (self.tx.take(), self.handle.take()) {
drop(tx);
hnd.join().ok();
}
}
pub fn join(mut self) -> LabirootResult<()> {
self.shutdown();
match self.handle {
Some(hnd) => hnd
.join()
.map_err(|_| LabirootError::InternalError("Worker thread panicked".to_string())),
None => Ok(()),
}
}
pub async fn receive(&mut self) -> ScriptingResult<WorkerResponse> {
self.rx.recv().await.ok_or(ScriptingError::Worker(
"Error receiving from worker".to_string(),
))
}
pub async fn send(&self, query: WorkerQuery) -> ScriptingResult<()> {
match &self.tx {
None => {
// TODO this deserves an own enum error variant instead of using InternalError all over
return Err(ScriptingError::Worker("Worker has stopped".to_string()));
}
Some(tx) => tx,
}
.send(query)
.await
.map_err(|e| ScriptingError::Worker(format!("Error sending to worker: {}", e)))
}
pub async fn request_response(
&mut self,
query: WorkerQuery,
) -> ScriptingResult<WorkerResponse> {
self.send(query).await?;
let r = self.receive().await?;
debug!("Received response from worker: {:?}", r);
match r {
WorkerResponse::Error(e) => Err(e),
_ => Ok(r),
}
}
pub async fn call_function(
&mut self,
module_id: ModuleId,
name: String,
args: Vec<Value>,
) -> ScriptingResult<Value> {
self.request_response(WorkerQuery::Sync(SyncWorkerQuery::CallFunction(
module_id, name, args,
)))
.await?
.value()
}
pub async fn call_function_async(
&mut self,
module_id: ModuleId,
name: String,
args: Vec<Value>,
) -> ScriptingResult<AsyncComputationHandle> {
self.request_response(WorkerQuery::CallFunctionAsync(module_id, name, args))
.await?
.async_value()
}
}
I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app:
pub struct AsyncWorker { handle: Option<thread::JoinHandle<()>>, tx: Option<Sender<WorkerQuery>>, rx: Receiver<WorkerResponse>, } impl AsyncWorker { pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> { let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024); let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024); let (init_tx, init_rx) = tokio::sync::oneshot::channel::<Option<ScriptingError>>(); let handle = thread::spawn(move || { // .... very similar to the DefaultWorker } pub fn shutdown(&mut self) { if let (Some(tx), Some(hnd)) = (self.tx.take(), self.handle.take()) { drop(tx); hnd.join().ok(); } } pub fn join(mut self) -> LabirootResult<()> { self.shutdown(); match self.handle { Some(hnd) => hnd .join() .map_err(|_| LabirootError::InternalError("Worker thread panicked".to_string())), None => Ok(()), } } pub async fn receive(&mut self) -> ScriptingResult<WorkerResponse> { self.rx.recv().await.ok_or(ScriptingError::Worker( "Error receiving from worker".to_string(), )) } pub async fn send(&self, query: WorkerQuery) -> ScriptingResult<()> { match &self.tx { None => { // TODO this deserves an own enum error variant instead of using InternalError all over return Err(ScriptingError::Worker("Worker has stopped".to_string())); } Some(tx) => tx, } .send(query) .await .map_err(|e| ScriptingError::Worker(format!("Error sending to worker: {}", e))) } pub async fn request_response( &mut self, query: WorkerQuery, ) -> ScriptingResult<WorkerResponse> { self.send(query).await?; let r = self.receive().await?; debug!("Received response from worker: {:?}", r); match r { WorkerResponse::Error(e) => Err(e), _ => Ok(r), } } pub async fn call_function( &mut self, module_id: ModuleId, name: String, args: Vec<Value>, ) -> ScriptingResult<Value> { self.request_response(WorkerQuery::Sync(SyncWorkerQuery::CallFunction( module_id, name, args, ))) .await? .value() } pub async fn call_function_async( &mut self, module_id: ModuleId, name: String, args: Vec<Value>, ) -> ScriptingResult<AsyncComputationHandle> { self.request_response(WorkerQuery::CallFunctionAsync(module_id, name, args)) .await? .async_value() } }
Out of curiosity why did the built in worker not fit your use case?
I don't now if this helps, but I implemented a AsyncWorker to use it with my multithreaded main runtime in a bigger app: pub struct AsyncWorker { handle: Option<thread::JoinHandle<()>>, tx: Option<Sender<WorkerQuery>>, rx: Receiver<WorkerResponse>, }
impl AsyncWorker { pub async fn try_new(s: ScriptRuntimeState, webstorage_path: PathBuf) -> ScriptingResult<Self> { let (query_tx, query_rx) = tokio::sync::mpsc::channel(1024); let (resp_tx, resp_rx) = tokio::sync::mpsc::channel(1024);
Out of curiosity why did the built in worker not fit your use case?
I need a Tokio channel and async top-level signatures because the scripts I run typically take a long time to complete, blocking the worker wile waiting for a response, and I just don't wanted to go with run_blocking or somethinglike this.