crossbeam
Add the ability to send/recv to/from the OS using polling, `WouldBlock` sys-calls
Instead of:
std::thread::spawn(move || {
    // wait until condition
    stopper.send(());
});
std::thread::spawn(move || {
    let conn = listener.accept()?;
    conn_tx.send(conn);
});
loop { select! {
    recv(stopper_rx) -> _ => {dbg!("system stopped, exiting"); return;},
    recv(conn_rx) -> conn => handle_connection(conn),
}}
With this approach you must spin up a dedicated thread that blocks on the listener. How about this instead:
std::thread::spawn(move || {
    // wait until condition
    stopper.send(());
});
let listener = TcpListener::bind("localhost:0").unwrap();
listener.set_nonblocking(true).unwrap();
loop { select! {
    recv(stopper_rx) -> _ => {dbg!("system stopped, exiting"); return;},
    os_recv(listener.accept()) -> conn => handle_connection(conn),
}}
Because select! can only work on channels, we had to write a helper function instead. Here is an MVP:
use crossbeam::{channel::Receiver, utils::Backoff};
use std::{
    net::{SocketAddr, TcpListener, TcpStream},
    time::Duration,
};

fn main() {
    let mut listener = TcpListener::bind("localhost:8080").unwrap();
    listener.set_nonblocking(true).unwrap();
    let (stop_tx, mut stop_rx) = crossbeam::channel::bounded::<()>(2);
    let sleep_stopper = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(20));
        dbg!("awake");
        stop_tx.send(()).unwrap();
    });
    loop {
        if let Err(_) = dbg!(os_friendly_select(&mut stop_rx, &mut listener)) {
            break;
        }
    }
    sleep_stopper.join().unwrap();
}

fn os_friendly_select(
    stop_rx: &mut Receiver<()>,
    listener: &mut TcpListener,
) -> Result<(TcpStream, SocketAddr), ()> {
    loop {
        let backoff = Backoff::new();
        loop {
            if stop_rx.try_recv().is_ok() {
                dbg!("returning because stopped");
                return Err(());
            }
            let try_accept = listener.accept();
            match try_accept {
                Ok(good_conn) => {
                    return Ok(good_conn);
                }
                Err(e) => {
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        dbg!("zzz");
                        backoff.snooze();
                    } else {
                        panic!("try_accept got a bad error");
                    }
                }
            }
        }
    }
}
Part of the contract would be that any OS call used this way must be specified to return std::io::ErrorKind::WouldBlock when it would otherwise block (this assumes Linux).
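As a rough illustration of that contract, here is a minimal sketch of a generic helper that retries any non-blocking OS call while it reports WouldBlock and checks a stop channel before each attempt. The names poll_or_stop, accept_or_stop and Polled are hypothetical and not part of crossbeam. To stay std-only, std::sync::mpsc stands in for a crossbeam channel, and a fixed sleep interval replaces Backoff::snooze, so treat it as an assumption-laden sketch rather than a proposed API.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Outcome of one wait: the stop signal fired, or the OS call produced a value.
/// (Hypothetical type, for illustration only.)
#[derive(Debug)]
pub enum Polled<T> {
    Stopped,
    Ready(T),
}

/// Hypothetical helper: retry `op` while it reports `WouldBlock`, checking
/// `stop_rx` before every attempt and sleeping `interval` between attempts.
pub fn poll_or_stop<T>(
    stop_rx: &Receiver<()>,
    interval: Duration,
    mut op: impl FnMut() -> io::Result<T>,
) -> io::Result<Polled<T>> {
    loop {
        match stop_rx.try_recv() {
            // A message and a dropped sender are both treated as "stop".
            Ok(()) | Err(TryRecvError::Disconnected) => return Ok(Polled::Stopped),
            Err(TryRecvError::Empty) => {}
        }
        match op() {
            Ok(value) => return Ok(Polled::Ready(value)),
            // The call would have blocked: wait a little, then try again.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => thread::sleep(interval),
            // Any other error is handed back to the caller instead of panicking.
            Err(e) => return Err(e),
        }
    }
}

/// Example use: accept one connection unless the stop signal arrives first.
/// The listener is assumed to be in non-blocking mode already.
pub fn accept_or_stop(
    stop_rx: &Receiver<()>,
    listener: &TcpListener,
) -> io::Result<Polled<(TcpStream, SocketAddr)>> {
    poll_or_stop(stop_rx, Duration::from_millis(10), || listener.accept())
}
```

The sleep interval is a trade-off: a longer interval burns less CPU but adds latency to both accepting a connection and noticing the stop signal.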
I would be happy to add this functionality, as it would be great to have in our production code-base.
Why not use async? (It should already have the proper mechanisms to handle such cases.)
if e.kind() == std::io::ErrorKind::WouldBlock { dbg!("zzz"); backoff.snooze();
It does not seem desirable to run an unbounded spin loop when the OS returns WouldBlock.
That's what we have been using, but we have realised that async in our code-base was a mistake, and we are migrating away from it.
What we ended up doing, for now, is this:
pub struct BootstrapTcpListener {
    poll: Poll,
    events: Events,
    server: TcpListener,
    // HACK : create variable to move ownership of mio_server to the thread
    // if mio_server is not moved, poll does not receive any event from listener
    _mio_server: MioTcpListener,
}

pub enum PollEvent {
    NewConnection((TcpStream, SocketAddr)),
    Stop,
}

impl BSEventPoller for BootstrapTcpListener {
    fn poll(&mut self) -> Result<PollEvent, BootstrapError> {
        self.poll.poll(&mut self.events, None).unwrap();
        // Confirm that we are not being signalled to shut down
        if self.events.iter().any(|ev| ev.token() == STOP_LISTENER) {
            return Ok(PollEvent::Stop);
        }
        // There could be more than one connection ready, but we want to re-check for the stop
        // signal after processing each connection.
        return Ok(PollEvent::NewConnection(
            self.server.accept().map_err(BootstrapError::from)?,
        ));
    }
}

impl BootstrapListenerStopHandle {
    /// Stop the bootstrap listener.
    pub fn stop(self) -> Result<(), BootstrapError> {
        self.0.wake().map_err(BootstrapError::from)
    }
}
...and in the main event loop of the server, instead of calling std::net::TcpListener::accept(...)?, we call our poll and match on the returned enum.
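For readers who want to see the shape of that loop, here is a minimal sketch of matching on the poll result. It is not the actual server code: EventPoller, ScriptedPoller and serve are hypothetical names, the error type is simplified to String, and PollEvent is made generic over the connection type so the example runs without sockets or mio.

```rust
use std::collections::VecDeque;

/// Same shape as the `PollEvent` enum above, but generic over the connection
/// type so the sketch needs no real sockets.
#[derive(Debug, PartialEq)]
pub enum PollEvent<C> {
    NewConnection(C),
    Stop,
}

/// Hypothetical stand-in for the `BSEventPoller` trait.
pub trait EventPoller<C> {
    fn poll(&mut self) -> Result<PollEvent<C>, String>;
}

/// Test double that replays a scripted sequence of events.
pub struct ScriptedPoller<C>(pub VecDeque<PollEvent<C>>);

impl<C> EventPoller<C> for ScriptedPoller<C> {
    fn poll(&mut self) -> Result<PollEvent<C>, String> {
        self.0
            .pop_front()
            .ok_or_else(|| "script exhausted".to_string())
    }
}

/// Main event loop: handle connections until a stop event arrives.
/// Returns the connections it dispatched, in order.
pub fn serve<C>(poller: &mut impl EventPoller<C>) -> Result<Vec<C>, String> {
    let mut handled = Vec::new();
    loop {
        match poller.poll()? {
            // The stop event ends the loop; later events are never polled.
            PollEvent::Stop => return Ok(handled),
            // A real server would validate and dispatch the connection here.
            PollEvent::NewConnection(conn) => handled.push(conn),
        }
    }
}
```

Because the stop signal and new connections arrive through the same poll call, the loop needs neither a select! nor a second thread.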
Previously we would have a dedicated listener thread, and two crossbeam channels, one for a connection, one for a stop-signal broadcast:
loop {
    let conn = select! {
        recv(stopper_rx) -> _ => return Ok(()),
        recv(conn_rx) -> conn => conn,
    };
    // ...validate connection before dispatching to a system-thread to handle
}
The change means that we no longer need a dedicated listener thread or a buffer of connections.
I see that my "MVP" is probably a bad label, and probably a distraction.