
Add the ability to send/recv to/from the OS using polling, `WouldBlock` syscalls

Ben-PH opened this issue 2 years ago • 2 comments

Instead of:

std::thread::spawn(move || {
    // wait until condition
    stopper.send(()).unwrap();
});

std::thread::spawn(move || loop {
    let conn = listener.accept().unwrap();
    conn_tx.send(conn).unwrap();
});


loop {
    select! {
        recv(stopper_rx) -> _ => { dbg!("system stopped, exiting"); return; },
        recv(conn_rx) -> conn => handle_connection(conn),
    }
}

With this approach, you must spin up a dedicated thread that can block on the listener. But how about:

std::thread::spawn(move || {
    // wait until condition
    stopper.send(()).unwrap();
});

let listener = TcpListener::bind("localhost:0").unwrap();
listener.set_nonblocking(true).unwrap();
loop {
    select! {
        recv(stopper_rx) -> _ => { dbg!("system stopped, exiting"); return; },
        os_recv(listener.accept()) -> conn => handle_connection(conn),
    }
}

Because of the selector's limitation of only being able to work on channels, we had to write a helper method instead. Here is an MVP:

use crossbeam::{channel::Receiver, utils::Backoff};
use std::{
    net::{SocketAddr, TcpListener, TcpStream},
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("localhost:8080").unwrap();
    listener.set_nonblocking(true).unwrap();
    let (stop_tx, stop_rx) = crossbeam::channel::bounded::<()>(2);
    let sleep_stopper = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(20));
        dbg!("awake");
        stop_tx.send(()).unwrap();
    });
    loop {
        if dbg!(os_friendly_select(&stop_rx, &listener)).is_err() {
            break;
        }
    }
    sleep_stopper.join().unwrap();
}

fn os_friendly_select(
    stop_rx: &Receiver<()>,
    listener: &TcpListener,
) -> Result<(TcpStream, SocketAddr), ()> {
    let backoff = Backoff::new();
    loop {
        // Check for the stop signal before each accept attempt.
        if stop_rx.try_recv().is_ok() {
            dbg!("returning because stopped");
            return Err(());
        }

        match listener.accept() {
            Ok(good_conn) => return Ok(good_conn),
            // The listener is non-blocking, so `WouldBlock` just means
            // no connection is pending yet; back off and retry.
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                dbg!("zzz");
                backoff.snooze();
            }
            Err(e) => panic!("accept failed with an unexpected error: {e}"),
        }
    }
}

Part of the contract would be that anything that calls into the OS would have to have `std::io::ErrorKind::WouldBlock` in its spec (this assumes Linux).
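As a rough illustration, here is a minimal sketch of what that contract could look like as a trait. The `TryOsRecv` name and shape are hypothetical, not existing or proposed crossbeam API:

use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};

/// Hypothetical contract: an OS-backed source that must surface
/// `WouldBlock` so a selector can poll it without blocking.
trait TryOsRecv {
    type Item;
    /// `Ok(Some(item))` on success, `Ok(None)` on `WouldBlock`,
    /// `Err(e)` for any other OS error.
    fn try_os_recv(&self) -> io::Result<Option<Self::Item>>;
}

impl TryOsRecv for TcpListener {
    type Item = (TcpStream, SocketAddr);

    fn try_os_recv(&self) -> io::Result<Option<Self::Item>> {
        match self.accept() {
            Ok(conn) => Ok(Some(conn)),
            // The listener must be in non-blocking mode for this arm to fire.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
            Err(e) => Err(e),
        }
    }
}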

I would be happy to add this functionality, as it would be great to have in our production codebase.

Ben-PH · Mar 01 '23 17:03

Why not use async? (It should already have the proper mechanisms to handle such cases.)

if e.kind() == std::io::ErrorKind::WouldBlock {
    dbg!("zzz");
    backoff.snooze();

It does not seem desirable to do an unbounded spin loop if the OS returns `WouldBlock`.
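For example, the spin could be bounded by falling back to a timed blocking wait once the backoff is exhausted. A sketch, reusing the MVP's `stop_rx`/`listener` setup (the 10 ms timeout is an arbitrary assumption):

use crossbeam::channel::{Receiver, RecvTimeoutError};
use crossbeam::utils::Backoff;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::time::Duration;

fn bounded_select(
    stop_rx: &Receiver<()>,
    listener: &TcpListener,
) -> Result<(TcpStream, SocketAddr), ()> {
    let backoff = Backoff::new();
    loop {
        if stop_rx.try_recv().is_ok() {
            return Err(());
        }
        match listener.accept() {
            Ok(conn) => return Ok(conn),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                if backoff.is_completed() {
                    // Spinning is no longer productive; park on the stop
                    // channel with a timeout instead of busy-waiting.
                    match stop_rx.recv_timeout(Duration::from_millis(10)) {
                        Ok(()) | Err(RecvTimeoutError::Disconnected) => return Err(()),
                        Err(RecvTimeoutError::Timeout) => {}
                    }
                } else {
                    backoff.snooze();
                }
            }
            Err(_) => return Err(()),
        }
    }
}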

taiki-e · Apr 28 '23 09:04

That's what we had been using, but we have realised that async in our codebase was a mistake, and we are migrating away from it.

What we ended up doing, for now, is this:

pub struct BootstrapTcpListener {
    poll: Poll,
    events: Events,
    server: TcpListener,
    // HACK: keep `_mio_server` alive by moving its ownership into this struct;
    // if it is not moved, `poll` never receives any events from the listener.
    _mio_server: MioTcpListener,
}

pub enum PollEvent {
    NewConnection((TcpStream, SocketAddr)),
    Stop,
}

impl BSEventPoller for BootstrapTcpListener {
    fn poll(&mut self) -> Result<PollEvent, BootstrapError> {
        self.poll.poll(&mut self.events, None).unwrap();

        // Confirm that we are not being signalled to shut down
        if self.events.iter().any(|ev| ev.token() == STOP_LISTENER) {
            return Ok(PollEvent::Stop);
        }

        // There could be more than one connection ready, but we want to re-check for the
        // stop signal after processing each connection.
        Ok(PollEvent::NewConnection(
            self.server.accept().map_err(BootstrapError::from)?,
        ))
    }
}

impl BootstrapListenerStopHandle {
    /// Stop the bootstrap listener.
    pub fn stop(self) -> Result<(), BootstrapError> {
        self.0.wake().map_err(BootstrapError::from)
    }
}
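For context, the construction might look roughly like this. This is a sketch, not our actual code: `STOP_LISTENER` follows the excerpt above, while `NEW_CONNECTION`, `build_poller`, and the event capacity are assumptions. The returned `Waker` is what `BootstrapListenerStopHandle` would call `wake()` on:

use mio::net::TcpListener as MioTcpListener;
use mio::{Events, Interest, Poll, Token, Waker};
use std::io;
use std::net::SocketAddr;

const NEW_CONNECTION: Token = Token(0);
const STOP_LISTENER: Token = Token(1);

fn build_poller(addr: SocketAddr) -> io::Result<(Poll, Events, MioTcpListener, Waker)> {
    let poll = Poll::new()?;
    let mut listener = MioTcpListener::bind(addr)?;
    // Readable events on the listener signal a pending connection.
    poll.registry()
        .register(&mut listener, NEW_CONNECTION, Interest::READABLE)?;
    // The Waker lets another thread interrupt `poll` to signal shutdown.
    let waker = Waker::new(poll.registry(), STOP_LISTENER)?;
    Ok((poll, Events::with_capacity(32), listener, waker))
}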

...and in the main event loop of the server, instead of using `std::net::TcpListener::accept(...)?`, we use our poll and match on the returned enum.

Previously we would have a dedicated listener thread and two crossbeam channels: one for connections, one for a stop-signal broadcast:

loop {
    let conn = select! {
        recv(stopper_rx) -> _ => return Ok(()),
        recv(conn_rx) -> conn => conn,
    };
    // ...validate connection before dispatching to a system-thread to handle
}
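The producer side of that setup was the dedicated listener thread feeding `conn_tx`, roughly like this (a sketch of the assumed shape; `spawn_listener` is an illustrative name):

use crossbeam::channel::Sender;
use std::net::{SocketAddr, TcpListener, TcpStream};

fn spawn_listener(listener: TcpListener, conn_tx: Sender<(TcpStream, SocketAddr)>) {
    std::thread::spawn(move || loop {
        match listener.accept() {
            // The channel buffers accepted connections until the main
            // loop's `select!` picks them up.
            Ok(conn) => {
                if conn_tx.send(conn).is_err() {
                    // The main loop has exited and dropped `conn_rx`.
                    return;
                }
            }
            Err(_) => return,
        }
    });
}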

What the change meant is that we no longer need a dedicated listener thread or a buffer of connections.

I see now that "MVP" was probably a bad label, and probably a distraction.

Ben-PH · Apr 28 '23 09:04