lapin icon indicating copy to clipboard operation
lapin copied to clipboard

Missing messages during publishing

Open gudjonragnar opened this issue 2 years ago • 1 comments

I have a small script that is pushing messages to rabbit. It includes 2 processes: pulling data from a database and manipulating, and sending to rabbit. Internally this is done by spawning a tokio task that handles the rabbit part while the main task is doing the database work. To communicate between the two tasks I use an async channel. What I am seeing is that not all the messages are suceessfully delivered to rabbit. I checked and the publish action is being run for all messages but I suspect that the process finishes too early. The function that sends messages can be seen here (a bit simplified):

pub async fn send_to_rabbit(receiver: Receiver<Row>) -> Result<(), String> {
    let connection = rabbit_connection().await;
    let channel = connection
        .create_channel()
        .await
        .expect("Failed creating channel for rabbit connection");
    while let Ok(body) = receiver.recv().await {
        let value = serde_json::to_vec(&body)
            .map_err(|_e| format!("Failed to serialize message: {:?}", body))?;
        channel
            .basic_publish(
                "my-exchange",
                "entity",
                BasicPublishOptions::default(),
                &value,
                BasicProperties::default(),
            )
            .await
            .map_err(|_| "Error while sending to rabbit".to_string())?;
    }
    //  tokio::time::sleep(Duration::from_secs(5)).await;
    receiver.close();
    if let Err(e) = channel.wait_for_confirms().await {
        tracing::error!("Failure waiting for all confirms from RabbitMQ: {:?}", e)
    }
    Ok(())
}

When I add the sleep all messages are properly delivered. I initially thought wait_for_confirms would wait for all remaining confirms but that doesnt seem to be the case. I feel that simply adding sleep is not deterministic enough, there is no guarantee that it will finish in the allotted time.

Am I doing something wrong or misunderstanding something here?

gudjonragnar avatar Mar 30 '22 08:03 gudjonragnar

you need to call channel.confirm_select(ConfirmSelectOptions::default()).await to receive confirms

Roger avatar Jun 17 '22 09:06 Roger