rust-amqp
Consumer stops after fewer than 30 consumed messages
I'm running this code on CloudAMQP with the following configuration: cluster cheerful-squirrel, RabbitMQ 3.5.7, Erlang 18.2.
The program consumes up to 30 messages, and then I see the queue disappear in the RabbitMQ dashboard, which probably means the connection is lost. After that the consumer hangs. When I set auto_delete to false in queue_declare, I observe that the Ready message count grows while the consumer hangs.
Here's the code, it mostly follows the provided example:
let mut session: Session = match Session::open_url(url) {
    Ok(s) => s,
    Err(e) => panic!("Session::open_url: {}", e),
};
let mut channel: Channel = match session.open_channel(1) {
    Ok(s) => s,
    Err(e) => panic!("Session.open_channel: {}", e),
};
channel.exchange_declare(exchange,
                         "fanout",
                         false, // passive
                         false, // durable
                         false, // auto_delete
                         false, // internal
                         false, // nowait - hangs when set to true
                         Table::new())
    .err()
    .map(|e| panic!("Channel.exchange_declare: {}", e));
channel.queue_declare(queue,
                      false, // passive
                      false, // durable
                      true, // exclusive
                      true, // auto_delete
                      false, // nowait - hangs when set to true
                      Table::new())
    .err()
    .map(|e| panic!("Channel.queue_declare: {}", e));
channel.queue_bind(queue,
                   exchange,
                   "",
                   false, // nowait - hangs when set to true
                   Table::new())
    .err()
    .map(|e| panic!("Channel.queue_bind: {}", e));
let consumer = Consumer { cnt: 0 };
let consumer_name = match channel.basic_consume(consumer,
                                                queue,
                                                "",
                                                false, // no_local
                                                false, // no_ack
                                                true, // exclusive
                                                false, // nowait - hangs when set to true
                                                Table::new()) {
    Ok(s) => s,
    Err(e) => panic!("Channel.basic_consume: {}", e),
};
println!("{} Starting consumer: {}", date_str(), consumer_name);
channel.start_consuming();
channel.close(200, "Bye")
    .err()
    .map(|e| panic!("Channel.close: {}", e));
session.close(200, "Good Bye");
Consumer:
struct Consumer {
    cnt: u64
}
impl amqp::Consumer for Consumer {
    fn handle_delivery(&mut self,
                       channel: &mut Channel,
                       deliver: protocol::basic::Deliver,
                       _: protocol::basic::BasicProperties,
                       body: Vec<u8>) {
        self.cnt += 1;
        let s = str::from_utf8(&body).unwrap();
        println!("{} Consumed #{}: {}", date_str(), self.cnt, s);
        channel.basic_ack(deliver.delivery_tag, false)
            .err()
            .map(|e| panic!("Consumer.handle_delivery basic_ack: {}", e));
    }
}
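One thing that may be worth ruling out while debugging: the str::from_utf8(&body).unwrap() call in handle_delivery panics if a payload is not valid UTF-8. Nothing in the report shows that this is what stops the consumer, so treat the following only as a minimal sketch of a non-panicking way to decode the body for logging. The helper name body_to_text is hypothetical; it uses only the standard library and is not part of rust-amqp.

```rust
use std::borrow::Cow;

/// Hypothetical helper (not part of rust-amqp): decode a message body
/// for logging without panicking on invalid UTF-8.
/// Invalid byte sequences are replaced with U+FFFD; valid input is
/// borrowed rather than copied.
fn body_to_text(body: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(body)
}
```

Inside handle_delivery this would replace the unwrap line with something like let s = body_to_text(&body); and the println! call stays unchanged, since Cow<str> implements Display.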
I should have mentioned this is an HA cluster.
Can you run the consumer with the logger initialized (env_logger::init().unwrap();) and with RUST_LOG=debug set? This will give you some idea of what's going on.