Semantic violation involving a rendezvous channel and a selector
fn main() {
    let ch0 = flume::bounded(0);
    let ch1 = flume::unbounded();
    let _0 = ch0.0.clone();
    let _1 = ch1.0.clone();
    let t0 = std::thread::spawn(move || {
        tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(async move {
                ch0.0.send_async(0).await.unwrap();
                println!("sent 0")
            })
    });
    let t1 = std::thread::spawn(move || {
        // std::thread::sleep(std::time::Duration::from_millis(1));
        ch1.0.send(1).unwrap();
    });
    for _ in 0..2 {
        let rx = flume::Selector::new()
            .recv(&ch1.1, Result::unwrap)
            .recv(&ch0.1, Result::unwrap)
            .wait();
        println!("received {rx}");
    }
    t0.join().unwrap();
    t1.join().unwrap()
}
When the program is run repeatedly, the following output may appear:
sent 0
received 1
received 0
This should not be allowed. Once "sent 0" is printed, the rendezvous has completed, so the message must already have been passed to the receiving side, and the selector must select it first. Conversely, if "received 1" is printed first, the selector chose the other channel first, so the rendezvous has not happened yet and "sent 0" should not be printed.
The println!s happen after the channel operations they report, so there's no guarantee that the order of the printed lines matches the order in which those operations actually executed. Acquiring the stdout lock might hold up one thread for a while, or the thread might simply be preempted by the scheduler between the operation and the print. Ordering promises only have meaning with respect to a single thread: there are no global ordering guarantees, and this is true of Rust in general when it comes to this sort of thing.
Thanks for pointing that out. I admit it was a mistake I made while minimizing the reproduction. Here is a slightly more involved version:
fn main() {
    let ch0 = flume::bounded(0);
    let ch1 = flume::unbounded();
    let _0 = ch0.0.clone();
    let _1 = ch1.0.clone();
    let send0 = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let t0 = std::thread::spawn({
        let send0 = send0.clone();
        move || {
            tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap()
                .block_on(async move {
                    ch0.0.send_async(0).await.unwrap();
                    send0.store(true, std::sync::atomic::Ordering::SeqCst)
                })
        }
    });
    let t1 = std::thread::spawn(move || {
        // std::thread::sleep(std::time::Duration::from_millis(1));
        ch1.0.send(1).unwrap();
    });
    for i in 0..2 {
        let rx = flume::Selector::new()
            .recv(&ch1.1, Result::unwrap)
            .recv(&ch0.1, Result::unwrap)
            .wait();
        if i == 0 && rx == 1 {
            assert!(!send0.load(std::sync::atomic::Ordering::SeqCst));
        }
    }
    t0.join().unwrap();
    t1.join().unwrap()
}
The condition and the assertion encode the semantics I expect: if the rendezvous value has not been received yet, the rendezvous sender should be stuck at the send point and make no progress. Running the program in a loop 100 times easily reproduces the assertion failure.
Is it possible that this occurs:
- 0 is sent into rendezvous channel
- selector wakes up because of this
- the rendezvous sending thread is allowed to continue and sets send0 to true
- 1 is sent into ch1 (unbounded channel)
- selector sees that both are ready, but prioritises ch1 and returns 1
- assertion fails
If this is the case, we'd expect ch0 to also resolve to 0 immediately. Not sure that this can be tested, though.
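To make the hypothesized interleaving concrete, here is a deterministic, single-threaded replay of the steps listed above. It is a toy model, not flume's implementation: `ToyState`, `toy_select` and `replay_hypothesis` are hypothetical names, and the selector is assumed, as in the list, to check ch1 before ch0. Under those assumptions the reporter's check (`i == 0 && rx == 1` implies `!send0`) fails, and ch0 then yields 0 on the following select.

```rust
use std::collections::VecDeque;

/// Toy state for the two channels in the report. Hypothetical and purely
/// illustrative: this is NOT flume's implementation.
#[derive(Default)]
struct ToyState {
    /// Value handed over by the rendezvous sender but not yet taken by the selector.
    ch0_pending: Option<i32>,
    /// Contents of the unbounded channel ch1.
    ch1_queue: VecDeque<i32>,
    /// Mirrors the `send0` flag: true once `send_async(0)` has returned.
    send0: bool,
}

/// Hypothetical biased select: assumes ch1 is checked before ch0,
/// i.e. in the order the receivers were registered.
fn toy_select(s: &mut ToyState) -> Option<i32> {
    if let Some(v) = s.ch1_queue.pop_front() {
        return Some(v);
    }
    s.ch0_pending.take()
}

/// Replays the hypothesised steps; returns
/// (first select result, send0 at that point, second select result).
fn replay_hypothesis() -> (Option<i32>, bool, Option<i32>) {
    let mut s = ToyState::default();
    // Steps 1-2: 0 is handed to the rendezvous channel, waking the selector.
    s.ch0_pending = Some(0);
    // Step 3 (the disputed step): the sender is released before the value is taken.
    s.send0 = true;
    // Step 4: 1 is sent into the unbounded channel.
    s.ch1_queue.push_back(1);
    // Step 5: both are ready and the selector prefers ch1.
    let first = toy_select(&mut s);
    let send0_at_first = s.send0;
    // The value from ch0 is then available immediately.
    let second = toy_select(&mut s);
    (first, send0_at_first, second)
}

fn main() {
    let (first, send0, second) = replay_hypothesis();
    // Step 6: the reporter's check `i == 0 && rx == 1 => !send0` is violated.
    let check_holds = !(first == Some(1) && send0);
    println!("first = {first:?}, send0 = {send0}, check holds: {check_holds}");
    println!("second = {second:?}");
}
```

The model only shows that the listed steps are sufficient to produce the observed failure; whether flume actually releases the sender at step 3 is exactly the open question in this thread.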
That is my guess too, but as far as I know it should not be allowed.
The problem is the third step, "the sending thread is allowed to continue". If there were a single receiver on the receiving side instead of a selector, the return from recv would strictly happen before the return from send; that is what makes it a rendezvous.
I would expect the same semantics when a selector is involved: when the selector's wait returns the value from the rendezvous sender, that return should happen before send returns. Let me know if this expectation is wrong.
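For comparison, the single-receiver guarantee this argument relies on can be illustrated with the standard library's rendezvous channel, `std::sync::mpsc::sync_channel(0)`, which is documented so that each `send` does not return until a `recv` is paired with it. This is a sketch using std rather than flume, and std has no selector, so it only covers the single-receiver case; `rendezvous_demo` and the 50 ms sleep are illustrative choices, not part of the original report.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper. Returns
/// (flag seen before recv, value received, flag seen after the sender joined).
fn rendezvous_demo() -> (bool, i32, bool) {
    // Capacity 0 makes this a rendezvous channel: `send` does not return
    // until a `recv` has been paired with it.
    let (tx, rx) = sync_channel::<i32>(0);
    let sent = Arc::new(AtomicBool::new(false));

    let sender = {
        let sent = Arc::clone(&sent);
        thread::spawn(move || {
            tx.send(0).unwrap();
            // Only reachable after the receiver has taken the value.
            sent.store(true, Ordering::SeqCst);
        })
    };

    // Give the sender time to reach `send`; it still cannot complete,
    // because nobody has called `recv` yet.
    thread::sleep(Duration::from_millis(50));
    let before = sent.load(Ordering::SeqCst);

    let value = rx.recv().unwrap();
    sender.join().unwrap();
    let after = sent.load(Ordering::SeqCst);
    (before, value, after)
}

fn main() {
    let (before, value, after) = rendezvous_demo();
    println!("sent before recv: {before}, received {value}, sent after join: {after}");
}
```

The flag is guaranteed to be false before `recv` no matter how long the sender has been waiting, which is the property the reporter expects to carry over when the receiving side is a selector.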