nuclei
nuclei copied to clipboard
Normal async channel message passing stops after I start up the TCP server
Here is the shortest example I could produce. If we comment out the line marked `TODO:`, the message passing works fine. With the TCP server in place, we can hit it with a browser and get a different result on each request, yet the message passing on the same local thread stops. Async tasks should continue to run on this thread, so I am not sure where it is blocking.
use nuclei::; use std::net::TcpListener; use std::time::Duration; use anyhow::Result; use async_dup::Arc; use futures::prelude::; use futures_timer::Delay; use http_types::{Request, Response, StatusCode};
/// Serves a request and returns a response. async fn serve(req: Request) -> http_types::Result<Response> { let mut res = Response::new(StatusCode::Ok); res.insert_header("Content-Type", "text/plain"); let now = format!("{:?}", std::time::SystemTime::now()); //Serving the time res.set_body(now); Ok(res) }
/// Listens for incoming connections and serves them. async fn listen(listener: Handle<TcpListener>) -> Result<()> { // Format the full host address. let host = format!("http://{}", listener.get_ref().local_addr()?); println!("Listening on {}", host);
loop {
// Accept the next connection.
let (stream, _) = listener.accept().await?;
println!("Accepted connection");
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
nuclei::block_on(spawn_local(async move {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
})); //we do not detach span_local instead we block on it.
}
}
#[nuclei::main] async fn main() -> Result<()> { //nuclei::drive( async {run_now(); } ); run_now(); Ok(()) }
/// Runs the demo: a producer task, a consumer task and the HTTP server, for 400 seconds.
///
/// All three are spawned as detached local tasks. The function then blocks on a 400-second
/// `Delay` and returns once it has elapsed.
///
/// # Errors
///
/// Returns an error if binding the listener to `0.0.0.0:8001` fails.
fn run_now() -> Result<()> {
    // Bounded async channel shared by the producer and consumer tasks below.
    let (tx, rx) = async_channel::bounded::<String>(10);

    // Producer: sends one message per second and reports how many were sent.
    let t = async move {
        let mut count = 0;
        loop {
            // `send` fails only once the channel is closed; stop instead of counting
            // messages that were never delivered.
            if tx.send("Hello".to_string()).await.is_err() {
                break;
            }
            count += 1;
            println!("Sent {} messages", count);
            Delay::new(Duration::from_secs(1)).await;
        }
    };
    nuclei::spawn_local(t).detach();

    // Consumer: counts messages until the channel is closed, without panicking in the task.
    let r = async move {
        let mut count = 0;
        while rx.recv().await.is_ok() {
            count += 1;
            println!("Received {} messages", count);
        }
    };
    nuclei::spawn_local(r).detach();

    // HTTP server on the same local executor. The task is detached, so report its error
    // here rather than letting it disappear with the task.
    let http = listen(Handle::<TcpListener>::bind("0.0.0.0:8001")?);
    nuclei::spawn_local(async move {
        if let Err(err) = http.await {
            println!("Server error: {:#?}", err);
        }
    })
    .detach();

    // Only run our test for 400 seconds.
    nuclei::block_on(nuclei::spawn_local(Delay::new(Duration::from_secs(400))));
    Ok(())
}