lapin
lapin copied to clipboard
Possible Memory Leak
I've been using this library, and I recently noticed that memory seems to leak when using a Delegate. I have the following Minimal Reproducible Example: if you run it with, say, 5 MB messages on the queue, you will notice that the memory usage of your process never stops increasing. Am I doing something wrong?
MRE:
use lapin::{
options::{BasicAckOptions, BasicConsumeOptions, BasicQosOptions},
types::FieldTable,
Connection, ConnectionProperties, ConsumerDelegate,
};
/// Reproduction entry point: connects to a local AMQP broker, opens a channel,
/// starts consuming from the `entity_cache` queue with a `Test` delegate, and
/// then waits forever.
///
/// # Panics
///
/// Panics (via `unwrap`) if connecting, creating the channel, applying the QoS
/// setting, or starting the consumer fails.
#[tokio::main]
async fn main() {
    // Build the connection properties up front so the connect call stays short.
    let properties = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);
    let connection = Connection::connect("amqp://127.0.0.1:5672/%2f", properties)
        .await
        .unwrap();

    let channel = connection.create_channel().await.unwrap();
    channel
        .basic_qos(1, BasicQosOptions::default())
        .await
        .unwrap();

    let queue_name = "entity_cache";
    let consumer_tag = "";
    // The consumer is intentionally kept as a temporary of this statement
    // rather than bound to a local, so it is dropped as soon as the delegate
    // has been installed.
    channel
        .basic_consume(
            queue_name,
            consumer_tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .unwrap()
        .set_delegate(Test {});

    // Never resolves, so `main` does not return after the delegate is installed.
    std::future::pending::<()>().await;
}
/// Field-less marker type handed to the consumer as its delegate; all of its
/// behavior comes from its `ConsumerDelegate` implementation.
struct Test;
/// Delegate implementation that acknowledges each delivery it is handed.
impl ConsumerDelegate for Test {
    /// Unwraps the incoming delivery result and returns a boxed future that
    /// acknowledges the delivery with default ack options.
    ///
    /// # Panics
    ///
    /// Panics synchronously if `delivery` is an `Err` ("error in consumer") or
    /// holds no delivery ("???"); the returned future panics if the ack fails
    /// ("ack").
    fn on_new_delivery(
        &self,
        delivery: lapin::message::DeliveryResult,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
        // Peel the two layers one at a time: first the consumer-level result,
        // then the optional delivery inside it.
        let maybe_message = delivery.expect("error in consumer");
        let message = maybe_message.expect("???");

        // The delivery is moved into the future so it is owned until the ack
        // has completed.
        let ack_future = async move {
            let ack_outcome = message.ack(BasicAckOptions::default()).await;
            ack_outcome.expect("ack");
        };
        Box::pin(ack_future)
    }
}