Lost message when selecting on MPMC in loop from multiple threads
I have tried to make this example as minimal as I could. The program hangs after a short time on my computer (Intel i7-1165G7, Linux 6.6.2, glibc).
The basic premise is this:
- a single thread loops, sending messages into a channel
- sometimes it spawns background threads, which select on that channel and a "signal" channel
- sometimes it sends a message to the signal channel, which causes one background thread to exit
- it cycles between spawning up to 3 threads and reaping down to 1 thread
- the message sent to MPMC is itself a channel that sends on drop
- the main thread waits for one of the background threads to drop each message before sending another
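The drop-acknowledgement handshake from the last two points can be sketched in isolation. This is only an illustration under stated assumptions: it uses `std::sync::mpsc` instead of flume and a single worker with no select, and `Ack` and `send_and_wait` are hypothetical names that are not part of the reproduction.

```rust
use std::sync::mpsc;
use std::thread;

/// Message that notifies its creator when it is dropped.
/// (Hypothetical stand-in for the `Dropper` type in the reproduction.)
struct Ack {
    done: mpsc::Sender<()>,
}

impl Drop for Ack {
    fn drop(&mut self) {
        // Ignore the error: the waiting side may already be gone.
        let _ = self.done.send(());
    }
}

/// Sends `count` messages to one worker thread, waiting for each message
/// to be dropped before sending the next one.
/// Returns how many acknowledgements arrived.
fn send_and_wait(count: usize) -> usize {
    let (work_tx, work_rx) = mpsc::channel::<Ack>();
    let worker = thread::spawn(move || {
        // Every received message is dropped right away, which fires `Drop`.
        for msg in work_rx {
            drop(msg);
        }
    });

    let mut acked = 0;
    for _ in 0..count {
        let (done_tx, done_rx) = mpsc::channel();
        work_tx.send(Ack { done: done_tx }).expect("worker alive");
        // Blocks until the worker has dropped the message.
        done_rx.recv().expect("message dropped");
        acked += 1;
    }

    drop(work_tx); // closes the work channel so the worker loop ends
    worker.join().expect("worker panicked");
    acked
}
```

With one plain receiver every message is acknowledged, so `send_and_wait(100)` returns `100`. The hang reported here only shows up once several threads select on the work channel and a signal channel at the same time.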
Very often, during the first thread-reaping cycle, a message that is sent is never dropped. The main thread then never progresses, even though there are still threads selecting on the channel. I believe this is due to a race in the Selector implementation.
Without further ado, here's the code:
```rust
fn main() {
    // Work channel shared by every background thread.
    let (sender, receiver) = flume::bounded(8);
    // One signal sender per live background thread.
    let mut signals: Vec<flume::Sender<()>> = Vec::new();
    // true while spawning up to 3 threads, false while reaping down to 1
    let mut launch = true;

    for i in 0u64.. {
        if i % 100000 == 0 {
            println!("looping");
        }

        if i % 10000 == 0 {
            // Reap: tell one background thread to exit.
            if signals.len() >= 3 || !launch {
                launch = false;
                if let Some(signal_tx) = signals.pop() {
                    signal_tx.send(()).expect("Sent");
                    drop(signal_tx);
                }
            }

            // Spawn: start another background thread.
            if signals.len() <= 1 || launch {
                launch = true;
                let (signal_tx, signal) = flume::bounded(1);
                signals.push(signal_tx);
                let rx2 = receiver.clone();
                std::thread::spawn(move || {
                    println!("Launching thread");
                    while !race(&rx2, &signal) {
                        // spin
                    }
                    println!("Closing thread");
                });
            }
        }

        // Send one message and wait until a background thread drops it.
        let (dropper_tx, dropper) = flume::bounded(1);
        sender.send(Dropper { sender: dropper_tx }).expect("sent");
        dropper.recv().expect("received");
    }
}

// Message that notifies the main thread when it is dropped.
struct Dropper {
    sender: flume::Sender<()>,
}

impl Drop for Dropper {
    fn drop(&mut self) {
        self.sender.send(()).expect("sent");
    }
}

// Returns true when the calling thread should exit.
fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    flume::Selector::new()
        .recv(receiver, |res| {
            let out = res.is_err();
            drop(res);
            out
        })
        .recv(signal, |_res| true)
        .wait()
}
```
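The symptom is that the blocking wait for the drop notification never returns. One way to turn that silent hang into something observable is to wait with a timeout instead. The following is a minimal sketch using `std::sync::mpsc` so that it stands alone; `AckStatus` and `wait_for_ack` are hypothetical names, and it assumes that flume's `Receiver` offers a comparable `recv_timeout` method that could be used at the `dropper.recv()` call.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Outcome of waiting for a message to be dropped (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum AckStatus {
    /// The drop notification arrived in time.
    Dropped,
    /// Nothing arrived within the limit: the message is probably lost.
    Stalled,
    /// The sending side went away without notifying.
    SenderGone,
}

/// Waits at most `limit` for a drop notification on `ack`.
fn wait_for_ack(ack: &mpsc::Receiver<()>, limit: Duration) -> AckStatus {
    match ack.recv_timeout(limit) {
        Ok(()) => AckStatus::Dropped,
        Err(RecvTimeoutError::Timeout) => AckStatus::Stalled,
        Err(RecvTimeoutError::Disconnected) => AckStatus::SenderGone,
    }
}
```

In the reproduction, a `Stalled` result on a message that was sent successfully would point at the lost message, and the loop counter `i` at that point could be printed before giving up.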
Edit: I'm very sorry about the formatting of the folded code.
I'll add that the program does not exhibit this behavior when flume::Selector is replaced by a simple async block_on + select + recv_async.
I'll provide a simple working example here. This is what a working race looks like:
```rust
fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    match selector::blocking_select(receiver.recv_async(), signal.recv_async()) {
        selector::Either::Left(res) => {
            let out = res.is_err();
            drop(res);
            out
        }
        selector::Either::Right(_res) => true,
    }
}
```
Implementation of the simple async executor:
```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

struct ThreadWaker {
    thread: std::thread::Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.thread.unpark();
    }
}

pub(super) enum Either<L, R> {
    Left(L),
    Right(R),
}

struct Select<F1, F2> {
    left: F1,
    left_woken: Arc<AtomicBool>,

    right: F2,
    right_woken: Arc<AtomicBool>,
}

struct SelectWaker {
    inner: Waker,
    flag: Arc<AtomicBool>,
}

impl Wake for SelectWaker {
    fn wake_by_ref(self: &Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        self.inner.wake_by_ref();
    }

    fn wake(self: Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        match Arc::try_unwrap(self) {
            Ok(this) => this.inner.wake(),
            Err(this) => this.inner.wake_by_ref(),
        }
    }
}

impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future + Unpin,
    F2: Future + Unpin,
{
    type Output = Either<F1::Output, F2::Output>;

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let left_waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.left_woken.clone(),
        })
        .into();

        let mut left_ctx = Context::from_waker(&left_waker);

        if let Poll::Ready(left_out) = Pin::new(&mut self.left).poll(&mut left_ctx) {
            return Poll::Ready(Either::Left(left_out));
        }

        let right_waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.right_woken.clone(),
        })
        .into();

        let mut right_ctx = Context::from_waker(&right_waker);

        if let Poll::Ready(right_out) = Pin::new(&mut self.right).poll(&mut right_ctx) {
            return Poll::Ready(Either::Right(right_out));
        }

        Poll::Pending
    }
}

pub(super) fn blocking_select<Left, Right>(
    left: Left,
    right: Right,
) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    block_on(select(left, right))
}

fn block_on<F>(fut: F) -> F::Output
where
    F: Future,
{
    let thread_waker = Arc::new(ThreadWaker {
        thread: std::thread::current(),
    })
    .into();

    let mut ctx = Context::from_waker(&thread_waker);

    let mut fut = std::pin::pin!(fut);

    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut ctx) {
            return out;
        }

        // doesn't race - unpark followed by park will result in park returning immediately
        std::thread::park();
    }
}

async fn select<Left, Right>(left: Left, right: Right) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    let left = std::pin::pin!(left);
    let right = std::pin::pin!(right);

    Select {
        left,
        left_woken: Arc::new(AtomicBool::new(true)),
        right,
        right_woken: Arc::new(AtomicBool::new(true)),
    }
    .await
}
```
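The comment in `block_on` relies on the token semantics of `std::thread::park`: an `unpark` that happens before `park` is remembered, so the following `park` returns instead of blocking. A minimal sketch of just that property, with `park_after_unpark` as a hypothetical helper name:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Unparks the current thread first, then parks it, and reports how long
/// the `park` call took.
fn park_after_unpark() -> Duration {
    // Stores the wake-up token for this thread.
    thread::current().unpark();

    let start = Instant::now();
    // Consumes the stored token and returns instead of blocking.
    thread::park();
    start.elapsed()
}
```

If the token were not remembered, the call would block until some other thread unparked it, which is the lost-wakeup situation the comment rules out for the executor. Note that `park` may also return spuriously, which is why `block_on` polls again in a loop after every wake-up.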
I've put all the demo code in a repo on my forgejo: https://git.asonix.dog/asonix/flume-deadlock