dart_amqp
dart_amqp copied to clipboard
amqp connection lost, and recreate multiples consumers in same queue
I have been used this lib, but after a while the connection auto close and when start it again it connect with another consumer in queue,
I can publish in another queues but it stop consuming. in order to finger out what is happen the solution is complete delete the queue and restart it again. I read in pub.dev that
the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.
is that what is happening ? how can I try implement this an check is there is an stable connection or if the queue has consumers? how can I persist the consumer to avoid this? thanks
class Amqp {
late bool isRunning;
late Client client;
late SoundsService soundsService;
Amqp() {
isRunning = false;
ConnectionSettings settings = ConnectionSettings(
virtualHost:"/",
host: "rabbitmqhost",
port: rabbitmqport,
authProvider: const PlainAuthenticator(
"rabbitmqusers",
"rabbitmqpass"
),
);
client = Client(settings: settings);
soundsService = SoundsService();
}
getClient() {
return client;
}
start() async {
print("AMQP::starting");
//consume();
client.connect().then( (defaultChannel) => {
consume()
}).catchError((onError) => {
print("ERROR: $onError"),
});
return true;
}
close() async {
isRunning = false;
print("isRunning: $isRunning");
client.channel().then((channel) => {
print("amqp::channel::closed"),
channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
queue.delete().then((_) => {
print("amqp::queue::closed"),
client.close().then((_) => {
print("amqp::client::closed"),
})
})
}),
channel.close()
});
client.close();
return true;
}
void deleteQueue() {
client.channel().then((channel) => {
channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
queue.delete().then((_) => {
print("FORCE::amqp::queue::deleted"),
})
}),
});
}
/// *************************************
/// consumer
consume() async {
isRunning = true;
print("AMQP::connected::starting-consume");
print("isRunning: $isRunning");
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
Queue queue = await channel.queue(CurrentInfo.queue, durable: true);
queue.bind(exchange, CurrentInfo.queue);
Consumer consumer = await queue.consume();
consumer.listen( (AmqpMessage message) {
var payload = Payload.fromJson(json.decode(message.payloadAsString));
if ( payload.action == "notification" ) {
print("AMQP::consume::action::notification");
print(payload.toJson());
}
if ( payload.action == 'answer-order' ) {
print("AMQP::consume::action::answer-order");
print(payload.toJson());
Vibration.vibrate(duration: 10000, intensities: [100, 200, 100, 200, 100, 200, 100, 200, 100, 200]);
showActivityBottomSheet(AnswerOrderDetails.fromJson(payload.data));
});
}
/**
print(" [x] Received:: payloadString :: ${message.payloadAsString}");
print(" [x] Received:: payloadJson :: ${message.payloadAsJson}");
print(" [x] Received:: routingKey :: ${message.routingKey}");
*/
});
}
/// *************************************
/// publisher functions
///
/// users/track
publishUpdateLocation(double lat,double lng, String usersId, String status) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var position = {
'usersId': usersId,
"lat": lat,
"lng": lng,
"status": status
};
print("POSITION::DATA:: $position");
exchange.publish(json.encode(position), 'position-listener');
}
/// users/track/orders
publishUpdateLocationTrackOrder(double lat,double lng, String usersId, String ordersId, String status) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var position = {
'usersId': usersId,
'ordersId': ordersId,
"lat": lat,
"lng": lng,
"status": status
};
exchange.publish(json.encode(position), 'position-orders-listener');
}
/// answer-order
publishAnswerOrder(String answer, String usersId, String ordersId) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var data = {
'usersId': usersId,
"ordersId": ordersId,
"answer": answer
};
exchange.publish(json.encode(data), 'answer-orders-listener');
print("ANSWER::ORDER:: $data");
}
///
publishInteractOrder(String type, String usersId, String ordersId) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var data = {
'usersId' : usersId,
"ordersId" : ordersId,
"type" : type // collected, delivered,
};
exchange.publish(json.encode(data), 'interact-orders-listener');
print("INTERACT::ORDER:: $data");
}
}
This behavior is intentional as per the comment about not re-establishing topologies after re-connecting.
The client will surface connection lost errors via a broadcast stream that you can subscribe to. The recommended approach is to specify an error listener and check for connection-lost errors:
client.errorListener((ex) => handleError(ex));
void handleError(Exception ex) {
// Check if ex is a ConnectionException or it's message contains "Lost connection to server"
// and recreate topology.
}