lapin
Missing messages during publishing
I have a small script that pushes messages to rabbit. It has two parts: pulling data from a database and manipulating it, and sending the results to rabbit. Internally, I spawn a tokio task that handles the rabbit part while the main task does the database work, and the two tasks communicate over an async channel. What I am seeing is that not all of the messages are successfully delivered to rabbit. I checked, and the publish call runs for every message, so I suspect the process finishes too early. The function that sends the messages looks like this (a bit simplified):
pub async fn send_to_rabbit(receiver: Receiver<Row>) -> Result<(), String> {
    let connection = rabbit_connection().await;
    let channel = connection
        .create_channel()
        .await
        .expect("Failed creating channel for rabbit connection");
    while let Ok(body) = receiver.recv().await {
        let value = serde_json::to_vec(&body)
            .map_err(|_e| format!("Failed to serialize message: {:?}", body))?;
        channel
            .basic_publish(
                "my-exchange",
                "entity",
                BasicPublishOptions::default(),
                &value,
                BasicProperties::default(),
            )
            .await
            .map_err(|_| "Error while sending to rabbit".to_string())?;
    }
    // tokio::time::sleep(Duration::from_secs(5)).await;
    receiver.close();
    if let Err(e) = channel.wait_for_confirms().await {
        tracing::error!("Failure waiting for all confirms from RabbitMQ: {:?}", e)
    }
    Ok(())
}
When I add the sleep, all messages are properly delivered. I initially thought wait_for_confirms would wait for all remaining confirms, but that doesn't seem to be the case. Simply adding a sleep does not feel deterministic enough: there is no guarantee that publishing will finish in the allotted time.
Am I doing something wrong or misunderstanding something here?
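You need to call channel.confirm_select(ConfirmSelectOptions::default()).await on the channel before you start publishing in order to receive confirms. Without it the channel is not in confirm mode, so the broker sends no confirms and wait_for_confirms has nothing to wait for.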
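To illustrate why the missing confirm_select call matters, here is a minimal toy model of publisher-confirm bookkeeping on a single channel. It is only a sketch and not lapin's implementation: ModelChannel and all of its methods are hypothetical names, and it uses only the standard library. It assumes the usual RabbitMQ confirm semantics, where delivery tags start at 1 once confirm mode is enabled and an ack with the multiple flag covers every tag up to and including the given one.

```rust
use std::collections::BTreeSet;

/// Toy model of the confirm bookkeeping on one AMQP channel.
/// Hypothetical type for illustration only; this is not lapin's API.
struct ModelChannel {
    confirm_mode: bool,
    next_tag: u64,
    unconfirmed: BTreeSet<u64>,
}

impl ModelChannel {
    fn new() -> Self {
        ModelChannel {
            confirm_mode: false,
            next_tag: 0,
            unconfirmed: BTreeSet::new(),
        }
    }

    /// Models sending `confirm.select`: later publishes are tracked.
    fn confirm_select(&mut self) {
        self.confirm_mode = true;
    }

    /// Returns the delivery tag if the publish is tracked, `None` otherwise.
    fn publish(&mut self, _payload: &[u8]) -> Option<u64> {
        if !self.confirm_mode {
            // Fire and forget: nothing is recorded for this message.
            return None;
        }
        self.next_tag += 1;
        self.unconfirmed.insert(self.next_tag);
        Some(self.next_tag)
    }

    /// Models a broker ack. With `multiple`, every tag <= `tag` is confirmed.
    fn ack(&mut self, tag: u64, multiple: bool) {
        if multiple {
            // `split_off` returns the tags >= tag + 1, i.e. the ones still pending.
            self.unconfirmed = self.unconfirmed.split_off(&(tag + 1));
        } else {
            self.unconfirmed.remove(&tag);
        }
    }

    /// Number of publishes a "wait for confirms" step would still wait on.
    fn outstanding(&self) -> usize {
        self.unconfirmed.len()
    }
}

fn main() {
    // Without confirm mode nothing is tracked, so there is nothing to wait for.
    let mut plain = ModelChannel::new();
    for i in 0..3u8 {
        plain.publish(&[i]);
    }
    println!("plain channel, outstanding: {}", plain.outstanding());

    // With confirm mode every publish stays pending until the broker acks it.
    let mut confirmed = ModelChannel::new();
    confirmed.confirm_select();
    for i in 0..3u8 {
        confirmed.publish(&[i]);
    }
    println!("confirm mode, outstanding: {}", confirmed.outstanding());
    confirmed.ack(2, true);
    println!("after ack(2, multiple): {}", confirmed.outstanding());
    confirmed.ack(3, false);
    println!("after ack(3): {}", confirmed.outstanding());
}
```

In this model the channel without confirm mode never has anything outstanding, which matches the behaviour described in the question: the wait returns right away and the process can exit while messages are still in flight. In the real function, the confirm_select call would go right after create_channel and before the publishing loop.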