interprocess icon indicating copy to clipboard operation
interprocess copied to clipboard

Writing to a socket quickly on Windows causes a hang forever

Open Vrtgs opened this issue 1 year ago • 3 comments

Describe the bug cant write to a socket quickly on Windows this works completely fine on Unix but causes a block forever on Windows

To Reproduce Write to the socket Write again, very quickly

Expected behavior both writes succeed, and not stall forever

Reproducible example

[dependencies.interprocess]
version = "1.2.0"
features = ["tokio_support"]

[dependencies]
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.4.1", features = ["v4"] }
futures = "0.3.28"
tempdir = "0.3.7"

shared_lib:

use std::{env, thread};
use std::ffi::OsStr;
use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream};
use std::io::{Result, Write, Error as IoError, ErrorKind as IoErrorKind};
use std::time::Duration;
use futures::AsyncWriteExt;
use futures::AsyncReadExt;
use tokio::io::AsyncReadExt as _;
use tokio::process::Child;


#[repr(transparent)]
pub struct IpcStream {
    stream: LocalSocketStream,
}

macro_rules! read_usize {
    ($stream: expr) => {{
        let mut buff = [0; std::mem::size_of::<usize>()];
        $stream.read_exact(&mut buff).await?;
        usize::from_ne_bytes(buff)
    }};
}
macro_rules! read_stream {
    ($stream: expr, $meth: ident, $r#type: ident) => {{
        let stream = $stream;
        let len = read_usize!(stream);
        let data = {
            let mut buf = $r#type::with_capacity(len);
            let mut stream = stream.take(len as u64);
            stream.$meth(&mut buf).await?;

            if buf.len() != len {
                return Err(IoError::new(
                    IoErrorKind::UnexpectedEof,
                    format!("expected {len} extra bytes found {} in stream", buf.len())
                ))
            }
            buf
        };

        Ok(data)
    }};
}

impl IpcStream {
    pub async fn connect_to_parent() -> Result<IpcStream> {
        fn write_to_stdout_immediate(buf: &OsStr) -> Result<()> {
            let buf = buf.to_string_lossy();
            let buf = buf.as_bytes();
            let mut stdout = std::io::stdout().lock();
            stdout.write_all(&usize::to_ne_bytes(buf.len()))?;
            stdout.write_all(buf)?;
            stdout.flush()
        }

        let pathname = {
            let mut temp_dir = env::temp_dir();
            temp_dir.push(format!("{}.sock", uuid::Uuid::new_v4()));
            temp_dir
        };
        write_to_stdout_immediate(pathname.as_os_str())?;

        match LocalSocketListener::bind(pathname) {
            Ok(listener) => match listener.accept().await {
                Ok(sock) => Ok(IpcStream { stream: sock }),
                Err(err) => Err(err)
            },
            Err(err) => Err(err)
        }
    }
    pub async fn connect_to_child<'a>(child: &mut Child) -> Result<IpcStream> {
        let name = {
            let child_stdout = {
                match child.stdout {
                    Some(ref mut out) => out,
                    None => return Err(IoError::new(
                        IoErrorKind::NotConnected, "No stdout")
                    ),
                }
            };
            let res: Result<String> = read_stream!(child_stdout, read_to_string, String);

            res?
        };

        Ok(IpcStream {
            stream: LocalSocketStream::connect(name).await?,
        })
    }

    #[inline(always)]
    pub async fn write_buf(&mut self, buf: &[u8]) -> Result<()> {
        self.stream.write_all(&usize::to_ne_bytes(buf.len())).await?;
        self.stream.write_all(buf).await
    }
    #[inline(always)]
    pub async fn write_str(&mut self, buf: &str) -> Result<()> {
        self.write_buf(buf.as_bytes()).await
    }

    pub async fn read_buff(&mut self) -> Result<Vec<u8>> {
        read_stream!(&mut self.stream, read_to_end, Vec)
    }
    pub async fn read_string(&mut self) -> Result<String> {
        read_stream!(&mut self.stream, read_to_string, String)
    }

    #[inline(always)]
    pub async fn close(mut self) -> Result<()> {
        self.stream.close().await
    }
}

impl Drop for IpcStream {
    fn drop(&mut self) {
        let _ = futures::executor::block_on(self.stream.close());
    }
}

Sender:

use shared_lib::IpcStream;

#[tokio::main]
async fn main() {
    let mut stream = IpcStream::connect_to_parent().await.unwrap();
    eprintln!("Connected to Parent");
    stream.write_str("hello world").await.unwrap();
    eprintln!("Wrote hello world to stream");
    stream.close().await.unwrap();
    eprintln!("Closed stream");
}

Receiver:

use std::process::Stdio;
use tokio::process::Command;
use shared_lib::IpcStream;

#[tokio::main]
async fn main() {
    let mut child = Command::new("./test-ipc.exe")
        .stdout(Stdio::piped())
        .stdin(Stdio::null())
        .stderr(Stdio::inherit())
        .spawn()
        .unwrap();

    let mut stream = IpcStream::connect_to_child(&mut child).await.unwrap();

    println!("{}", stream.read_string().await.unwrap());
}

Vrtgs avatar Sep 08 '23 12:09 Vrtgs

I encountered a same problem.

gaoqiangz avatar Nov 10 '23 06:11 gaoqiangz

@gaoqiangz @NightMare-Vortex Could you please create a simpler reproduction project? Ideally one that

  • is just a .zip file or a GitHub repo
  • doesn't depend on tokio

With the current one, narrowing down the error cause is a lot harder than it could be.

Update: I did try it without Tokio, and wasn't able to reproduce this ipc.zip

stefnotch avatar Nov 19 '23 19:11 stefnotch

I'm fairly sure this bug is actually fixed on main. Will look into whether this issue is still relevant or not after I release 2.0.0.

kotauskas avatar Nov 19 '23 19:11 kotauskas

2.0.0 has been released. As a lot of Tokio-related bugs have been fixed since this issue was first reported, and because the test suite has been greatly expanded, I assume that this bug is no longer present.

Please reopen this issue and provide a minimal reproducible example if the bug persists.

kotauskas avatar Apr 26 '24 18:04 kotauskas