
Cannot read & write at the same time with tokio::select!

Open tqwewe opened this issue 4 years ago • 4 comments

Describe the bug

When tokio::select! is used with conn.read(&mut buf) as one of its async expressions, a conn.write_all().await called in another branch's handler never completes.

To Reproduce

let (tx, mut rx) = broadcast::channel::<i64>(16);
let tx_listener = tx.clone();

tokio::spawn(async move {
    let listener = LocalSocketListener::bind("/tmp/example.sock").await?;

    listener
        .incoming()
        .try_for_each(|mut conn| {
            println!("Client connected!");
            let mut rx = tx_listener.subscribe();
            let mut buf = Vec::new();

            async move {
                tokio::select! {
                    Ok(msg) = rx.recv() => {
                        println!("Received message from channel: {}", msg);
                        let result = conn.write_all(b"Test\n").await; // Gets stuck forever (or until client disconnects) due to `conn.read` branch blocking it
                        println!("Write all: {:?}", result);
                    },
                    Err(_) = conn.read(&mut buf) => {
                        println!("Client disconnected!");
                        return Ok(());
                    },
                }
            }
        }).await?;
});

tx.send(10);

Expected behavior

The only ideas I can come up with are any of the following:

  • you should be able to read and write at the same time
  • you can cancel the conn.read() somehow
  • there's an async function conn.wait_until_disconnect() or something which resolves when the client disconnects
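
As an illustration of the first and third ideas, here is a minimal sketch using only the Rust standard library on Unix, not the interprocess API. It assumes a plain UnixStream pair as a stand-in for the local socket. The names wait_until_disconnect and demo are hypothetical. The read side runs on a cloned handle in its own thread, so the write does not have to wait for it, and a read returning 0 bytes is treated as the disconnect signal.

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::thread;

/// Hypothetical helper: blocks until the peer closes its end and returns the
/// number of bytes received before that. A read of 0 bytes means EOF.
fn wait_until_disconnect(mut conn: UnixStream) -> std::io::Result<usize> {
    let mut buf = [0u8; 64];
    let mut total = 0;
    loop {
        match conn.read(&mut buf)? {
            0 => return Ok(total), // peer disconnected
            n => total += n,
        }
    }
}

/// Writes `msg` on one handle while a second thread is blocked reading on a
/// clone of the same socket. Returns the bytes seen by the reader thread and
/// the message as received by the client.
fn demo(msg: &[u8]) -> std::io::Result<(usize, Vec<u8>)> {
    let (server, mut client) = UnixStream::pair()?;

    // Separate handle for the read side; the pending read lives on its own thread.
    let reader = server.try_clone()?;
    let watcher = thread::spawn(move || wait_until_disconnect(reader));

    // The write goes through even though a read is pending on the clone.
    let mut writer = server;
    writer.write_all(msg)?;

    let mut received = vec![0u8; msg.len()];
    client.read_exact(&mut received)?;

    // The client says goodbye and disconnects, which ends the reader loop.
    client.write_all(b"bye")?;
    drop(client);

    let total = watcher.join().expect("reader thread panicked")?;
    Ok((total, received))
}
```

Calling demo(b"Test\n") should hand the message to the client while the reader thread is still waiting, and the reader returns only once the client end is dropped. Note that the reproduction above matches on Err(_) from conn.read, whereas an orderly disconnect on a stream socket normally shows up as a successful read of 0 bytes.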

tqwewe avatar Mar 16 '21 08:03 tqwewe

Apparently this is because the library uses Unblock which does not support cancellation and only supports one read/write operation at a time.
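
To make the "one operation at a time" behaviour concrete, here is a toy model using only the standard library. It is not how Unblock is implemented. It only assumes that a pending operation holds exclusive access to the handle until it finishes, which is modelled with a Mutex. The name demo_single_op is hypothetical.

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Returns (whether a write could have started while the read was pending,
/// number of bytes the pending read eventually received).
fn demo_single_op() -> std::io::Result<(bool, usize)> {
    let (server, mut client) = UnixStream::pair()?;

    // Toy stand-in for a wrapper that lends the handle to one operation at a time.
    let shared = Arc::new(Mutex::new(server));
    let (started_tx, started_rx) = mpsc::channel();

    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || -> std::io::Result<usize> {
            let mut conn = shared.lock().unwrap();
            started_tx.send(()).unwrap(); // the lock is held from here on
            let mut buf = [0u8; 16];
            let n = conn.read(&mut buf)?; // blocks until the client sends data
            Ok(n)
        })
    };

    // Wait until the read operation owns the handle.
    started_rx.recv().unwrap();

    // A write cannot even begin: the pending read has exclusive access.
    let write_possible = shared.try_lock().is_ok();

    // Only when the client sends something does the read finish and release the handle.
    client.write_all(b"ping")?;
    let n = worker.join().expect("worker thread panicked")?;
    Ok((write_possible, n))
}
```

In this model the write is stuck for exactly as long as the read has nothing to return, which mirrors the symptom in the report: write_all only proceeds once the client sends data or disconnects.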

tqwewe avatar Mar 16 '21 10:03 tqwewe

I've been planning to conditionally use platform-specific features of Tokio to implement poll-based asynchronous Ud-sockets and Windows named pipes that leverage the executor's event loop. Unlike mio, however, Tokio doesn't have a Windows named pipe primitive to wrap (mio has mio::windows::NamedPipe), so "proper" async support is blocked on that. My only option here is to contribute to Tokio myself (it shouldn't be hard to wrap mio's NamedPipe in Tokio), wait until named pipe support is released in a new version, and then wrap its raw functionality here.

kotauskas avatar Mar 16 '21 15:03 kotauskas

Facing the same issue: I'm trying to read and write at the same time, but both get stuck forever.

DjDeveloperr avatar May 28 '21 10:05 DjDeveloperr

Tokio 1.7 has been released with named pipes support. I guess it can be used now in the interprocess crate.

ancwrd1 avatar Jun 15 '21 20:06 ancwrd1

1.2.0 is finally out, complete with Tokio support for local sockets in the local_socket::tokio module. The nonblocking module is also deprecated and will be removed in 2.0.0. I'll close this one, but feel free to reopen if you encounter porting issues with the new API!

kotauskas avatar Nov 03 '22 14:11 kotauskas