Lost message when selecting on MPMC in loop from multiple threads

Open asonix opened this issue 2 years ago • 2 comments

I have tried to make this example as minimal as I possibly could, but basically this program will hang after a short time on my computer (Intel i7-1165G7 + Linux 6.6.2 + glibc).

The basic premise is this:

  • a single thread loops, sending messages into a channel
  • sometimes it spawns background threads, which select on that channel and a "signal" channel
  • sometimes it sends a message to the signal channel, which causes one background thread to exit
  • it cycles between spawning up to 3 threads and reaping down to 1 thread
  • the message sent to MPMC is itself a channel that sends on drop
  • the main thread waits for one of the background threads to drop each message before sending another
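
The spawn/reap cycle in the list above can be modeled on its own, without any threads or channels. This is only a sketch: `simulate` is a hypothetical helper, `threads` stands in for `signals.len()`, and one "tick" stands for one pass through the `i % 10000 == 0` branch of the reproduction's `main`.

```rust
/// Models only the `signals.len()` / `launch` bookkeeping of the reproduction.
/// Returns the number of live background threads after each tick.
fn simulate(ticks: usize) -> Vec<usize> {
    let mut threads = 0usize; // stands in for signals.len()
    let mut launch = true;
    let mut history = Vec::with_capacity(ticks);

    for _ in 0..ticks {
        // Reap phase: signal one thread to exit.
        if threads >= 3 || !launch {
            launch = false;
            if threads > 0 {
                threads -= 1; // signals.pop() + send(())
            }
        }

        // Spawn phase: start one more thread.
        if threads <= 1 || launch {
            launch = true;
            threads += 1; // signals.push(..) + thread::spawn(..)
        }

        history.push(threads);
    }

    history
}

fn main() {
    // Prints [1, 2, 3, 2, 2, 3, 2, 2]
    println!("{:?}", simulate(8));
}
```

In this model the count drops to 1 only momentarily: the same tick that reaps down to 1 also spawns a replacement, so at least two threads are normally selecting on the channel after the first cycle.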

I find that very often, during the first thread-reaping cycle, a message that is sent is never dropped, so the main thread never progresses, even though there are still threads selecting on the channel. I believe this is due to a race in the Selector implementation.

Without further ado, here's the code:

fn main() {
    let (sender, receiver) = flume::bounded(8);

    let mut signals: Vec<flume::Sender<()>> = Vec::new();

    let mut launch = true;

    for i in 0u64.. {
        if i % 100000 == 0 {
            println!("looping");
        }

        if i % 10000 == 0 {
            if signals.len() >= 3 || !launch {
                launch = false;

                if let Some(signal_tx) = signals.pop() {
                    signal_tx.send(()).expect("Sent");
                    drop(signal_tx);
                }
            }

            if signals.len() <= 1 || launch {
                launch = true;

                let (signal_tx, signal) = flume::bounded(1);

                signals.push(signal_tx);

                let rx2 = receiver.clone();
                std::thread::spawn(move || {
                    println!("Launching thread");
                    while !race(&rx2, &signal) {
                        // spin
                    }
                    println!("Closing thread");
                });
            }
        }

        let (dropper_tx, dropper) = flume::bounded(1);

        sender.send(Dropper { sender: dropper_tx }).expect("sent");

        dropper.recv().expect("received");
    }
}

struct Dropper {
    sender: flume::Sender<()>,
}

impl Drop for Dropper {
    fn drop(&mut self) {
        self.sender.send(()).expect("sent");
    }
}

fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    flume::Selector::new()
        .recv(receiver, |res| {
            let out = res.is_err();
            drop(res);
            out
        })
        .recv(signal, |_res| true)
        .wait()
}
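
To make a lost message show up as an error instead of a silent hang, the wait for the drop acknowledgement can be bounded with a timeout. The sketch below uses `std::sync::mpsc` as a stand-in for flume so that it runs without dependencies; `Ack`, `send_and_wait` and the 5 second limit are hypothetical names and values, not part of the reproduction. flume's `Receiver` also offers `recv_timeout`, so the same idea should carry over.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
use std::thread;
use std::time::Duration;

/// Same idea as `Dropper`, on std channels: acknowledge when dropped.
struct Ack {
    sender: SyncSender<()>,
}

impl Drop for Ack {
    fn drop(&mut self) {
        // Ignore errors: the waiting side may already have timed out and gone away.
        let _ = self.sender.send(());
    }
}

/// Sends one message and waits at most `limit` for a consumer to drop it.
fn send_and_wait(work: &SyncSender<Ack>, limit: Duration) -> Result<(), RecvTimeoutError> {
    let (ack_tx, ack_rx) = mpsc::sync_channel(1);
    work.send(Ack { sender: ack_tx }).expect("worker channel open");
    ack_rx.recv_timeout(limit)
}

fn main() {
    let (work_tx, work_rx) = mpsc::sync_channel::<Ack>(8);

    // A single consumer that drops every message it receives.
    thread::spawn(move || {
        for ack in work_rx {
            drop(ack);
        }
    });

    for i in 0..1000 {
        if let Err(err) = send_and_wait(&work_tx, Duration::from_secs(5)) {
            eprintln!("message {i} was not dropped in time: {err:?}");
            return;
        }
    }
    println!("all messages acknowledged");
}
```

With a bound like this, the iteration at which a message goes missing is reported rather than leaving the main thread blocked in `recv`.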

asonix avatar Nov 25 '23 20:11 asonix

Edit: I'm very sorry the formatting of the folded code is the way it is.

I'll add that the program does not exhibit this behavior when flume::Selector is replaced by a simple async block_on + select + recv_async.

I'll provide a simple working example here. This is what a working race looks like:

fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    match selector::blocking_select(receiver.recv_async(), signal.recv_async()) {
        selector::Either::Left(res) => {
            let out = res.is_err();
            drop(res);
            out
        }
        selector::Either::Right(_res) => true,
    }
}

implementation of simple async executor behind fold

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

struct ThreadWaker {
    thread: std::thread::Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.thread.unpark();
    }
}

pub(super) enum Either<L, R> {
    Left(L),
    Right(R),
}

struct Select<F1, F2> {
    left: F1,
    left_woken: Arc<AtomicBool>,

    right: F2,
    right_woken: Arc<AtomicBool>,
}

struct SelectWaker {
    inner: Waker,
    flag: Arc<AtomicBool>,
}

impl Wake for SelectWaker {
    fn wake_by_ref(self: &Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        self.inner.wake_by_ref();
    }

    fn wake(self: Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        match Arc::try_unwrap(self) {
            Ok(this) => this.inner.wake(),
            Err(this) => this.inner.wake_by_ref(),
        }
    }
}

impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future + Unpin,
    F2: Future + Unpin,
{
    type Output = Either<F1::Output, F2::Output>;

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let left_waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.left_woken.clone(),
        })
        .into();

        let mut left_ctx = Context::from_waker(&left_waker);

        if let Poll::Ready(left_out) = Pin::new(&mut self.left).poll(&mut left_ctx) {
            return Poll::Ready(Either::Left(left_out));
        }

        let right_waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.right_woken.clone(),
        })
        .into();

        let mut right_ctx = Context::from_waker(&right_waker);

        if let Poll::Ready(right_out) = Pin::new(&mut self.right).poll(&mut right_ctx) {
            return Poll::Ready(Either::Right(right_out));
        }

        Poll::Pending
    }
}

pub(super) fn blocking_select<Left, Right>(
    left: Left,
    right: Right,
) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    block_on(select(left, right))
}

fn block_on<F>(fut: F) -> F::Output
where
    F: Future,
{
    let thread_waker = Arc::new(ThreadWaker {
        thread: std::thread::current(),
    })
    .into();

    let mut ctx = Context::from_waker(&thread_waker);

    let mut fut = std::pin::pin!(fut);

    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut ctx) {
            return out;
        }

        // doesn't race - unpark followed by park will result in park returning immediately
        std::thread::park();
    }
}

async fn select<Left, Right>(left: Left, right: Right) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    let left = std::pin::pin!(left);
    let right = std::pin::pin!(right);

    Select {
        left,
        left_woken: Arc::new(AtomicBool::new(true)),

        right,
        right_woken: Arc::new(AtomicBool::new(true)),
    }
    .await
}
```

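
The comment in `block_on` relies on the token semantics of `std::thread::park`: an `unpark` that arrives before `park` is remembered, so the following `park` returns right away instead of sleeping. A minimal sketch of that behavior follows. `park_after_unpark` is a hypothetical helper, and it uses `park_timeout` with an arbitrary 5 second bound purely so the demo cannot block forever if the assumption were wrong.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Unparks the current thread first, then parks it, and reports how long
/// the park call took to return.
fn park_after_unpark() -> Duration {
    // Make the park token available before parking.
    thread::current().unpark();

    let start = Instant::now();
    // The token is consumed here, so this should return without waiting
    // for the timeout.
    thread::park_timeout(Duration::from_secs(5));
    start.elapsed()
}

fn main() {
    let elapsed = park_after_unpark();
    println!("park returned after {elapsed:?}");
}
```

This is why a wake-up that fires between a `Poll::Pending` result and the call to `park` is not lost in the executor above.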

asonix avatar Nov 25 '23 20:11 asonix

I've put all the demo code in a repo on my forgejo: https://git.asonix.dog/asonix/flume-deadlock

asonix avatar Nov 25 '23 20:11 asonix