Queue overflow error, change MAX_QUEUE_SIZE value
My code:
from binance import ThreadedWebsocketManager

# callback
def process_message_price(msg):
    print(msg)

# websocket
bm = ThreadedWebsocketManager()
bm.start()
# listOfPairings: all pairs with USDT (over 250 items in list)
for pairing in listOfPairings:
    conn_key = bm.start_trade_socket(callback=process_message_price, symbol=pairing)
bm.join()
However, after a short time I get the following error: 'e': 'error', 'm': 'Queue overflow. Message not filled'
which is caused by MAX_QUEUE_SIZE in streams.py being too small for my program.
How can I change this value from outside the streams.py file?
Thx
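One general way to change such a value without editing the library file is to override the class attribute once at startup. The sketch below uses a stand-in class so it runs on its own. It assumes the limit is defined as a class attribute named MAX_QUEUE_SIZE on ReconnectingWebsocket in streams.py, so please check your installed version before relying on it.

```python
# Stand-in for the library class. ASSUMPTION: in python-binance the limit is a
# class attribute MAX_QUEUE_SIZE on binance.streams.ReconnectingWebsocket.
class ReconnectingWebsocket:
    MAX_QUEUE_SIZE = 100

    def queue_limit(self):
        # Instance lookups fall back to the class attribute.
        return self.MAX_QUEUE_SIZE


# Override once at startup, before any socket object is created.
ReconnectingWebsocket.MAX_QUEUE_SIZE = 10000

sock = ReconnectingWebsocket()
print(sock.queue_limit())
```

With the real package the same assignment would follow an import of ReconnectingWebsocket from binance.streams, if the attribute exists under that name in your version.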
Even if you make MAX_QUEUE_SIZE equal to 10000 (or more, it doesn't matter), you will still get this error eventually, or the stream will stop working.
So... is there a way to avoid this error? What should I change?
import threading
import time

from binance import ThreadedWebsocketManager

class Stream():
    def start(self):
        self.bm = ThreadedWebsocketManager()
        self.bm.start()
        self.stream_error = False
        self.multiplex_list = list()
        # listOfPairings: all pairs with USDT (over 250 items in list)
        for pairing in listOfPairings:
            self.multiplex_list.append(pairing.lower() + '@trade')
        self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)
        # monitoring the error in a separate thread
        stop_trades = threading.Thread(target = self.restart_stream, daemon = True)
        stop_trades.start()

    def realtime(self, msg):
        if 'data' in msg:
            pass  # Your code
        else:
            # anything without 'data' is treated as a stream error
            self.stream_error = True

    def restart_stream(self):
        while True:
            time.sleep(1)
            if self.stream_error:
                self.bm.stop_socket(self.multiplex)
                time.sleep(5)
                self.stream_error = False
                self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)

stream = Stream()
stream.start()
stream.bm.join()
And set MAX_QUEUE_SIZE = 10000. There is no other way to fix this error. I get no more than 10-15 restarts of the websocket per day across all USDT pairs (each restart happens within 5-7 seconds after the error).
Calling the restart of the stream from an except block did not work for me either; it only works from a function running in a separate thread.
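For anyone adapting this, here is a minimal sketch of the same restart-from-a-separate-thread idea using threading.Event instead of polling a boolean. FakeManager is a hypothetical stand-in for ThreadedWebsocketManager that only counts calls, so the example runs without a network connection. The restart_delay and shutdown names are my own additions.

```python
import threading
import time


class FakeManager:
    """Hypothetical stand-in for ThreadedWebsocketManager; only counts calls."""

    def __init__(self):
        self.starts = 0
        self.stops = 0

    def start_multiplex_socket(self, callback, streams):
        self.starts += 1
        return f"socket-{self.starts}"

    def stop_socket(self, name):
        self.stops += 1


class Stream:
    def __init__(self, manager, streams, restart_delay=0.0):
        self.bm = manager
        self.streams = streams
        self.restart_delay = restart_delay
        self.error = threading.Event()      # set by the callback, cleared by the watchdog
        self.shutdown = threading.Event()   # lets the demo stop the watchdog
        self.socket = self.bm.start_multiplex_socket(callback=self.realtime, streams=streams)
        self.watchdog = threading.Thread(target=self.restart_loop, daemon=True)
        self.watchdog.start()

    def realtime(self, msg):
        if 'data' not in msg:
            self.error.set()                # only flag the error; never restart here

    def restart_loop(self):
        while not self.shutdown.is_set():
            if not self.error.wait(timeout=0.1):
                continue                    # no error yet, check shutdown again
            self.bm.stop_socket(self.socket)
            time.sleep(self.restart_delay)
            self.error.clear()
            self.socket = self.bm.start_multiplex_socket(callback=self.realtime, streams=self.streams)


manager = FakeManager()
stream = Stream(manager, ['btcusdt@trade'])
stream.realtime({'e': 'error', 'm': 'Queue overflow. Message not filled'})
time.sleep(0.5)                             # give the watchdog time to restart
stream.shutdown.set()
print(manager.starts, manager.stops)
```

The callback only sets a flag and returns, and the watchdog thread does the stop/start, which matches the observation above that restarting directly from the callback side did not work.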
Saved my day!! Thanks.
But why are we getting this error? Is it because of the asynchronous function we used?
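I tried using the websocket directly (websocket-client package), and it seems to work well for me.
import websocket
# on_message and on_error are my own handler functions
ws = websocket.WebSocketApp("wss://fstream.binance.com/ws/!bookTicker", on_message=on_message, on_error=on_error)
ws.run_forever()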
same problem - hits the queue max - I have about 50 pairs, and 5 intervals per pair.
I was just playing around with the documentation code and got this same error when I was manually using the asynchronous context manager.
What sorted it out for me was closing the context manager once the connection has been opened and the message received. Here is my code:
async def run_listener():
    while True:
        await socket.__aenter__()
        msg = await socket.recv()
        await socket.__aexit__(None, None, None)
        try:
            frame = create_frame(msg)
            frame.to_sql('BTCUSDT', engine, if_exists='append', index=False)
            print(frame)
        except Exception:
            # error messages carry the description under the 'm' key
            print(f'Error: {msg["m"]}')

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_listener())
It's the line await socket.__aexit__(None, None, None) that sorted out the issue for me.
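As a side note, the manual __aenter__ / __aexit__ pair can also be written with async with, which additionally closes the context if recv() raises. This is only a sketch: DummySocket is a hypothetical stand-in that counts opens and closes, not the real socket object from the library.

```python
import asyncio


class DummySocket:
    """Hypothetical stand-in for a socket; it only counts opens and closes."""

    def __init__(self):
        self.opened = 0
        self.closed = 0

    async def __aenter__(self):
        self.opened += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed += 1

    async def recv(self):
        return {'e': 'trade', 's': 'BTCUSDT'}


async def read_once(socket):
    # Equivalent to __aenter__() / recv() / __aexit__(None, None, None),
    # except that __aexit__ also runs if recv() raises.
    async with socket as s:
        return await s.recv()


dummy = DummySocket()
msg = asyncio.run(read_once(dummy))
print(msg, dummy.opened, dummy.closed)
```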
This still does not solve this issue, I have tried this method before. The error logs mention async.Timeout error None.
2021-12-24 23:50:25,669 DEBUG client - state = CONNECTING
2021-12-24 23:50:25,980 DEBUG client - event = connection_made(<asyncio.sslproto._SSLProtocolTransport object at 0x7fb72cefa3a0>)
2021-12-24 23:50:25,981 DEBUG client > GET /ws/bnbusdt_perpetual@continuousKline_15m HTTP/1.1
2021-12-24 23:50:25,981 DEBUG client > Headers([('Host', 'fstream.binance.com'), ('Upgrade', 'websocket'), ('Connection', 'Upgrade'), ('Sec-WebSocket-Key', 'k5bcAVA6k7lGsrlrOHk6SQ=='), ('Sec-WebSocket-Version', '13'), ('Sec-WebSocket-Extensions', 'permessage-deflate; client_max_window_bits'), ('User-Agent', 'Python/3.9 websockets/9.1')])
2021-12-24 23:50:26,131 DEBUG client - event = data_received(<212 bytes>)
2021-12-24 23:50:26,132 DEBUG client < HTTP/1.1 101 Switching Protocols
2021-12-24 23:50:26,132 DEBUG client < Headers([('Date', 'Fri, 24 Dec 2021 12:50:26 GMT'), ('Connection', 'upgrade'), ('upgrade', 'websocket'), ('sec-websocket-accept', 'nwEU6MiTPjGVy4SN25IgrGxlmws='), ('sec-websocket-extensions', 'permessage-deflate')])
2021-12-24 23:50:26,133 DEBUG client - state = OPEN
2021-12-24 23:50:26,528 DEBUG client - event = data_received(<210 bytes>)
2021-12-24 23:50:26,528 DEBUG client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"e":"continuous_kline","E":1640350226478,"ps":"BNBUSDT","ct":"PERPETUAL","k":{"t":1640349900000,"T":1640350799999,"i":"15m","f":1059051713329,"L":1059062641236,"o":"546.700","c":"547.730","h":"547.770","l":"546.390","v":"1770.13","n":1846,"x":false,"q":"968844.37900","V":"1069.84","Q":"585580.83790","B":"0"}}', rsv1=False, rsv2=False, rsv3=False)
2021-12-24 23:50:26,529 DEBUG client ! failing OPEN WebSocket connection with code 1006
2021-12-24 23:50:26,529 DEBUG client - state = CLOSING
2021-12-24 23:50:26,529 DEBUG client > Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
2021-12-24 23:50:26,632 DEBUG client ! timed out waiting for TCP close
2021-12-24 23:50:26,632 DEBUG client x closing TCP connection
2021-12-24 23:50:26,736 DEBUG client ! timed out waiting for TCP close
2021-12-24 23:50:26,736 DEBUG client x aborting TCP connection
2021-12-24 23:50:26,754 DEBUG client - event = connection_lost(None)
2021-12-24 23:50:26,754 DEBUG client - state = CLOSED
2021-12-24 23:50:26,754 DEBUG client x code = 1006, reason = [no reason]
2021-12-24 23:50:27,870 DEBUG connection close error (code = 1006 (connection closed abnormally [internal]), no reason)
This solved the problem for me:
await socket.__aenter__()
msg = await socket.recv()
if msg['e'] == 'error':
    print('error')
    client.close_connection()  # close and restart the socket
    bsm = BinanceSocketManager(client)
    socket = bsm.trade_socket(pair)
else:
    frame = createframe(msg)
    frame.to_sql(pair, engine, if_exists="append", index=False)
    print(frame)
Another solution is to create a new TWM per pair or per 3 or 4 pairs. But of course, you need a CPU with lots of threads.
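To illustrate the grouping step only, here is a small sketch that splits a pair list into groups of three and builds the stream names for each group. The chunked helper and the sample pair list are hypothetical; creating one manager per group is left as a comment because it needs the real library and a network connection.

```python
def chunked(items, size):
    """Split a list into consecutive groups of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


pairs = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'LTCUSDT', 'ADAUSDT', 'SOLUSDT']
groups = chunked(pairs, 3)

for group in groups:
    streams = [p.lower() + '@trade' for p in group]
    # Each group would get its own manager here,
    # e.g. one ThreadedWebsocketManager per group of streams.
    print(streams)
```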
The line await socket.__aexit__(None, None, None) works perfectly!
What fixed this for me was using the streams.py script from v1.0.13.
So you use the latest version with only the streams.py script taken from v1.0.13, right?
exactly, yes!
Could you please specify what you mean by using the streams.py script from v1.0.13? Sharing a code snippet would be highly appreciated.
You have to download the python-binance package from source and use it in your code: https://github.com/sammchardy/python-binance/tree/master/binance
Then replace streams.py with the older version (v1.0.13) of the file from source: https://github.com/sammchardy/python-binance/tree/v1.0.13/binance
Hello,
FYI, I have tested this solution and it does not really work. The only difference is that the part below has no "else" statement in v1.0.13:
if res and self._queue.qsize() < 100:
    await self._queue.put(res)
but the queue still overflows and we still lose some messages.
My solution: I increased the limit to 10000 and, in threaded_stream.py, changed the wait_for timeout from 3 to 7:
while self._socket_running[path]:
    try:
        msg = await asyncio.wait_for(s.recv(), 7)
I tested for 4 hours straight, with the "else" statement kept in to log whenever the queue overflows, and it never happened again. Everything looks good to me.
Best regards
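To make the size check quoted above easier to follow, here is a simplified model of it with asyncio.Queue. It is not the library's code: the tiny MAX_QUEUE_SIZE of 3, the receive function, and the else branch that enqueues an error marker are assumptions based on the behaviour described in this thread.

```python
import asyncio

MAX_QUEUE_SIZE = 3  # tiny on purpose; the thread discusses 100 and 10000


async def receive(queue, messages):
    """Mimics the reader: enqueue each message, or an error marker when full."""
    for msg in messages:
        if queue.qsize() < MAX_QUEUE_SIZE:
            await queue.put(msg)
        else:
            # ASSUMPTION: newer versions report the overflow instead of dropping silently.
            await queue.put({'e': 'error', 'm': 'Queue overflow. Message not filled'})


async def main():
    queue = asyncio.Queue()
    # Nothing consumes while the reader runs, so the fourth and fifth messages overflow.
    await receive(queue, [{'data': i} for i in range(5)])
    return [queue.get_nowait() for _ in range(queue.qsize())]


result = asyncio.run(main())
print(result)
```

In this model the original fourth and fifth messages are lost and replaced by error markers, which is why raising the limit only delays the problem when the consumer is slower than the producer.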
This issue is a feature rather than a bug (it prevents a memory leak). It only means the message handler cannot consume messages as fast as the receiver delivers them. Without MAX_QUEUE_SIZE, memory usage would grow without limit. The solutions proposed in other comments go in the wrong direction.
The solution is straightforward: improve the efficiency of the message handler, or split the pairs across multiple processes.
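One common way to keep the handler fast is to have the callback do nothing except hand the message to a worker thread. The sketch below shows that pattern with the standard library only. The names on_message, worker, and the doubling "processing" step are placeholders of mine, and the hand-off queue here is unbounded, so the memory concern described above moves into your own code if the worker stays too slow.

```python
import queue
import threading

work = queue.Queue()          # hand-off buffer owned by your code (unbounded here)
results = []


def on_message(msg):
    """Websocket callback: only enqueue, so it returns immediately."""
    work.put(msg)


def worker():
    """Runs in its own thread and does the slow part (database writes, etc.)."""
    while True:
        msg = work.get()
        if msg is None:       # sentinel: stop the worker
            break
        results.append(msg['data'] * 2)   # placeholder for real processing


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for i in range(5):
    on_message({'data': i})
work.put(None)
thread.join()
print(results)
```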
I think there is still some issue/bug in ThreadedWebsocketManager, but I have not had time to investigate yet. I also assumed at first that the issue was my callback being too slow, so I implemented a reconnect/reset whenever the error appears. However, the error only appears once at the very beginning, and afterwards everything runs fine without a reconnect/reset.
Maybe I'm wrong, so if somebody has the knowledge and time, please have a look at my solution:
#!/usr/bin/env python3.9
import logging
from time import sleep

from binance import ThreadedWebsocketManager

CH = logging.StreamHandler()
CH.setFormatter(logging.Formatter('%(asctime)s | %(levelname)s | %(message)s'))

SYMBOLS = ['XRP/BNB', 'ETH/BTC', 'LTC/BTC', 'XRP/BUSD',
           'XRP/BTC', 'LTC/USDT', 'BNB/BUSD', 'XRP/USDT',
           'ETH/USDT', 'LTC/BNB', 'LTC/BUSD', 'BNB/USDT',
           'BTC/USDT', 'BTC/BUSD', 'BNB/BTC', 'ETH/BUSD']

class QueueManager():
    _log = logging.getLogger(__name__)
    _log.setLevel(logging.WARNING)
    _log.addHandler(CH)

    def __init__(self, symbols:list=[]) -> None:
        self._twm = ThreadedWebsocketManager()
        self._streams = list(map(lambda s: f"{s.replace('/','').lower()}@bookTicker", symbols))
        self._twm.start()
        self._log.warning(f"Start listening to {len(self._streams)} streams")
        self._listener:str = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)

    def _handle_socket_message(self, message):
        if ('e' in message):
            if (message['m']=='Queue overflow. Message not filled'):
                self._log.warning("Socket queue full. Resetting connection.")
                self.reset_socket()
                return
            else:
                self._log.error(f"Stream error: {message['m']}")
                exit(1)
        (u, s, b, B, a, A) = message['data'].values()
        # do something with the message
        self._log.debug(f"{s}: bid - {b}, ask - {a}")
        if False:
            # in case your internal logic invalidates the items in the queue
            # (e.g. your business logic ran too long and items in queue became "too old")
            self.reset_socket()

    def reset_socket(self):
        self._twm.stop_socket(self._listener)
        self._listener = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)
        if (self._log.isEnabledFor(logging.DEBUG)):
            self._log.debug("Reconnecting. Waiting for 5 seconds...")
        sleep(5)

    def join(self):
        self._twm.join()

def main():
    manager = QueueManager(SYMBOLS)
    manager.join()

if __name__ == "__main__":
    main()
Dear,
Thank you so much for the code and your contribution! This has been absolutely helpful. However, would you be kind enough to show a possible snippet for the "Your code" section, so that I can get an idea of how to use the variables correctly? The code I currently have in the "Your code" section is as follows:
if 'data' in msg:
    async def main():
        async with self.bm as tscm:
            while True:
                res = await tscm.recv()
                if res:
                    frame = createframe(res)
                    frame.to_sql(frame.symbol[0], engine, if_exists='append', index=False)
else:
    self.stream_error = True
I would be super grateful if you could tell me how to correct the code, since it is not adding anything to the database I created, which is what I am trying to do.
if False:
    # in case your internal logic invalidates the items in the queue
    # (e.g. your business logic ran too long and items in queue became "too old")
    reset_socket()
How will execution ever get here if the condition is always False? It seems that reset_socket will never run.
Can we get this fixed in a new version?