stompest
stompest copied to clipboard
STOMP over websocket
Just in case anyone wants to emulate a browser, this uses websocket-client to implement the transport for stompest. It's a bit hacky, but it works. You provide a message-receive handler, and messages may be sent normally with the client.
usage:
from wsstomp import WSStompClient
def receive_message(client, command, data, headers):
    """Handle a STOMP frame delivered by WSStompClient.

    The client invokes the handler as
    ``on_receive(client, command, data, headers)``:

    client  -- the WSStompClient instance that received the frame
    command -- the STOMP command of the frame
    data    -- the frame body parsed as JSON (``{}`` for an empty body)
    headers -- the frame headers
    """
    print("Message received")
from stompest.protocol import StompSpec  # provides StompSpec.VERSION_1_2 used below

# Set to True to enable websocket tracing and print the final URL.
verbose = False

ws_client = WSStompClient(
    "<ws-url>",
    on_receive=receive_message,
    host="<ws-host>",
    origin="<origin>",
    headers={"Sec-WebSocket-Protocol": "v12.stomp"},
    debug=verbose,
)
ws_client.connect(versions=[StompSpec.VERSION_1_2], heartBeats=(0, 0), host="<hostname>")
token = ws_client.subscribe("/user/messages")
wsstomp.py:
import json
import logging
import time
from urllib import parse
from threading import Thread
import websocket
from stompest.config import StompConfig
from stompest.protocol import StompParser, StompSpec
from stompest.sync import client
from websocket import ABNF
class WSTransport(object):
    """Frame transport that carries STOMP frames over a websocket-client socket.

    The websocket runs on a background daemon thread. The first message that
    arrives is left in the parser so that ``receive()`` can return it (the
    STOMP client only reads synchronously while connecting); every later
    frame is handed to ``on_frame_received`` from the websocket thread.
    """

    def __init__(self, url, host=None, origin=None, headers=None, debug=False,
                 on_frame_received=None, **kwargs):
        """Create the websocket application without opening the connection.

        url               -- websocket URL to connect to
        host              -- value passed as ``host`` to ``run_forever``;
                             defaults to the hostname parsed from ``url``
        origin            -- value passed as ``origin`` to ``run_forever``
        headers           -- mapping rendered as "Name: value" handshake headers
        debug             -- enable websocket-client tracing when true
        on_frame_received -- callable invoked with each frame received after
                             the first message
        kwargs            -- forwarded unchanged to ``websocket.WebSocketApp``
        """
        if debug:
            websocket.enableTrace(True)
        else:
            logging.getLogger("websocket").setLevel(logging.ERROR)
        self.ws = websocket.WebSocketApp(
            url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_data=self._on_data,
            header=[f"{k}: {v}" for k, v in (headers or {}).items()],
            **kwargs)
        self.url = url
        self.debug = debug
        self.host = host or parse.urlparse(url).hostname
        self.origin = origin
        self.opened = False
        self.connected = False
        self._parser = StompParser()
        self.on_frame_received = on_frame_received

    def _connect(self, timeout=None):
        """Start the websocket thread and block until the socket is open.

        Raises ConnectionError if the websocket thread exits before the socket
        opened (previously this case spun forever when no timeout was given),
        and TimeoutError when ``timeout`` elapses first.
        """
        worker = Thread(
            target=lambda: self.ws.run_forever(host=self.host, origin=self.origin),
            daemon=True)
        worker.start()
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        start = time.monotonic()
        while not self.opened:
            if not worker.is_alive():
                # run_forever() returned without ever calling _on_open.
                raise ConnectionError(f"Connection to {self.url} failed")
            time.sleep(0.25)
            if self.opened:
                break
            # NOTE(review): elapsed time is compared in milliseconds; stompest
            # documents its connect timeout in seconds -- confirm which unit
            # callers pass before changing this.
            if timeout and (time.monotonic() - start) * 1000 >= timeout:
                # Stop the background attempt so it cannot open later.
                self.ws.close()
                raise TimeoutError(f"Connection to {self.url} timed out")

    def _on_open(self, ws):
        """Record that the websocket handshake completed."""
        self.opened = True

    def _on_close(self, ws, status_code, message):
        """Reset state so that a later connect() waits for a fresh socket."""
        self.connected = False
        self.opened = False

    def _on_error(self, ws, error):
        """Report a websocket error on stdout."""
        print(f"Error: {error}")

    def _on_message(self, ws, data):
        """Feed received data to the parser and dispatch complete frames.

        The first message only marks the transport as connected; its frame
        stays in the parser for ``receive()``. Afterwards, if a callback is
        set, every complete frame currently available is passed to it. With
        no callback, frames remain in the parser for ``receive()``.
        """
        if isinstance(data, str):
            # Text websocket frames arrive as str; the parser is fed bytes.
            data = data.encode("utf-8")
        self._parser.add(data)
        if not self.connected:
            self.connected = True
            return
        if not self.on_frame_received:
            return
        # Drain the parser: one websocket message may hold zero, one or
        # several frames, and get() returns None when nothing is ready.
        frame = self._parser.get()
        while frame is not None:
            self.on_frame_received(frame)
            frame = self._parser.get()

    def _on_data(self, ws, data, data_type, cont_flag):
        """Ignore raw data events; _on_message handles complete messages."""
        pass

    def canRead(self, timeout=None):
        """Always report readable; ``timeout`` is ignored."""
        return True

    def connect(self, timeout=None):
        """Open the websocket; see ``_connect`` for blocking and errors."""
        self._connect(timeout)

    def disconnect(self):
        """Detach the close callback, close the websocket and reset state."""
        self.ws.on_close = None
        self.ws.close()
        self.connected = False
        # Without this a later connect() would return before the new socket
        # is open, because the wait loop is keyed on ``opened``.
        self.opened = False

    def receive(self):
        """Return the next parsed frame, or whatever the parser yields if none.

        Used on connect only; later frames go through ``on_frame_received``.
        """
        return self._parser.get()

    def send(self, stomp_frame):
        """Serialize a STOMP frame and send it as one binary websocket frame."""
        # Incoming: StompFrame => outgoing ABNF
        ws_frame = ABNF.create_frame(bytes(stomp_frame), ABNF.OPCODE_BINARY)
        self.ws.sock.send_frame(ws_frame)

    def setVersion(self, version):
        """Set the STOMP protocol version used by the frame parser."""
        self._parser.version = version
class WSStompClient(client.Stomp):
    """Synchronous STOMP client whose frames travel over a websocket.

    The transport factory is replaced so that connecting creates a
    ``WSTransport`` for the websocket URL instead of a TCP transport.
    Frames received after the connect handshake are delivered to
    ``on_receive`` as ``on_receive(client, command, data, headers)``, where
    ``data`` is the frame body parsed as JSON (``{}`` for an empty body).
    MESSAGE frames are acknowledged before the handler is called.
    """

    def __init__(self, url, url_params=None, config=None, debug=False, on_receive=None, **kwargs):
        """Build the websocket URL and install the websocket transport.

        url        -- endpoint URL; an ``https://``/``http://`` scheme is
                      rewritten to ``wss://``/``ws://``
        url_params -- optional mapping appended to the URL as a query string
        config     -- optional StompConfig; a dummy one is created otherwise
        debug      -- print the final URL and enable transport tracing
        on_receive -- optional handler for received frames
        kwargs     -- forwarded to ``WSTransport``
        """
        url = self._build_url(url, url_params)
        if debug:
            print(f"url {url}")
        parsed = parse.urlparse(url)
        # Provide a dummy config; websocket-client does the actual connection
        super().__init__(config or StompConfig(f"tcp://{parsed.hostname}:443", version=StompSpec.VERSION_1_2))
        self.debug = debug

        def handle_frame(frame):
            # The transport may deliver None or objects without a command
            # (nothing parsed yet); those carry nothing to dispatch.
            command = getattr(frame, "command", None)
            if command is None:
                return
            if command == StompSpec.MESSAGE:
                self.ack(frame)
            if on_receive:
                on_receive(self, command, json.loads(frame.body.decode() or "{}"), frame.headers)

        def factory(host, port, sslContext):
            # The arguments are ignored: the websocket URL already carries
            # everything needed to reach the server.
            return WSTransport(url, debug=debug, on_frame_received=handle_frame, **kwargs)

        self._transportFactory = factory

    @staticmethod
    def _build_url(url, url_params=None):
        """Return ``url`` with a websocket scheme and ``url_params`` appended."""
        # Rewrite the scheme only; a plain replace() would also mangle any
        # "https" appearing later in the host, path or query.
        if url.startswith("https://"):
            url = "wss://" + url[len("https://"):]
        elif url.startswith("http://"):
            url = "ws://" + url[len("http://"):]
        query = "&".join(
            f"{parse.quote(key)}={parse.quote(str(value))}"
            for key, value in (url_params or {}).items())
        if not query:
            return url
        # Extend an existing query string instead of adding a second "?".
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{query}"