
TcpStream's poll_close implementation doesn't close the connection

Open Nemo157 opened this issue 6 years ago • 3 comments

This is very similar to https://github.com/tokio-rs/tokio/issues/852.

The following test program hangs: the server never notices that the client has closed the write side of the TcpStream.

#![feature(futures_api, async_await, await_macro)]

use std::{io, net::IpAddr};

use futures::{StreamExt, io::{AsyncReadExt, AsyncWriteExt}};
use romio::TcpListener;
use romio::TcpStream;

fn main() {
    futures::executor::block_on(async {
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        let mut listener = TcpListener::bind(&(ip, 0).into()).unwrap();

        let port = listener.local_addr().unwrap().port();

        let mut incoming = listener.incoming();
        let (_rx, mut tx) = await!(TcpStream::connect(&(ip, port).into())).unwrap().split();
        let (mut rx, _tx) = await!(incoming.next()).unwrap().unwrap().split();

        println!("Connection established");

        println!("Closing tx");
        await!(tx.close()).unwrap();

        println!("Wait for server to notice connection was closed...");
        let mut byte = [0];
        let res = await!(rx.read_exact(&mut byte));
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
    })
}

Wrapping the client's TcpStream in a type whose poll_close calls shutdown(Shutdown::Write) makes the server notice that the client has closed the TCP connection:

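// Imports needed in addition to those of the test program above.
use std::task::{Poll, Waker};
use futures::{ready, io::{AsyncRead, AsyncWrite}};

// Newtype around romio's TcpStream so poll_close can be overridden.
struct Fix(TcpStream);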

impl AsyncRead for Fix {
    fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        self.0.poll_read(waker, buf)
    }
}

impl AsyncWrite for Fix {
    fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
        self.0.poll_write(waker, buf)
    }

    fn poll_flush(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
        self.0.poll_flush(waker)
    }

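    fn poll_close(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
        // Flush pending data first; returns early on Pending or on error.
        ready!(self.poll_flush(waker)?);
        // Shut down the write half so the peer observes EOF.
        Poll::Ready(self.0.shutdown(std::net::Shutdown::Write))
    }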
}

For some reason, in the linked Tokio issue they don't want to apply this change to TcpStream itself. I can't see any reason not to just change it, though.

Nemo157, Apr 04 '19 09:04

(One thing I'm not sure about with that wrapper is whether TcpStream::shutdown can return WouldBlock. If so, it would need to hook into the reactor somehow.)

Nemo157, Apr 04 '19 10:04

@Nemo157 I feel like I've run into a version of this in Romio too (though I'm not 100% sure it's the exact same thing). Iirc, at the time it was said that this was in line with std's TcpStream behavior.

Def agree that having a way forward to prevent hanging would be excellent. I've heard of more people struggling with the reader APIs around Tcp.

yoshuawuyts, Apr 04 '19 13:04

I'm running into this as well. AFAICT the behavior is not in line with the standard library. On the standard library TcpStream you can call shutdown, which allows the reading end to see the EOF. Dropping also works.

On the romio TcpStream only dropping works. Calling close leaves the reading side hanging until the sender is actually dropped. I think this is counterintuitive. I feel that on futures primitives, calling close usually makes the reading end return None (channels, Stream/Sink, ...).

At the least, I think the behavior should be documented on TcpStream and on the close method.

Here is some dirty code to verify the stdlib behavior.
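
The linked code is not reproduced in this thread. As a stand-in, here is a minimal sketch of the same kind of check, using only the standard library. It is not the original code from the comment; the helper name read_after_client_shutdown is made up for illustration. It shuts down the client's write half without dropping the client and reports what the server's read_exact returns.

```rust
use std::io::{self, Read};
use std::net::{Shutdown, TcpListener, TcpStream};

/// Hypothetical helper: connects a client to a local listener, shuts down
/// the client's write half, and returns the error kind the server gets
/// when it then tries to read one byte.
fn read_after_client_shutdown() -> io::Result<io::ErrorKind> {
    // Bind to an ephemeral port on localhost.
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let addr = listener.local_addr()?;

    // The connection completes via the listen backlog, so connecting
    // before accepting is fine on a single thread.
    let client = TcpStream::connect(addr)?;
    let (mut server, _peer) = listener.accept()?;

    // Close only the write half. `client` itself is still alive here.
    client.shutdown(Shutdown::Write)?;

    // With no data sent, the server should hit EOF immediately.
    let mut byte = [0u8; 1];
    let err = server
        .read_exact(&mut byte)
        .expect_err("expected the read to fail at EOF");

    // Drop the client only after the read, so the EOF cannot be
    // attributed to the drop.
    drop(client);
    Ok(err.kind())
}

fn main() -> io::Result<()> {
    let kind = read_after_client_shutdown()?;
    assert_eq!(kind, io::ErrorKind::UnexpectedEof);
    println!("server saw EOF after shutdown(Shutdown::Write)");
    Ok(())
}
```

Because the client is dropped only after the read, the EOF here comes from shutdown alone, which is the behavior the romio close call is being compared against.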

Is there a reason we don't call shutdown in PollEvented::close? It currently does nothing. I think we should call shutdown from mio::TcpStream here with Shutdown::Write.

I can file a PR if that sounds like the solution.

najamelan, Aug 11 '19 16:08