yaque icon indicating copy to clipboard operation
yaque copied to clipboard

yaque drops items?

Open vedantroy opened this issue 4 years ago • 2 comments

Here's an example:

use std::path::Path;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
};
use yaque::{self, TryRecvError, recovery};

/// Path of the persistent yaque queue, passed to `yaque::channel` and
/// `recovery::recover_with_loss`.
const QUEUE_PATH: &str = "state/queue";
/// Buffer capacity used when creating the Tokio mpsc channels.
const CHAN_BUF_SIZE: usize = 32;
/// Number of items requested per `try_recv_batch` call.
///
/// NOTE(review): if yaque's `try_recv_batch(n)` only succeeds once at least
/// `n` items are queued, a queue holding fewer than this many items is never
/// handed out (matching the reported "Recycling request..." loop) — confirm
/// against the yaque documentation.
const BATCH_SIZE: usize = 50;

/// Per-worker state handed to [`work`].
struct WorkContext {
    /// Channel on which the worker sends its requests to the dispatcher.
    dispatch_sender: Sender<DispatchRequest>,
}

/// Worker loop: repeatedly asks the dispatcher for a chunk of ids.
///
/// Each iteration creates a fresh reply channel and sends a
/// [`DispatchRequest::GetChunk`] carrying its sending half to the dispatcher.
/// It then waits for the dispatcher's answer on the receiving half.
///
/// The loop ends (instead of panicking) once the dispatcher side is gone. That
/// happens when the request channel's receiver has been dropped, or when the
/// reply sender is dropped without an answer being sent.
async fn work(ctx: WorkContext) {
    loop {
        let (send, mut recv) = channel(CHAN_BUF_SIZE);
        println!("here!");
        if ctx
            .dispatch_sender
            .send(DispatchRequest::GetChunk(send))
            .await
            .is_err()
        {
            // The dispatcher's receiver is gone; no more work can arrive.
            break;
        }
        match recv.recv().await {
            Some(_ids) => {
                // do stuff w/ ids
            }
            // The dispatcher dropped our reply sender without answering.
            None => break,
        }
    }
}

/// Requests sent from a worker to the dispatcher.
#[derive(Debug)]
enum DispatchRequest {
    /// Ask for the next chunk of ids; the dispatcher answers on the enclosed sender.
    GetChunk(Sender<Vec<String>>),
}

/// Dispatcher that owns the persistent yaque queue and serves chunk requests
/// coming from the workers.
struct DispatchThread {
    /// Reading half of the persistent queue; `run` takes batches from here.
    url_receiver: yaque::Receiver,
    /// Writing half of the persistent queue; `add_vid_ids` appends to it.
    url_sender: yaque::Sender,
    /// Receives worker requests, plus requests that `run` recycles back into it.
    receiver: Receiver<DispatchRequest>,
    /// Sending half of the request channel. `new` clones it for every worker,
    /// and `run` uses it to re-enqueue a request when the queue is empty.
    /// Because this field keeps the channel open, `receiver.recv()` does not
    /// return `None` while the dispatcher is alive.
    sender: Sender<DispatchRequest>,
}

impl DispatchThread {
    /// Opens the persistent queue, recovering it first if needed, and spawns the workers.
    ///
    /// If something already exists at [`QUEUE_PATH`], it is passed through
    /// `recovery::recover_with_loss` before `yaque::channel` opens the queue.
    /// Then `n_workers` tasks running [`work`] are started with `tokio::spawn`,
    /// so this must be called from inside a Tokio runtime.
    ///
    /// The request channel holds at least one request per worker. `run`
    /// recycles requests by sending them back into this same channel. A smaller
    /// buffer could fill up with worker requests and leave `run` blocked forever
    /// on its own `send`.
    ///
    /// # Panics
    ///
    /// Panics if queue recovery or opening the queue fails.
    pub fn new(n_workers: usize) -> DispatchThread {
        if Path::new(QUEUE_PATH).exists() {
            println!("Recovering queue...");
            recovery::recover_with_loss(QUEUE_PATH).unwrap();
        }
        let (url_send, url_recv) = yaque::channel(QUEUE_PATH).unwrap();
        // Each worker has at most one request in flight. With capacity >= n_workers,
        // the channel always has room for `run` to re-enqueue the request it holds.
        let (send, recv) = channel(CHAN_BUF_SIZE.max(n_workers));

        for _ in 0..n_workers {
            let dispatch_sender = send.clone();
            tokio::spawn(async move {
                work(WorkContext { dispatch_sender }).await;
            });
        }

        DispatchThread {
            url_receiver: url_recv,
            url_sender: url_send,
            receiver: recv,
            sender: send,
        }
    }

    /// Appends `ids` to the persistent queue as a single batch.
    ///
    /// # Panics
    ///
    /// Panics if `send_batch` returns an error.
    pub async fn add_vid_ids(&mut self, ids: Vec<String>) {
        self.url_sender.send_batch(ids).await.unwrap();
    }

    /// Serves worker requests forever.
    ///
    /// For each [`DispatchRequest::GetChunk`], a batch is requested with
    /// `try_recv_batch(BATCH_SIZE)`:
    /// - If a batch is returned, its items are decoded as UTF-8 and sent to the
    ///   requesting worker. The batch is committed only after that send succeeds.
    /// - If the queue reports empty, the request is immediately re-enqueued on
    ///   the dispatcher's own channel. While the queue stays empty, this loop
    ///   therefore spins without waiting.
    ///
    /// NOTE(review): if `try_recv_batch(n)` only succeeds once at least `n`
    /// items are queued, a queue holding fewer than `BATCH_SIZE` items is never
    /// drained and requests are recycled indefinitely. Confirm against the
    /// yaque documentation.
    ///
    /// # Panics
    ///
    /// Panics on an I/O error from the queue, on a non-UTF-8 queue item, or if
    /// the requesting worker dropped its reply receiver.
    pub async fn run(&mut self) {
        loop {
            // `self.sender` keeps the request channel open, so this is never `None`.
            match self.receiver.recv().await.unwrap() {
                DispatchRequest::GetChunk(reply) => {
                    let batch = match self.url_receiver.try_recv_batch(BATCH_SIZE) {
                        Ok(batch) => batch,
                        Err(TryRecvError::QeueuEmpty) => {
                            // recycle the request until we have more work
                            println!("Recycling request...");
                            self.sender
                                .send(DispatchRequest::GetChunk(reply))
                                .await
                                .unwrap();
                            continue;
                        }
                        Err(TryRecvError::Io(e)) => {
                            panic!("io error while receiving: {:?}", e);
                        }
                    };
                    assert!(!batch.is_empty());
                    let urls: Vec<String> = batch
                        .iter()
                        .map(|url_bytes| {
                            String::from(
                                std::str::from_utf8(url_bytes)
                                    .expect("queue item is not valid UTF-8"),
                            )
                        })
                        .collect();
                    // Hand the ids to the worker *before* committing. If the send
                    // fails we panic with the batch still uncommitted, rather than
                    // after it has already been committed. (The guard presumably
                    // rolls back on drop; confirm against the yaque docs.)
                    reply
                        .send(urls)
                        .await
                        .expect("worker dropped its reply receiver");
                    batch.commit();
                }
            }
        }
    }
}

/// Entry point: enqueue a single sample id, then drive the dispatcher on its own task.
#[tokio::main]
async fn main() {
    let mut dispatcher = DispatchThread::new(1);
    let sample_ids = vec!["Wko7I9QcwUQ".to_string()];
    dispatcher.add_vid_ids(sample_ids).await;

    // Awaiting the join handle and unwrapping re-raises any panic from the dispatcher task.
    let handle = tokio::spawn(async move { dispatcher.run().await });
    handle.await.unwrap();
}

Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
yaque = { git = "https://github.com/tokahuke/yaque.git", branch = "try-recv" }

Most of the time, this outputs:

Recovering queue...
here!
Recycling request...
Recycling request...
Recycling request...

However, I would not expect this, because I call `add_vid_ids` with a value, which should be added to the queue.

vedantroy avatar Mar 10 '21 17:03 vedantroy

@vedantroy could you check your Cargo.lock for the exact commit you are using? While working on try-recv yesterday I discovered a bug with batch requests. I have pushed a new commit that addresses it, though tests proving the fix are still missing.

tokahuke avatar Mar 10 '21 19:03 tokahuke

@vedantroy have you had any further problem with this?

tokahuke avatar May 15 '21 16:05 tokahuke

@vedantroy @tokahuke can this issue be closed or is it still present?

barafael avatar Apr 05 '24 13:04 barafael

I think that we could close this because it seems quite old and nobody else has commented on it.

tokahuke avatar Apr 08 '24 14:04 tokahuke

Let's reopen this later, if somebody else sees an issue.

tokahuke avatar Apr 08 '24 14:04 tokahuke