streamz
streamz copied to clipboard
Suggestions for improving code performance
This code calculates the rolling average of (futures price / spot price - 1) in real time. Since a large amount of data can stream in from the websocket every second (about 100-200 data points), I'd like to know if you have any suggestions for improving performance.
import asyncio
import numpy as np
from streamz import Stream
from tradebot.exchange import BinanceWebsocketManager
from tradebot.entity import log_register
from tradebot.constants import MARKET_URLS
# Ratio description:
# 1. calculate the ratio of future price to spot price, minus 1
# 2. add the ratio to a rolling window of size 20
# 3. calculate the mean of the rolling window
# Script-level logger obtained from the project's log registry.
log = log_register.get_logger("BTCUSDT", level="INFO", flush=False)
# Source streams: cb_spot / cb_future emit raw websocket messages into these.
spot_stream = Stream()
future_stream = Stream()
# Window length passed to sliding_window() when averaging the ratio in main().
window_size = 20
def cb_future(msg):
    """Push a futures websocket message into ``future_stream``.

    Messages that do not contain an ``"e"`` key are ignored.
    """
    if "e" not in msg:
        return
    future_stream.emit(msg)
def cb_spot(msg):
    """Push a spot websocket message into ``spot_stream``.

    Messages that do not contain an ``"e"`` key are ignored.
    """
    if "e" not in msg:
        return
    spot_stream.emit(msg)
async def main():
    """Stream BTCUSDT spot and futures trades and print the rolling mean ratio.

    The pipeline combines the latest spot and futures trade messages, computes
    ``future_price / spot_price - 1`` from their ``'p'`` fields, and prints the
    mean over a sliding window of ``window_size`` values.

    Runs until the task is cancelled. Both websocket managers are closed on
    every exit path, not only on cancellation.
    """
    # Wire the pipeline first so it is in place before any subscription task
    # gets a chance to emit into the source streams.
    ratio = spot_stream.combine_latest(future_stream).map(
        lambda x: float(x[1]['p']) / float(x[0]['p']) - 1
    )
    ratio.sliding_window(window_size).map(np.mean).sink(
        lambda x: print(f"Ratio Mean: {x:.8f}")
    )

    # Create the clients outside the try block so the cleanup code below can
    # never reference an unbound name.
    ws_spot_client = BinanceWebsocketManager(base_url="wss://stream.binance.com:9443/ws")
    ws_um_client = BinanceWebsocketManager(base_url="wss://fstream.binance.com/ws")
    try:
        await ws_um_client.subscribe_trade("BTCUSDT", callback=cb_future)
        await ws_spot_client.subscribe_trade("BTCUSDT", callback=cb_spot)
        # Keep the coroutine alive; the work happens in the managers' tasks.
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # Cancellation is the expected way to stop this script; exit quietly.
        pass
    finally:
        # Runs for cancellation and for unexpected errors alike, so the
        # background websocket tasks are never leaked.
        await ws_spot_client.close()
        await ws_um_client.close()
        print("Websocket closed")
# Start the asyncio event loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Here is the implementation of `WebsocketManager` and `BinanceWebsocketManager`:
class WebsocketManager(ABC):
    """Base class that routes websocket messages to per-subscription consumers.

    Each subscription id maps to an ``asyncio.Queue``. A producer (implemented
    by subclasses in ``_subscribe``) puts decoded messages on the queue and a
    ``_consume`` task hands them to the user callback. All tasks started for
    the manager are tracked in ``self._tasks`` so ``close`` can cancel them.
    """

    def __init__(
        self,
        base_url: str,
        ping_interval: int = 5,
        ping_timeout: int = 5,
        close_timeout: int = 1,
        max_queue: int = 12,
    ):
        """Store connection settings and create empty task/queue registries.

        Args:
            base_url: Websocket endpoint URL.
            ping_interval: Stored for subclasses to pass to the websocket client.
            ping_timeout: Stored for subclasses to pass to the websocket client.
            close_timeout: Stored for subclasses to pass to the websocket client.
            max_queue: Stored for subclasses to pass to the websocket client.
        """
        self._base_url = base_url
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        self._close_timeout = close_timeout
        self._max_queue = max_queue
        self._tasks: List[asyncio.Task] = []
        # NOTE: the attribute name (including its spelling) is part of the
        # contract with subclasses, so it is kept as-is.
        self._subscripions = defaultdict(asyncio.Queue)
        self._log = log_register.get_logger(name=type(self).__name__, level="INFO", flush=True)

    async def _consume(self, subscription_id: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Deliver every message queued for ``subscription_id`` to ``callback``.

        Args:
            subscription_id: Key of the queue to read from.
            callback: Sync or async callable invoked as
                ``callback(msg, *args, **kwargs)``.
            *args: Extra positional arguments forwarded to ``callback``.
            **kwargs: Extra keyword arguments forwarded to ``callback``.

        Runs until cancelled or until ``callback`` raises; the exception then
        propagates out of this coroutine.
        """
        queue = self._subscripions[subscription_id]
        # The callback never changes, so classify it once instead of per message.
        is_async = asyncio.iscoroutinefunction(callback)
        while True:
            msg = await queue.get()
            try:
                if is_async:
                    await callback(msg, *args, **kwargs)
                else:
                    callback(msg, *args, **kwargs)
            finally:
                # Always balance get() with task_done() so queue.join() cannot
                # hang when the callback raises or the task is cancelled
                # while the callback is running.
                queue.task_done()

    @abstractmethod
    async def _subscribe(self, symbol: str, typ: str, channel: str, queue_id: str):
        """Open the connection and feed incoming messages into a queue.

        NOTE(review): the parameter list declared here is not enforced by
        Python; confirm it matches what concrete subclasses implement.
        """
        pass

    async def close(self):
        """Cancel all tracked tasks, wait for them to finish, and reset state."""
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        # Drop finished tasks and stale subscription keys so that a repeated
        # close() is a no-op and a later subscribe is not rejected as a
        # duplicate of a subscription that no longer has running tasks.
        self._tasks.clear()
        self._subscripions.clear()
        self._log.info("All WebSocket connections closed.")
class BinanceWebsocketManager(WebsocketManager):
    """Websocket manager that subscribes to Binance market-data streams.

    Every subscription gets two background tasks: one that keeps a websocket
    connection open and queues decoded messages, and one (``_consume`` from the
    base class) that passes queued messages to the user callback.
    """

    def __init__(self, base_url: str):
        """Create a manager for ``base_url`` with fixed ping/close/queue settings."""
        super().__init__(
            base_url=base_url,
            ping_interval=5,
            ping_timeout=5,
            close_timeout=1,
            max_queue=12,
        )

    async def _subscribe(self, payload: Dict[str, Any], subscription_id: str):
        """Connect, send the subscribe request, and queue every received message.

        Args:
            payload: Subscribe request as a dict; sent as JSON text.
            subscription_id: Key of the queue that receives decoded messages.

        When the connection closes, the loop moves on to the next connection
        yielded by ``websockets.connect`` and sends the same request again.
        """
        # Serialize once, outside the reconnect loop. Re-encoding the already
        # encoded string on each reconnect would wrap it in another layer of
        # JSON quoting and send an invalid subscribe request.
        request = json.dumps(payload)
        async for websocket in websockets.connect(
            uri=self._base_url,
            ping_interval=self._ping_interval,
            ping_timeout=self._ping_timeout,
            close_timeout=self._close_timeout,
            max_queue=self._max_queue,
        ):
            try:
                await websocket.send(request)
                async for msg in websocket:
                    msg = orjson.loads(msg)
                    await self._subscripions[subscription_id].put(msg)
            except websockets.ConnectionClosed:
                self._log.error("Connection closed, reconnecting...")

    def _start_subscription(self, subscription_id: str, stream_name: str, callback, *args, **kwargs):
        """Start the consumer and producer tasks for one stream, unless it is already active."""
        # Millisecond timestamp used as the request id.
        request_id = int(time.time() * 1000)
        if subscription_id in self._subscripions:
            self._log.info(f"Already subscribed to {subscription_id}")
            return
        # Register the queue immediately. Otherwise the key only appears once
        # the consumer task first runs, and a second subscribe issued before
        # that point would pass the duplicate check above.
        self._subscripions[subscription_id] = asyncio.Queue()
        payload = {
            "method": "SUBSCRIBE",
            "params": [stream_name],
            "id": request_id,
        }
        # Pass callback positionally: with ``callback=callback, *args`` the
        # first extra positional argument would collide with ``callback``.
        self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback, *args, **kwargs)))
        self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))

    async def subscribe_book_ticker(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the ``<symbol>@bookTicker`` stream.

        Extra ``*args``/``**kwargs`` are forwarded to ``callback`` after the message.
        """
        self._start_subscription(
            f"book_ticker.{symbol}", f"{symbol.lower()}@bookTicker", callback, *args, **kwargs
        )

    async def subscribe_book_tickers(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the book-ticker stream of every symbol in ``symbols``."""
        for symbol in symbols:
            await self.subscribe_book_ticker(symbol, callback, *args, **kwargs)

    async def subscribe_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the ``<symbol>@trade`` stream.

        Extra ``*args``/``**kwargs`` are forwarded to ``callback`` after the message.
        """
        self._start_subscription(
            f"trade.{symbol}", f"{symbol.lower()}@trade", callback, *args, **kwargs
        )

    async def subscribe_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the trade stream of every symbol in ``symbols``."""
        for symbol in symbols:
            await self.subscribe_trade(symbol, callback, *args, **kwargs)

    async def subscribe_agg_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the ``<symbol>@aggTrade`` stream.

        Extra ``*args``/``**kwargs`` are forwarded to ``callback`` after the message.
        """
        self._start_subscription(
            f"agg_trade.{symbol}", f"{symbol.lower()}@aggTrade", callback, *args, **kwargs
        )

    async def subscribe_agg_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe ``callback`` to the aggregate-trade stream of every symbol in ``symbols``."""
        for symbol in symbols:
            await self.subscribe_agg_trade(symbol, callback, *args, **kwargs)