stompest
stompest copied to clipboard
When use version=1.2, the consumer stop to connect on topic
Hi,
When I enable version=1.2
to use heart-beat, the stompest client successfully to connect on ActiveMQ, but it seems to never send frame to connect on Topic. On ActiveMQ UI, I not look the consumer.
The same code work fine when I remove heartBeats option and put version=1.0. Any idea ?
Here my relevant code:
# coding: utf8
from __future__ import unicode_literals
import json
import logging
from twisted.internet import reactor, defer
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from fr.sihm.grabInventoryToSupervision.service.InventoryServerListener import InventoryServerListener
import logging
import sys, traceback, time
logger = logging
class ConsumerServer(object):
ERROR_QUEUE = '/queue/testConsumerError'
def connect(self, IP, port=61613, login=None, password=None):
if IP is None or IP == "":
raise ("IP can't be null and can't be empty")
if login is not None and password is not None:
self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), login=login,
passcode=password, version="1.2")
else:
self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), version="1.2")
@defer.inlineCallbacks
def run(self, destinations):
if isinstance(destinations, dict) is False:
raise ("destination can't be null and can't be empty")
if "serverInventory" not in destinations:
raise ("You must set the destination to consume db inventory AMQP")
headers = {
# client-individual mode is necessary for concurrent processing
# (requires ActiveMQ >= 5.2)
StompSpec.ACK_HEADER:
StompSpec.ACK_CLIENT_INDIVIDUAL,
# the maximal number of messages the broker will let you work on at the same time
'activemq.prefetchSize':
'2000'
}
try:
client = yield Stomp(self._config).connect(headers=headers, heartBeats=(10000, 10000))
client.disconnected.addCallbacks(
lambda _: client.disconnected, lambda _: self.reconnect(
destinations))
client.subscribe(
destinations["serverInventory"],
headers,
listener=SubscriptionListener(
InventoryServerListener.consume,
onMessageFailed=self.manageError))
except Exception, e:
logger.warn(
"Can't connect to ActiveMQ. Retry connexion in 60 seconds. Error: %s",
str(e))
logger.warn(str(traceback.format_exc()))
time.sleep(60)
self.run(destinations)
def reconnect(self, destinations):
logger.warn(
"The connexion with ActivMQ is lost. Retry connexion in 60 seconds"
)
time.sleep(60)
self.run(destinations)
def manageError(self, connection, failure, frame, errorDestination):
logging.error("Error : " + str(failure))