yaque drops items?
Here's an example:
use std::path::Path;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
};
use yaque::{self, TryRecvError, recovery};
const CHAN_BUF_SIZE: usize = 32;
const BATCH_SIZE: usize = 50;
const QUEUE_PATH: &str = "state/queue";
struct WorkContext {
dispatch_sender: Sender<DispatchRequest>,
}
async fn work(ctx: WorkContext) {
loop {
let (send, mut recv) = channel(CHAN_BUF_SIZE);
println!("here!");
ctx.dispatch_sender
.send(DispatchRequest::GetChunk(send))
.await
.unwrap();
let ids = recv.recv().await.unwrap();
// do stuff w/ ids
}
}
#[derive(Debug)]
enum DispatchRequest {
GetChunk(Sender<Vec<String>>),
}
struct DispatchThread {
url_receiver: yaque::Receiver,
url_sender: yaque::Sender,
receiver: Receiver<DispatchRequest>,
sender: Sender<DispatchRequest>,
}
impl DispatchThread {
pub fn new(n_workers: usize) -> DispatchThread {
if Path::new(QUEUE_PATH).exists() {
println!("Recovering queue...");
recovery::recover_with_loss(QUEUE_PATH).unwrap();
}
let (url_send, url_recv) = yaque::channel(QUEUE_PATH).unwrap();
let (send, recv) = channel(CHAN_BUF_SIZE);
for _ in 0..n_workers {
let send = send.clone();
tokio::spawn(async move {
work(WorkContext {
dispatch_sender: send,
})
.await;
});
}
DispatchThread {
url_receiver: url_recv,
url_sender: url_send,
receiver: recv,
sender: send,
}
}
pub async fn add_vid_ids(&mut self, ids: Vec<String>) {
self.url_sender.send_batch(ids).await.unwrap();
}
pub async fn run(&mut self) {
loop {
match self.receiver.recv().await.unwrap() {
DispatchRequest::GetChunk(sender) => {
let batch = match self
.url_receiver
.try_recv_batch(BATCH_SIZE)
{
Ok(b) => Some(b),
Err(e) => match e {
TryRecvError::Io(e) => {
panic!("io error while receiving: {:?}", e);
}
TryRecvError::QeueuEmpty => None,
},
};
if let Some(batch) = batch {
assert!(batch.len() > 0);
let mut urls = vec![];
for url_bytes in batch.iter() {
urls.push(String::from(std::str::from_utf8(url_bytes).unwrap()));
}
batch.commit();
sender.send(urls).await.unwrap();
} else {
// recycle the request until we have more work
println!("Recycling request...");
self.sender
.send(DispatchRequest::GetChunk(sender))
.await
.unwrap();
}
}
}
}
}
}
#[tokio::main]
async fn main() {
let mut dispatch_thread = DispatchThread::new(1);
dispatch_thread
.add_vid_ids(vec![String::from("Wko7I9QcwUQ")])
.await;
tokio::spawn(async move {
dispatch_thread.run().await;
})
.await
.unwrap();
}
Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
yaque = { git = "https://github.com/tokahuke/yaque.git", branch = "try-recv" }
Most of the time, this outputs:
Recovering queue...
here!
Recycling request...
Recycling request...
Recycling request...
However, I would not expected this to be the case because I call add_vid_ids with a value, which should be added into the queue.
@vedantroy could you look in your Cargo.lock the exact commit that you are using? I have discovered a bug with batch requests while working on try-recv yesterday and have pushed a new commit addressing this issue (missing the tests proving that still).
@vedantroy have you had any further problem with this?
@vedantroy @tokahuke can this issue be closed or is it still present?
I think that we could close this because it seems quite old and nobody else has commented on it.
Let's reopen this later, if somebody else sees an issue.