lapin icon indicating copy to clipboard operation
lapin copied to clipboard

Possible Memory Leak

Open jsouto18 opened this issue 9 months ago • 2 comments

I've been using this library and recently I've noticed that when using a Delegate, memory seems to leak. I have the following Minimum Reproducible Example where if you run this with say 5MB messages on the queue you will notice that the memory of your process will never stop increasing. Am I doing something wrong?

MRE:

use lapin::{
    options::{BasicAckOptions, BasicConsumeOptions, BasicQosOptions},
    types::FieldTable,
    Connection, ConnectionProperties, ConsumerDelegate,
};

#[tokio::main]
async fn main() {
    let amqp_connection = Connection::connect(
        "amqp://127.0.0.1:5672/%2f",
        ConnectionProperties::default()
            .with_executor(tokio_executor_trait::Tokio::current())
            .with_reactor(tokio_reactor_trait::Tokio),
    )
    .await
    .unwrap();
    let consumer_channel = amqp_connection.create_channel().await.unwrap();

    consumer_channel
        .basic_qos(1, BasicQosOptions::default())
        .await
        .unwrap();

    consumer_channel
        .basic_consume(
            "entity_cache",
            "",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .unwrap()
        .set_delegate(Test {});

    std::future::pending::<()>().await;
}

struct Test;
impl ConsumerDelegate for Test {
    fn on_new_delivery(
        &self,
        delivery: lapin::message::DeliveryResult,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
        let delivery = delivery.expect("error in consumer").expect("???");

        Box::pin(async move {
            delivery.ack(BasicAckOptions::default()).await.expect("ack");
        })
    }
}

jsouto18 avatar Sep 20 '23 00:09 jsouto18