coio-rs
coio-rs copied to clipboard
Bug: Sync channel returns RecvError
Minimal test case:
extern crate coio;
use coio::Scheduler;
use coio::sync::mpsc;
fn main() {
Scheduler::new().run(|| {
let (tx, rx) = mpsc::sync_channel(1);
let h = Scheduler::spawn(move|| {
tx.send(1).unwrap();
});
assert_eq!(rx.recv().unwrap(), 1);
h.join().unwrap();
}).unwrap();
}
The program would panic with message:
thread 'Processor #0' panicked at 'called `Result::unwrap()` on an `Err` value: RecvError', ../src/libcore/result.rs:746
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread '<main>' panicked at 'called `Result::unwrap()` on an `Err` value: Any', ../src/libcore/result.rs:746
This could be fixed by
extern crate coio;
extern crate env_logger;
use coio::Scheduler;
use coio::sync::mpsc;
fn main() {
env_logger::init().unwrap();
Scheduler::new().run(|| {
let (tx, rx) = mpsc::sync_channel(1);
let h = Scheduler::spawn(move|| {
// 2. Push 1 into the queue and push <main> into the work queue
tx.send(1).unwrap();
// 3. Force yield the current coroutine,
// which will let the receiver have a chance to be waken up and read data
Scheduler::sched();
});
// 1. The <main> coroutine block itself into the wait list
assert_eq!(rx.recv().unwrap(), 1);
h.join().unwrap();
}).unwrap();
}
The official implementation of sync channel, the try_recv
method will return Disconnected
if it is the last one.
So when the last SyncSender
is dropped, the receiver will always return Disconnected
even if there are items remain in the queue.
Ah good idea! I didn't think of this... Maybe we should add a method to the Scheduler that works just like Scheduler::ready(coro) but yields to the coro if it's on the same Processor? Or would it still break if they run on 2 different Processors?
I mean: I think it should still break if 2 different Processors are used. So... Should we really adopt the standard Rust channel semantics? Why are they even using this weird Disconnected state? Shouldn't a channel be empty before a Disconnected is returned? IMHO that would make a lot more sense...
I spent some time on reading the source code of the official implementation of mpsc channel, and I think it too complicated to reinvent this wheel. The behavior should have been discussed in RFC, so I think it is Ok to use it. Also, you cannot reproduce it with threads.
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::sync_channel(1);
let t = thread::spawn(move|| {
tx.send(1).unwrap();
});
t.join().unwrap();
rx.recv().unwrap();
}
It always works because the official implementation will actually park the thread.
I've read it too now... So we either have to use the same thing as std (ACKs), or we have to wait until the queue is empty before we trigger a Disconnected result. The former case will be a lot slower (btw: the std implementation uses Mutex?! Wow. :disappointed: ) and the latter case is dangerous because we don't want to wait until the overwhelmed receiver is finished even though all senders are already gone. I think we should invent something ouselves here, because it can't be that complicated if even the std. implementation is using a Mutex for this - we can't really make it any worse right? :neutral_face:
Yep. I agree.