interprocess
interprocess copied to clipboard
Writing to a socket quickly on Windows causes a hang forever
Describe the bug: Can't write to a socket quickly on Windows. This works completely fine on Unix, but blocks forever on Windows.
To Reproduce: (1) Write to the socket. (2) Write again, very quickly.
Expected behavior: Both writes succeed and do not stall forever.
Reproducible example
[dependencies.interprocess]
version = "1.2.0"
features = ["tokio_support"]
[dependencies]
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.4.1", features = ["v4"] }
futures = "0.3.28"
tempdir = "0.3.7"
shared_lib:
use std::{env, thread};
use std::ffi::OsStr;
use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream};
use std::io::{Result, Write, Error as IoError, ErrorKind as IoErrorKind};
use std::time::Duration;
use futures::AsyncWriteExt;
use futures::AsyncReadExt;
use tokio::io::AsyncReadExt as _;
use tokio::process::Child;
/// Message channel wrapping a single Tokio-flavoured `interprocess` local socket stream.
///
/// The methods in this file frame every message as a native-endian `usize`
/// length prefix followed by that many payload bytes.
#[repr(transparent)]
pub struct IpcStream {
// The connected local-socket stream that all reads and writes go through.
stream: LocalSocketStream,
}
/// Reads exactly `size_of::<usize>()` bytes from `$reader` and decodes them
/// as a native-endian `usize`.
///
/// Must be expanded inside an `async` context whose return type accepts the
/// reader's I/O error through `?`.
macro_rules! read_usize {
    ($reader:expr) => {{
        const PREFIX_WIDTH: usize = std::mem::size_of::<usize>();
        let mut raw = [0u8; PREFIX_WIDTH];
        $reader.read_exact(&mut raw).await?;
        usize::from_ne_bytes(raw)
    }};
}
/// Reads one length-prefixed message from `$source`.
///
/// The length prefix is read with `read_usize!`; then at most that many bytes
/// are read through `$read_fn` (e.g. `read_to_end` / `read_to_string`) into a
/// fresh `$container` (e.g. `Vec` / `String`). If fewer bytes arrive than the
/// prefix promised, the enclosing function returns an `UnexpectedEof` error.
/// On success the macro evaluates to `Ok(container)`.
macro_rules! read_stream {
    ($source:expr, $read_fn:ident, $container:ident) => {{
        let stream = $source;
        let len = read_usize!(stream);
        let buf = {
            // Cap the reader so the "read everything" call stops after `len` bytes.
            let mut limited = stream.take(len as u64);
            let mut filled = $container::with_capacity(len);
            limited.$read_fn(&mut filled).await?;
            filled
        };
        if buf.len() != len {
            return Err(IoError::new(
                IoErrorKind::UnexpectedEof,
                format!("expected {len} extra bytes found {} in stream", buf.len()),
            ));
        }
        Ok(buf)
    }};
}
impl IpcStream {
    /// Creates a fresh local socket, announces its name to the parent process
    /// over stdout, and waits for the parent to connect.
    ///
    /// Despite the name, this side *listens*: it binds a listener at a unique
    /// path under the system temp directory, writes that path to stdout
    /// (length-prefixed), and accepts exactly one connection.
    ///
    /// # Errors
    /// Returns any I/O error from binding the listener, writing to stdout, or
    /// accepting the connection.
    pub async fn connect_to_parent() -> Result<IpcStream> {
        /// Writes `buf` to stdout as a native-endian `usize` byte length
        /// followed by its (lossily UTF-8 converted) bytes, then flushes.
        fn write_to_stdout_immediate(buf: &OsStr) -> Result<()> {
            let buf = buf.to_string_lossy();
            let buf = buf.as_bytes();
            let mut stdout = std::io::stdout().lock();
            stdout.write_all(&usize::to_ne_bytes(buf.len()))?;
            stdout.write_all(buf)?;
            stdout.flush()
        }

        let pathname = {
            let mut temp_dir = env::temp_dir();
            temp_dir.push(format!("{}.sock", uuid::Uuid::new_v4()));
            temp_dir
        };

        // Bind BEFORE announcing the name: if the name is published first, the
        // parent can read it and try to connect while the endpoint does not
        // exist yet.
        let listener = LocalSocketListener::bind(pathname.clone())?;
        write_to_stdout_immediate(pathname.as_os_str())?;

        let stream = listener.accept().await?;
        Ok(IpcStream { stream })
    }

    /// Reads the socket name that `child` announced on its stdout and connects
    /// to it.
    ///
    /// # Errors
    /// Returns `NotConnected` if the child's stdout was not captured, or any
    /// I/O error from reading the announced name or connecting to the socket.
    pub async fn connect_to_child<'a>(child: &mut Child) -> Result<IpcStream> {
        let child_stdout = child
            .stdout
            .as_mut()
            .ok_or_else(|| IoError::new(IoErrorKind::NotConnected, "No stdout"))?;

        // The macro's result type must be pinned so its `Ok(..)` can be inferred.
        let name: Result<String> = read_stream!(child_stdout, read_to_string, String);

        Ok(IpcStream {
            stream: LocalSocketStream::connect(name?).await?,
        })
    }

    /// Sends `buf` as one message: a native-endian `usize` length prefix
    /// followed by the bytes themselves.
    #[inline(always)]
    pub async fn write_buf(&mut self, buf: &[u8]) -> Result<()> {
        self.stream.write_all(&usize::to_ne_bytes(buf.len())).await?;
        self.stream.write_all(buf).await
    }

    /// Sends the UTF-8 bytes of `buf` as one length-prefixed message.
    #[inline(always)]
    pub async fn write_str(&mut self, buf: &str) -> Result<()> {
        self.write_buf(buf.as_bytes()).await
    }

    /// Receives one length-prefixed message as raw bytes.
    ///
    /// # Errors
    /// Returns `UnexpectedEof` if the stream ends before the announced number
    /// of bytes has arrived, or any underlying I/O error.
    pub async fn read_buff(&mut self) -> Result<Vec<u8>> {
        read_stream!(&mut self.stream, read_to_end, Vec)
    }

    /// Receives one length-prefixed message as a `String`.
    ///
    /// # Errors
    /// Returns `UnexpectedEof` if the stream ends before the announced number
    /// of bytes has arrived, or any underlying I/O error (including the one
    /// `read_to_string` reports for invalid UTF-8).
    pub async fn read_string(&mut self) -> Result<String> {
        read_stream!(&mut self.stream, read_to_string, String)
    }

    /// Closes the underlying stream, consuming the `IpcStream`.
    #[inline(always)]
    pub async fn close(mut self) -> Result<()> {
        self.stream.close().await
    }
}
// Closes the wrapped stream whenever an `IpcStream` value is dropped.
impl Drop for IpcStream {
fn drop(&mut self) {
// Drives the async `close()` to completion synchronously on the current
// thread; the result is deliberately discarded.
// NOTE(review): this also runs after `IpcStream::close(self)` has already
// closed the stream (that method consumes `self`), so close is attempted
// twice. Blocking here with a non-Tokio executor while inside a Tokio
// runtime may be related to the reported hang - TODO confirm.
let _ = futures::executor::block_on(self.stream.close());
}
}
Sender:
use shared_lib::IpcStream;
/// Sender side of the reproduction: waits for the parent to connect, sends a
/// single message, then closes the stream, logging each step to stderr.
#[tokio::main]
async fn main() {
    let mut parent_link = IpcStream::connect_to_parent().await.unwrap();
    eprintln!("Connected to Parent");

    parent_link.write_str("hello world").await.unwrap();
    eprintln!("Wrote hello world to stream");

    parent_link.close().await.unwrap();
    eprintln!("Closed stream");
}
Receiver:
use std::process::Stdio;
use tokio::process::Command;
use shared_lib::IpcStream;
/// Receiver side of the reproduction: spawns the sender executable with its
/// stdout piped, connects to the socket it announces, and prints the one
/// message it sends.
#[tokio::main]
async fn main() {
    // stdout must be piped so the announced socket name can be read back.
    let mut command = Command::new("./test-ipc.exe");
    command
        .stdout(Stdio::piped())
        .stdin(Stdio::null())
        .stderr(Stdio::inherit());
    let mut sender = command.spawn().unwrap();

    let mut ipc = IpcStream::connect_to_child(&mut sender).await.unwrap();
    let message = ipc.read_string().await.unwrap();
    println!("{}", message);
}
I encountered the same problem.
@gaoqiangz @NightMare-Vortex Could you please create a simpler reproduction project? Ideally one that
- is just a .zip file or a GitHub repo
- doesn't depend on tokio
With the current one, narrowing down the error cause is a lot harder than it could be.
Update: I did try it without Tokio, and wasn't able to reproduce this: ipc.zip
I'm fairly sure this bug is actually fixed on main. Will look into whether this issue is still relevant or not after I release 2.0.0.
2.0.0 has been released. As a lot of Tokio-related bugs have been fixed since this issue was first reported, and because the test suite has been greatly expanded, I assume that this bug is no longer present.
Please reopen this issue and provide a minimal reproducible example if the bug persists.