dart_amqp icon indicating copy to clipboard operation
dart_amqp copied to clipboard

amqp connection lost, and recreate multiples consumers in same queue

Open fabios1010 opened this issue 2 years ago • 1 comments

I have been used this lib, but after a while the connection auto close and when start it again it connect with another consumer in queue, I can publish in another queues but it stop consuming. in order to finger out what is happen the solution is complete delete the queue and restart it again. I read in pub.dev that the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.

is that what is happening ? how can I try implement this an check is there is an stable connection or if the queue has consumers? how can I persist the consumer to avoid this? thanks

class Amqp {

  late bool isRunning;
  late Client client;
  late SoundsService soundsService;

  Amqp() {

    isRunning = false;
    ConnectionSettings settings = ConnectionSettings(
      virtualHost:"/",
      host: "rabbitmqhost",
      port: rabbitmqport,
      authProvider: const PlainAuthenticator(
          "rabbitmqusers",
          "rabbitmqpass"
      ),
    );
    client        = Client(settings: settings);
    soundsService = SoundsService();
  }

  getClient() {
    return client;
  }

  start() async {
    print("AMQP::starting");

    //consume();

    client.connect().then( (defaultChannel) => {
      consume()
    }).catchError((onError) => {
      print("ERROR: $onError"),
    });

    return true;
  }

  close() async {
    isRunning = false;
    print("isRunning: $isRunning");
    client.channel().then((channel) => {
      print("amqp::channel::closed"),
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("amqp::queue::closed"),
          client.close().then((_) => {
            print("amqp::client::closed"),
          })
        })
      }),
      channel.close()
    });
    client.close();
    return true;
  }

  void deleteQueue() {
    client.channel().then((channel) => {
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("FORCE::amqp::queue::deleted"),
        })
      }),
    });
  }

  /// *************************************
  /// consumer
  consume() async {

    isRunning = true;

    print("AMQP::connected::starting-consume");
    print("isRunning: $isRunning");

    Channel channel   = await client.channel();

    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);

    Queue queue       = await channel.queue(CurrentInfo.queue, durable: true);
    queue.bind(exchange, CurrentInfo.queue);

    Consumer consumer = await queue.consume();
    consumer.listen( (AmqpMessage message) {

      var payload = Payload.fromJson(json.decode(message.payloadAsString));
      if ( payload.action == "notification" ) {
        print("AMQP::consume::action::notification");
        print(payload.toJson());
      }

      if ( payload.action == 'answer-order' ) {
        print("AMQP::consume::action::answer-order");
        print(payload.toJson());

        Vibration.vibrate(duration: 10000, intensities: [100, 200, 100, 200, 100, 200, 100, 200, 100, 200]);
        showActivityBottomSheet(AnswerOrderDetails.fromJson(payload.data));

       });
      }

      /**
          print(" [x] Received:: payloadString :: ${message.payloadAsString}");
          print(" [x] Received:: payloadJson   :: ${message.payloadAsJson}");
          print(" [x] Received:: routingKey    :: ${message.routingKey}");
       */

    });
  }


  /// *************************************
  /// publisher functions
  ///
  /// users/track
  publishUpdateLocation(double lat,double lng, String usersId,  String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId': usersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    print("POSITION::DATA:: $position");
    exchange.publish(json.encode(position), 'position-listener');
    
  }

  /// users/track/orders
  publishUpdateLocationTrackOrder(double lat,double lng, String usersId, String ordersId, String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId':  usersId,
      'ordersId': ordersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    exchange.publish(json.encode(position), 'position-orders-listener');
   
  }

  /// answer-order
  publishAnswerOrder(String answer, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId':  usersId,
      "ordersId": ordersId,
      "answer":   answer
    };
    exchange.publish(json.encode(data), 'answer-orders-listener');
    print("ANSWER::ORDER:: $data");
   
  }

  ///
  publishInteractOrder(String type, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId'  : usersId,
      "ordersId" : ordersId,
      "type"     : type // collected, delivered,
    };
    exchange.publish(json.encode(data), 'interact-orders-listener');
    print("INTERACT::ORDER:: $data");
   
  }



}

fabios1010 avatar Nov 16 '22 04:11 fabios1010

This behavior is intentional as per the comment about not re-establishing topologies after re-connecting.

The client will surface connection lost errors via a broadcast stream that you can subscribe to. The recommended approach is to specify an error listener and check for connection-lost errors:

client.errorListener((ex) => handleError(ex));

void handleError(Exception ex) {
  // Check if ex is a ConnectionException or it's message contains "Lost connection to server"
  // and recreate topology.
}

achilleasa avatar Nov 26 '22 12:11 achilleasa