axis icon indicating copy to clipboard operation
axis copied to clipboard

Websocket support

Open Kane610 opened this issue 1 year ago • 1 comment

https://github.com/dmannion1972/axis-websocket-metadata-python/blob/main/getsesssionid.py

https://github.com/dmannion1972/axis-websocket-metadata-python

https://github.com/encode/httpx/issues/304 https://gist.github.com/tomchristie/3293d5b118b5646ce79cc074976744b0

import trio

async def main():
    """
    Example client: connect to the local websocket server, send one
    greeting and print the first reply that comes back.
    """
    async with ws_connect("ws://127.0.0.1:8765") as connection:
        await connection.send("Hello, world.")
        reply = await connection.recv()
        print(reply)


trio.run(main)
import base64
import contextlib
import os

import httpx
import wsproto


class ConnectionClosed(Exception):
    """Raised to signal that the websocket connection has been closed."""


class WebsocketConnection:
    """
    Minimal text-only websocket client on top of an already-upgraded stream.

    Frame encoding/decoding is delegated to a client-side
    ``wsproto.Connection``; raw bytes are exchanged through the network
    stream passed to the constructor.
    """

    def __init__(self, network_steam):
        """
        Wrap a network stream.

        ``network_steam`` (parameter spelling kept for backward
        compatibility) must expose awaitable ``read(max_bytes=...)`` and
        ``write(data)`` methods, as used by ``send`` and ``recv``.
        """
        self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT)
        self._network_stream = network_steam
        # Events parsed from the wire but not yet consumed by recv().
        self._events = []
        # Pieces of a text message whose final fragment has not arrived yet.
        self._fragments = []

    async def send(self, text):
        """
        Send a text frame over the websocket connection.
        """
        event = wsproto.events.TextMessage(text)
        data = self._ws_connection_state.send(event)
        await self._network_stream.write(data)

    async def recv(self):
        """
        Receive the next complete text message from the websocket connection.

        Pings are answered with a pong; other non-text events are skipped.
        Fragments of one message are joined before being returned.

        Raises ``ConnectionClosed`` when the peer sends a close frame or the
        underlying stream reaches end-of-file.
        """
        while True:
            while not self._events:
                data = await self._network_stream.read(max_bytes=4096)
                if not data:
                    # An empty read means EOF; without this check the loop
                    # would spin forever because no new events ever appear.
                    raise ConnectionClosed()
                self._ws_connection_state.receive_data(data)
                self._events = list(self._ws_connection_state.events())

            event = self._events.pop(0)
            if isinstance(event, wsproto.events.TextMessage):
                self._fragments.append(event.data)
                if event.message_finished:
                    message = "".join(self._fragments)
                    self._fragments = []
                    return message
            elif isinstance(event, wsproto.events.Ping):
                # wsproto does not reply on its own; send the matching pong.
                pong = self._ws_connection_state.send(event.response())
                await self._network_stream.write(pong)
            elif isinstance(event, wsproto.events.CloseConnection):
                raise ConnectionClosed()
            # Any other event (pong, binary message, ...) is ignored so the
            # caller never receives an implicit ``None``.


@contextlib.asynccontextmanager
async def ws_connect(url):
    """
    Open a websocket connection to ``url`` and yield a ``WebsocketConnection``.

    Sends an HTTP GET carrying the websocket upgrade headers through
    ``httpx`` and wraps the response's ``network_stream`` extension. The
    HTTP client and response are closed when the context exits.

    Raises ``ConnectionError`` if the server does not answer with
    ``101 Switching Protocols``.
    """
    headers = {
        "connection": "upgrade",
        "upgrade": "websocket",
        "sec-websocket-key": base64.b64encode(os.urandom(16)),
        "sec-websocket-version": "13",
    }

    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url, headers=headers) as response:
            # Anything other than 101 means the upgrade was refused; the
            # stream would then carry plain HTTP, not websocket frames.
            if response.status_code != 101:
                raise ConnectionError(
                    "WebSocket upgrade failed: expected status 101, "
                    f"got {response.status_code}"
                )
            network_stream = response.extensions["network_stream"]
            yield WebsocketConnection(network_stream)

Kane610 avatar Mar 16 '23 13:03 Kane610

https://github.com/frankie567/httpx-ws/tree/main

Kane610 avatar Nov 02 '23 18:11 Kane610