
Bug: Sync channel returns RecvError

Open · zonyitoo opened this issue · 5 comments

Minimal test case:

extern crate coio;

use coio::Scheduler;
use coio::sync::mpsc;

fn main() {
    Scheduler::new().run(|| {
        // Bounded channel with a capacity of 1.
        let (tx, rx) = mpsc::sync_channel(1);
        let h = Scheduler::spawn(move || {
            tx.send(1).unwrap();
        });

        // Expected to receive 1; instead recv() fails with RecvError.
        assert_eq!(rx.recv().unwrap(), 1);

        h.join().unwrap();
    }).unwrap();
}

The program panics with the message:

thread 'Processor #0' panicked at 'called `Result::unwrap()` on an `Err` value: RecvError', ../src/libcore/result.rs:746
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread '<main>' panicked at 'called `Result::unwrap()` on an `Err` value: Any', ../src/libcore/result.rs:746

zonyitoo avatar Jan 31 '16 12:01 zonyitoo

This could be fixed by forcing a yield after the send:

extern crate coio;
extern crate env_logger;

use coio::Scheduler;
use coio::sync::mpsc;

fn main() {
    env_logger::init().unwrap();

    Scheduler::new().run(|| {
        let (tx, rx) = mpsc::sync_channel(1);
        let h = Scheduler::spawn(move || {
            // 2. Push 1 into the queue and push <main> back onto the work queue
            tx.send(1).unwrap();

            // 3. Force-yield the current coroutine, which gives the receiver
            //    a chance to be woken up and read the data
            Scheduler::sched();
        });

        // 1. The <main> coroutine blocks itself and enters the wait list
        assert_eq!(rx.recv().unwrap(), 1);

        h.join().unwrap();
    }).unwrap();
}
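
In other words: the explicit yield lets the scheduler run the woken receiver before the sender coroutine finishes and drops the last tx, so recv() sees the buffered value instead of a disconnect.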

In the official implementation of the sync channel, the try_recv method returns Disconnected once the last sender is gone.

So when the last SyncSender is dropped, the receiver will always return Disconnected, even if items remain in the queue.
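
For reference, here is what the Disconnected variant looks like from the caller's side with std (the queue is already empty in this snippet, so it doesn't exercise the buffered case above):

use std::sync::mpsc::{sync_channel, TryRecvError};

fn main() {
    let (tx, rx) = sync_channel::<i32>(1);
    drop(tx); // the last (and only) SyncSender is gone

    // With no senders left and nothing buffered, the receiver
    // reports Disconnected rather than Empty:
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}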

zonyitoo avatar Jan 31 '16 14:01 zonyitoo

Ah good idea! I didn't think of this... Maybe we should add a method to the Scheduler that works just like Scheduler::ready(coro) but yields to the coro if it's on the same Processor? Or would it still break if they run on 2 different Processors?

I mean: I think it should still break if 2 different Processors are used. So... Should we really adopt the standard Rust channel semantics? Why are they even using this weird Disconnected state? Shouldn't a channel be empty before a Disconnected is returned? IMHO that would make a lot more sense...

lhecker avatar Jan 31 '16 14:01 lhecker

I spent some time reading the source code of the official mpsc channel implementation, and I think it is too complicated to reinvent this wheel. The behavior should have been discussed in an RFC, so I think it is OK to adopt it. Also, you cannot reproduce the problem with threads:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    let t = thread::spawn(move || {
        // Capacity is 1, so the send completes without a waiting receiver.
        tx.send(1).unwrap();
    });

    t.join().unwrap();

    // tx has been dropped, yet the buffered value is still delivered.
    rx.recv().unwrap();
}

It always works because the official implementation will actually park the thread.

zonyitoo avatar Jan 31 '16 14:01 zonyitoo

I've read it too now... So we either have to use the same mechanism as std (ACKs), or we have to wait until the queue is empty before we return a Disconnected result. The former will be a lot slower (btw: the std implementation uses a Mutex?! Wow. :disappointed: ), and the latter is dangerous because we don't want to wait for an overwhelmed receiver to finish even though all senders are already gone. I think we should invent something ourselves here - it can't be that complicated if even the std implementation uses a Mutex for this. We can't really make it any worse, right? :neutral_face:
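
For illustration, a minimal sketch of the drain-before-Disconnected semantics (plain std threads with a Mutex/Condvar rather than coio coroutines, all names invented here, and the capacity bound is ignored - it shows the semantics, not the performance):

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// Shared state: a buffer plus the number of live senders.
struct Shared<T> {
    inner: Mutex<(VecDeque<T>, usize)>,
    cond: Condvar,
}

struct Sender<T>(Arc<Shared<T>>);
struct Receiver<T>(Arc<Shared<T>>);

fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Shared {
        inner: Mutex::new((VecDeque::new(), 1)),
        cond: Condvar::new(),
    });
    (Sender(shared.clone()), Receiver(shared))
}

impl<T> Sender<T> {
    fn send(&self, value: T) {
        self.0.inner.lock().unwrap().0.push_back(value);
        self.0.cond.notify_one();
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // One fewer sender; wake the receiver so it can observe this.
        // (A Clone impl for Sender would increment the count.)
        self.0.inner.lock().unwrap().1 -= 1;
        self.0.cond.notify_one();
    }
}

impl<T> Receiver<T> {
    fn recv(&self) -> Result<T, ()> {
        let mut guard = self.0.inner.lock().unwrap();
        loop {
            // Buffered data always wins over disconnection.
            if let Some(v) = guard.0.pop_front() {
                return Ok(v);
            }
            // Disconnected only when empty *and* no senders remain.
            if guard.1 == 0 {
                return Err(());
            }
            guard = self.0.cond.wait(guard).unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send(1);
    drop(tx); // the last sender is gone, but an item is still buffered
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(rx.recv(), Err(()));
}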

lhecker avatar Jan 31 '16 14:01 lhecker

Yep. I agree.

zonyitoo avatar Jan 31 '16 14:01 zonyitoo