stompest icon indicating copy to clipboard operation
stompest copied to clipboard

STOMP over websocket

Open twall opened this issue 2 years ago • 0 comments

Just in case anyone wants to emulate a browser, this uses websocket-client to implement the transport for stompest. It's a bit hacky, but it works. You provide a message receive handler, and messages may be sent normally with the client.

usage:

from wsstomp import WSStompClient

def receive_message(client, command, data, headers):
    """Handle a STOMP frame delivered by WSStompClient.

    WSStompClient invokes its ``on_receive`` handler with four positional
    arguments, so the handler must accept the client as its first parameter.

    Args:
        client: The WSStompClient instance that received the frame.
        command: The STOMP command of the frame.
        data: The frame body, already parsed from JSON.
        headers: The frame headers.
    """
    print("Message received")

# Extra handshake header requesting the "v12.stomp" websocket sub-protocol.
handshake_headers = {"Sec-WebSocket-Protocol": "v12.stomp"}

# Build the client; frames are reported to receive_message as they arrive.
ws_client = WSStompClient(
    "<ws-url>",
    on_receive=receive_message,
    host="<ws-host>",
    origin="<origin>",
    headers=handshake_headers,
    debug=verbose,
)

# Open the websocket and perform the STOMP 1.2 CONNECT with heart-beats disabled.
ws_client.connect(versions=[StompSpec.VERSION_1_2], heartBeats=(0, 0), host="<hostname>")

# Keep the returned token for later reference to this subscription.
token = ws_client.subscribe("/user/messages")

wsstomp.py:

import json
import logging
import time
from urllib import parse
from threading import Thread

import websocket
from stompest.config import StompConfig
from stompest.protocol import StompParser, StompSpec
from stompest.sync import client
from websocket import ABNF


class WSTransport(object):
    """Frame transport that carries STOMP frames over a websocket.

    The websocket is driven by a ``websocket.WebSocketApp`` running in a
    daemon thread. The STOMP client only pulls a frame synchronously during
    connect (see ``receive``); once the first message has arrived, every
    further frame is pushed to ``on_frame_received``.
    """

    # Seconds to sleep between checks of the ``opened`` flag in _connect().
    _POLL_INTERVAL = 0.25

    def __init__(self, url, host=None, origin=None, headers=None, debug=False,
                 on_frame_received=None, **kwargs):
        """Create the transport; no network activity happens until connect().

        Args:
            url: Websocket URL to connect to.
            host: Host passed to ``run_forever``; defaults to the URL's hostname.
            origin: Origin passed to ``run_forever``.
            headers: Mapping of extra handshake headers.
            debug: Enable websocket-client tracing when true; otherwise the
                "websocket" logger is limited to errors.
            on_frame_received: Callable invoked with each frame received
                after the first message.
            **kwargs: Forwarded unchanged to ``websocket.WebSocketApp``.
        """
        if debug:
            websocket.enableTrace(True)
        else:
            logging.getLogger("websocket").setLevel(logging.ERROR)
        self.ws = websocket.WebSocketApp(url, on_open=self._on_open,
                                         on_message=self._on_message,
                                         on_error=self._on_error,
                                         on_close=self._on_close,
                                         on_data=self._on_data,
                                         header=[f"{k}: {v}" for k, v in (headers or {}).items()],
                                         **kwargs)
        self.url = url
        self.debug = debug
        self.host = host or parse.urlparse(url).hostname
        self.origin = origin
        self.opened = False      # websocket handshake completed
        self.connected = False   # first message seen after opening
        self._parser = StompParser()
        self.on_frame_received = on_frame_received

    def _connect(self, timeout=None):
        """Start the websocket thread and block until the socket is open.

        Args:
            timeout: Maximum time to wait, in seconds. ``None`` or ``0``
                waits indefinitely.

        Raises:
            TimeoutError: The socket did not open within ``timeout`` seconds.
            ConnectionError: The websocket thread ended without ever opening.
        """
        # Reset so that a reconnect really waits for the new socket.
        self.opened = False
        thread = Thread(target=lambda: self.ws.run_forever(host=self.host, origin=self.origin), daemon=True)
        thread.start()

        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout if timeout else None
        while not self.opened:
            if not thread.is_alive():
                if self.opened:
                    break
                # run_forever() returned before on_open fired; waiting any
                # longer would block forever.
                raise ConnectionError(f"Connection to {self.url} could not be established")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(f"Connection to {self.url} timed out")
            time.sleep(self._POLL_INTERVAL)

    def _on_open(self, ws):
        """Websocket callback: mark the socket as open."""
        self.opened = True

    def _on_close(self, ws, status_code, message):
        """Websocket callback: the socket was closed."""
        self.connected = False
        self.opened = False

    def _on_error(self, ws, error):
        """Websocket callback: report a websocket error."""
        print(f"Error: {error}")

    def _on_message(self, ws, data):
        """Websocket callback: feed received data to the STOMP parser.

        The first message after opening is left in the parser so that
        ``receive`` can return it (presumably the CONNECTED reply). For
        later messages, every complete frame is handed to
        ``on_frame_received``.
        """
        # Text websocket messages arrive as str; the parser expects bytes.
        if isinstance(data, str):
            data = data.encode("utf-8")
        self._parser.add(data)
        if not self.connected:
            self.connected = True
            return
        if not self.on_frame_received:
            return
        # One websocket message may hold several frames or only a fragment
        # of one, so dispatch exactly the frames that are complete.
        while self._parser.canRead():
            self.on_frame_received(self._parser.get())

    def _on_data(self, ws, data, data_type, cont_flag):
        """Websocket callback: intentionally ignored."""
        pass

    def canRead(self, timeout=None):
        """Always report the transport as readable; ``timeout`` is ignored.

        NOTE(review): ``receive`` can still return no frame if nothing has
        arrived yet.
        """
        return True

    def connect(self, timeout=None):
        """Open the websocket, waiting at most ``timeout`` seconds."""
        self._connect(timeout)

    def disconnect(self):
        """Close the websocket without triggering the close callback."""
        self.ws.on_close = None
        self.ws.close()
        self.connected = False
        self.opened = False

    def receive(self):
        """Return the next parsed frame; used on connect only."""
        return self._parser.get()

    def send(self, stomp_frame):
        """Send a STOMP frame as a single binary websocket frame."""
        # Incoming: StompFrame => outgoing ABNF
        ws_frame = ABNF.create_frame(bytes(stomp_frame), ABNF.OPCODE_BINARY)
        self.ws.sock.send_frame(ws_frame)

    def setVersion(self, version):
        """Set the STOMP protocol version used by the parser."""
        self._parser.version = version


class WSStompClient(client.Stomp):
    """Synchronous STOMP client that talks to the broker over a websocket.

    The regular stompest client machinery is kept, but its transport factory
    is replaced so that frames travel through a ``WSTransport``.
    """

    def __init__(self, url, url_params=None, config=None, debug=False, on_receive=None, **kwargs):
        """Create the client; the websocket is opened later by ``connect``.

        Args:
            url: Endpoint URL. An ``http``/``https`` scheme is mapped to
                ``ws``/``wss``; any other scheme is left untouched.
            url_params: Mapping of query parameters appended to ``url``.
            config: Optional ``StompConfig``. When omitted, a dummy config is
                built, because websocket-client does the actual connection.
            debug: Print the final URL and forward the flag to the transport.
            on_receive: Callable invoked as
                ``on_receive(client, command, data, headers)`` for each
                received frame, where ``data`` is the JSON-decoded body
                (``{}`` for an empty body).
            **kwargs: Forwarded to ``WSTransport`` (e.g. ``host``,
                ``origin``, ``headers``).
        """
        url = self._websocket_url(url, url_params)
        if debug:
            print(f"url {url}")
        parsed = parse.urlparse(url)
        # Provide a dummy config; websocket-client does the actual connection
        super().__init__(config or StompConfig(f"tcp://{parsed.hostname}:443", version=StompSpec.VERSION_1_2))
        self.debug = debug

        def handle_frame(frame):
            # The transport may deliver None when no complete frame is
            # available; anything without a command cannot be dispatched.
            command = getattr(frame, "command", None)
            if command is None:
                return
            if command == StompSpec.MESSAGE:
                self.ack(frame)
            if on_receive:
                on_receive(self, command, json.loads(frame.body.decode() or "{}"), frame.headers)

        def factory(host, port, sslContext):
            # The arguments are ignored: the websocket URL fully describes
            # the endpoint.
            return WSTransport(url, debug=debug, on_frame_received=handle_frame, **kwargs)

        self._transportFactory = factory

    @staticmethod
    def _websocket_url(url, url_params=None):
        """Return ``url`` with a websocket scheme and ``url_params`` appended.

        Only the scheme component is rewritten, so an "https" that occurs in
        the host or path is preserved. Parameters are merged with any query
        string already present, and no dangling "?" is produced when there
        is nothing to append.
        """
        parts = parse.urlsplit(url)
        scheme = {"http": "ws", "https": "wss"}.get(parts.scheme.lower(), parts.scheme)
        extra = "&".join(
            f"{parse.quote(str(key))}={parse.quote(str(value))}"
            for key, value in (url_params or {}).items()
        )
        query = "&".join(piece for piece in (parts.query, extra) if piece)
        return parse.urlunsplit((scheme, parts.netloc, parts.path, query, parts.fragment))

twall avatar Dec 19 '22 12:12 twall