stompest icon indicating copy to clipboard operation
stompest copied to clipboard

When use version=1.2, the consumer stop to connect on topic

Open disaster37 opened this issue 4 years ago • 0 comments

Hi,

When I enable version=1.2 to use heart-beat, the stompest client successfully to connect on ActiveMQ, but it seems to never send frame to connect on Topic. On ActiveMQ UI, I not look the consumer.

The same code work fine when I remove heartBeats option and put version=1.0. Any idea ?

Here my relevant code:

# coding: utf8
from __future__ import unicode_literals

import json
import logging
from twisted.internet import reactor, defer
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from fr.sihm.grabInventoryToSupervision.service.InventoryServerListener import InventoryServerListener
import logging
import sys, traceback, time

logger = logging


class ConsumerServer(object):
    ERROR_QUEUE = '/queue/testConsumerError'

    def connect(self, IP, port=61613, login=None, password=None):
        if IP is None or IP == "":
            raise ("IP can't be null and can't be empty")

        if login is not None and password is not None:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), login=login,
                                       passcode=password, version="1.2")

        else:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), version="1.2")

    @defer.inlineCallbacks
    def run(self, destinations):

        if isinstance(destinations, dict) is False:
            raise ("destination can't be null and can't be empty")

        if "serverInventory" not in destinations:
            raise ("You must set the destination to consume db inventory AMQP")

        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER:
            StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize':
            '2000'
        }

        try:
            client = yield Stomp(self._config).connect(headers=headers, heartBeats=(10000, 10000))
            client.disconnected.addCallbacks(
                lambda _: client.disconnected, lambda _: self.reconnect(
                    destinations))
            client.subscribe(
                destinations["serverInventory"],
                headers,
                listener=SubscriptionListener(
                    InventoryServerListener.consume,
                    onMessageFailed=self.manageError))

        except Exception, e:
            logger.warn(
                "Can't connect to ActiveMQ. Retry connexion in 60 seconds. Error: %s",
                str(e))
            logger.warn(str(traceback.format_exc()))
            time.sleep(60)
            self.run(destinations)

    def reconnect(self, destinations):
        logger.warn(
            "The connexion with ActivMQ is lost. Retry connexion in 60 seconds"
        )
        time.sleep(60)
        self.run(destinations)

    def manageError(self, connection, failure, frame, errorDestination):
        logging.error("Error : " + str(failure))

disaster37 avatar Dec 18 '20 15:12 disaster37