
Queue overflow error, change MAX_QUEUE_SIZE value

Open sprototles opened this issue 4 years ago • 21 comments

My code:

from binance import ThreadedWebsocketManager

# callback
def process_message_price(msg):
    print(msg)

# websocket
bm = ThreadedWebsocketManager()
bm.start()

# listOfPairings: all pairs with USDT (over 250 items in the list)
for pairing in listOfPairings:
    conn_key = bm.start_trade_socket(callback=process_message_price, symbol=pairing)

bm.join()

However, after a short time I get the following error: {'e': 'error', 'm': 'Queue overflow. Message not filled'}

This is caused by MAX_QUEUE_SIZE in streams.py being too small for my program.

How can I change this value without editing the streams.py file?

Thx
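One option is to monkey-patch the module-level constant before starting the manager, e.g. `import binance.streams; binance.streams.MAX_QUEUE_SIZE = 10000`. The constant name comes from this thread; whether your installed version still reads it at connection time is an assumption you should verify. The sketch below demonstrates the patch pattern on a stand-in module, so it runs without python-binance installed:

```python
import types

# Stand-in for binance.streams; only the constant matters for this demo.
streams = types.ModuleType("streams")
streams.MAX_QUEUE_SIZE = 100  # the library's default limit per this thread

def raise_queue_limit(module, size):
    """Patch the module-level limit. This must run before any socket is
    started, because the receiver reads the constant when it builds its queue."""
    module.MAX_QUEUE_SIZE = size
    return module.MAX_QUEUE_SIZE

raise_queue_limit(streams, 10_000)
print(streams.MAX_QUEUE_SIZE)
```

Note that, as later comments point out, raising the limit only delays the overflow if the callback stays slower than the incoming stream.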

sprototles avatar Sep 10 '21 12:09 sprototles

Even if you set MAX_QUEUE_SIZE to 10000 (or more, it doesn't matter), you will eventually still get this error, or the stream will stop working.

dpcwee avatar Sep 11 '21 16:09 dpcwee

So... is there a way to avoid this error? What should I change?

sprototles avatar Sep 11 '21 21:09 sprototles

So... is there a way to avoid this error? What should I change?

import time
import threading

from binance import ThreadedWebsocketManager

class Stream():

    def start(self):
        self.bm = ThreadedWebsocketManager()
        self.bm.start()
        self.stream_error = False
        self.multiplex_list = list()

        # listOfPairings: all pairs with USDT (over 250 items in the list)
        for pairing in listOfPairings:
            self.multiplex_list.append(pairing.lower() + '@trade')
        self.multiplex = self.bm.start_multiplex_socket(callback=self.realtime, streams=self.multiplex_list)

        # monitor for the error in a separate thread
        stop_trades = threading.Thread(target=self.restart_stream, daemon=True)
        stop_trades.start()

    def realtime(self, msg):
        if 'data' in msg:
            pass  # your message handling here
        else:
            self.stream_error = True

    def restart_stream(self):
        while True:
            time.sleep(1)
            if self.stream_error:
                self.bm.stop_socket(self.multiplex)
                time.sleep(5)
                self.stream_error = False
                self.multiplex = self.bm.start_multiplex_socket(callback=self.realtime, streams=self.multiplex_list)

stream = Stream()
stream.start()
stream.bm.join()

And set MAX_QUEUE_SIZE = 10000. I found no other way to avoid this error. I get no more than 10-15 restarts of the websocket across all USDT pairs per day (and the restart completes within 5-7 seconds of the error).

Restarting the stream from an except block did not work for me either; only restarting from a separate monitoring thread did.

dpcwee avatar Sep 12 '21 07:09 dpcwee

Saved my day!! Thanks.

But why are we getting this error? Is it because of the asynchronous function we used?

I tried to use the websocket library directly, and it seems to work well for me:

    ws = websocket.WebSocketApp("wss://fstream.binance.com/ws/!bookTicker", on_message=on_message, on_error=on_error)
    ws.run_forever()

June911 avatar Sep 15 '21 14:09 June911

Same problem: it hits the queue max. I have about 50 pairs, and 5 intervals per pair.

sfarrell5123 avatar Sep 19 '21 05:09 sfarrell5123

I was just playing around with the documentation code and got this same error while manually using the asynchronous context manager.

What sorted it out for me was closing the context manager once you have opened the connection and received the message. Here is my code:

import asyncio

# socket, create_frame and engine are created elsewhere (a BinanceSocketManager
# socket, a DataFrame builder and an SQLAlchemy engine, respectively)
async def run_listener():
    while True:
        await socket.__aenter__()
        msg = await socket.recv()
        await socket.__aexit__(None, None, None)
        try:
            frame = create_frame(msg)
            frame.to_sql('BTCUSDT', engine, if_exists='append', index=False)
            print(frame)
        except Exception:
            print(f'Error: {msg["m"]}')

if __name__ == "__main__":
    asyncio.run(run_listener())

It's the line await socket.__aexit__(None, None, None) that sorted out the issue for me.
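For reference, `__aenter__`/`__aexit__` pairs are exactly what `async with` calls under the hood, so the same per-message open/close can be written more idiomatically. `FakeSocket` below is a hypothetical stand-in with the same awaitable interface as a python-binance socket, so the sketch runs without a live connection:

```python
import asyncio

class FakeSocket:
    """Hypothetical stand-in mimicking a python-binance socket's interface."""
    def __init__(self, messages):
        self._messages = list(messages)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        return False  # do not swallow exceptions

    async def recv(self):
        return self._messages.pop(0)

async def run_listener(socket, n_messages):
    received = []
    for _ in range(n_messages):
        # async with enters and exits the context per message, which is
        # equivalent to the manual __aenter__/__aexit__ calls above
        async with socket as conn:
            received.append(await conn.recv())
    return received

out = asyncio.run(run_listener(FakeSocket([{"p": "1"}, {"p": "2"}]), 2))
```

Note that with a real socket this reconnects on every iteration, which is likely why the queue never fills; it trades throughput for a bounded backlog.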

Blademaster680 avatar Dec 17 '21 18:12 Blademaster680

This still does not solve the issue; I had tried this method before. The error logs show an asyncio timeout and an abnormal (1006) close:

    2021-12-24 23:50:25,669 DEBUG client - state = CONNECTING
    2021-12-24 23:50:25,980 DEBUG client - event = connection_made(<asyncio.sslproto._SSLProtocolTransport object at 0x7fb72cefa3a0>)
    2021-12-24 23:50:25,981 DEBUG client > GET /ws/bnbusdt_perpetual@continuousKline_15m HTTP/1.1
    2021-12-24 23:50:25,981 DEBUG client > Headers([('Host', 'fstream.binance.com'), ('Upgrade', 'websocket'), ('Connection', 'Upgrade'), ('Sec-WebSocket-Key', 'k5bcAVA6k7lGsrlrOHk6SQ=='), ('Sec-WebSocket-Version', '13'), ('Sec-WebSocket-Extensions', 'permessage-deflate; client_max_window_bits'), ('User-Agent', 'Python/3.9 websockets/9.1')])
    2021-12-24 23:50:26,131 DEBUG client - event = data_received(<212 bytes>)
    2021-12-24 23:50:26,132 DEBUG client < HTTP/1.1 101 Switching Protocols
    2021-12-24 23:50:26,132 DEBUG client < Headers([('Date', 'Fri, 24 Dec 2021 12:50:26 GMT'), ('Connection', 'upgrade'), ('upgrade', 'websocket'), ('sec-websocket-accept', 'nwEU6MiTPjGVy4SN25IgrGxlmws='), ('sec-websocket-extensions', 'permessage-deflate')])
    2021-12-24 23:50:26,133 DEBUG client - state = OPEN
    2021-12-24 23:50:26,528 DEBUG client - event = data_received(<210 bytes>)
    2021-12-24 23:50:26,528 DEBUG client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"e":"continuous_kline","E":1640350226478,"ps":"BNBUSDT","ct":"PERPETUAL","k":{"t":1640349900000,"T":1640350799999,"i":"15m","f":1059051713329,"L":1059062641236,"o":"546.700","c":"547.730","h":"547.770","l":"546.390","v":"1770.13","n":1846,"x":false,"q":"968844.37900","V":"1069.84","Q":"585580.83790","B":"0"}}', rsv1=False, rsv2=False, rsv3=False)
    2021-12-24 23:50:26,529 DEBUG client ! failing OPEN WebSocket connection with code 1006
    2021-12-24 23:50:26,529 DEBUG client - state = CLOSING
    2021-12-24 23:50:26,529 DEBUG client > Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
    2021-12-24 23:50:26,632 DEBUG client ! timed out waiting for TCP close
    2021-12-24 23:50:26,632 DEBUG client x closing TCP connection
    2021-12-24 23:50:26,736 DEBUG client ! timed out waiting for TCP close
    2021-12-24 23:50:26,736 DEBUG client x aborting TCP connection
    2021-12-24 23:50:26,754 DEBUG client - event = connection_lost(None)
    2021-12-24 23:50:26,754 DEBUG client - state = CLOSED
    2021-12-24 23:50:26,754 DEBUG client x code = 1006, reason = [no reason]
    2021-12-24 23:50:27,870 DEBUG connection close error (code = 1006 (connection closed abnormally [internal]), no reason)

djangoengine avatar Dec 24 '21 12:12 djangoengine

This solved the problem for me:

    await socket.__aenter__()
    msg = await socket.recv()
    if msg['e'] == 'error':
        print('error')
        client.close_connection()
        # close and restart the socket
        bsm = BinanceSocketManager(client)
        socket = bsm.trade_socket(pair)
    else:
        frame = createframe(msg)
        frame.to_sql(pair, engine, if_exists="append", index=False)
        print(frame)

aknorbert avatar Dec 30 '21 18:12 aknorbert

Another solution is to create a new TWM per pair or per 3 or 4 pairs. But of course, you need a CPU with lots of threads.
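A minimal sketch of that split, assuming each group gets its own ThreadedWebsocketManager (the manager calls in the comment are illustrative and not exercised here):

```python
def chunk_symbols(symbols, group_size=4):
    """Split the symbol list into groups of at most group_size,
    one group per manager instance."""
    return [symbols[i:i + group_size] for i in range(0, len(symbols), group_size)]

groups = chunk_symbols([f"PAIR{i}USDT" for i in range(10)], group_size=4)

# Illustrative use (one TWM per group):
# for group in groups:
#     twm = ThreadedWebsocketManager()
#     twm.start()
#     twm.start_multiplex_socket(
#         callback=handler,
#         streams=[s.lower() + "@trade" for s in group])
```

Each manager then has its own receive loop and queue, so a slow handler on one group no longer overflows the others.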

willmcpo avatar Jan 18 '22 04:01 willmcpo

I was just playing around with the documentation code and got this same error while manually using the asynchronous context manager.

What sorted it out for me was closing the context manager once you have opened the connection and received the message. Here is my code:

import asyncio

# socket, create_frame and engine are created elsewhere (a BinanceSocketManager
# socket, a DataFrame builder and an SQLAlchemy engine, respectively)
async def run_listener():
    while True:
        await socket.__aenter__()
        msg = await socket.recv()
        await socket.__aexit__(None, None, None)
        try:
            frame = create_frame(msg)
            frame.to_sql('BTCUSDT', engine, if_exists='append', index=False)
            print(frame)
        except Exception:
            print(f'Error: {msg["m"]}')

if __name__ == "__main__":
    asyncio.run(run_listener())

It's the line await socket.__aexit__(None, None, None) that sorted out the issue for me.

The line await socket.__aexit__(None, None, None) works perfectly!

nhoxbin avatar Jan 21 '22 14:01 nhoxbin

What fixed this for me was using the streams.py script from v1.0.13.

mrkeyiano avatar Aug 06 '22 18:08 mrkeyiano

What fixed this for me was using the streams.py script from v1.0.13.

You use the latest version with just the streams.py script from v1.0.13, right?

kfdl avatar Aug 09 '22 09:08 kfdl

You use the latest version with just the streams.py script from v1.0.13, right?

exactly, yes!

mrkeyiano avatar Aug 09 '22 09:08 mrkeyiano

What fixed this for me was using the streams.py script from v1.0.13.

Could you please specify what you mean by this? Sharing a code snippet would be highly appreciated.

Karlheinzniebuhr avatar Aug 17 '22 14:08 Karlheinzniebuhr

What fixed this for me was using the streams.py script from v1.0.13.

Could you please specify what you mean by this? Sharing a code snippet would be highly appreciated.

You have to download the python-binance package from source and use it in your code https://github.com/sammchardy/python-binance/tree/master/binance

then replace streams.py with the older version (v1.0.13) of the file from source: https://github.com/sammchardy/python-binance/tree/v1.0.13/binance

mrkeyiano avatar Aug 18 '22 00:08 mrkeyiano

Hello,

FYI, I have tested this solution and it is not really working. The only difference is that the part below has no "else" branch in v1.0.13:

    if res and self._queue.qsize() < 100:
        await self._queue.put(res)

but the queue still overflows and we still lose some messages.

My solution: I increased the limit to 10000 and, in threaded_stream.py, changed the wait_for timeout from 3 to 7 seconds:

    while self._socket_running[path]:
        try:
            msg = await asyncio.wait_for(s.recv(), 7)

I tested for 4 hours straight, with an "else" branch added to log whenever the queue overflows, and it never happened again. Everything looks good to me.

Best regards

kfdl avatar Aug 18 '22 17:08 kfdl

This behavior is a feature rather than a bug (it prevents a memory leak). It only means the message handler cannot consume messages as fast as the receiver enqueues them. Without MAX_QUEUE_SIZE, memory usage would grow without bound. The solutions proposed in other comments go the wrong way.

The real fix is straightforward: improve the efficiency of the message handler, or split the pairs across multiple processes.
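One way to keep the handler fast, sketched with only the standard library: the websocket callback just enqueues, and a worker thread does the slow work (database writes, etc.) at its own pace, so the receiver is never blocked. Here `results.append` is a stand-in for the real per-message processing:

```python
import queue
import threading

work = queue.Queue()
results = []

def fast_callback(msg):
    """What you would pass as the socket callback: O(1), returns immediately."""
    work.put(msg)

def worker():
    """Drains the queue; replace the append with the real (slow) handler."""
    while True:
        msg = work.get()
        if msg is None:  # sentinel: stop the worker
            break
        results.append(msg)

t = threading.Thread(target=worker, daemon=True)
t.start()

# Simulate a burst of incoming stream messages
for i in range(5):
    fast_callback({"e": "trade", "t": i})

work.put(None)  # shut the worker down for this demo
t.join()
```

If the producer outruns the worker for long enough, the internal queue grows instead of the library's, so you still need either a faster handler or a bound with an explicit drop policy.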

furoxr avatar Sep 05 '22 00:09 furoxr

I think there is still some bug in ThreadedWebsocketManager, but I have not had time to investigate yet. At first I also assumed the issue was my callback being too slow, so I implemented a reconnect/reset whenever the error appears; however, the error only appears once at the very beginning, and afterwards everything runs fine without any reconnect/reset.

Maybe I'm wrong, so if somebody has the knowledge and time please have a look at my solution:

#!/usr/bin/env python3.9

import logging
from time import sleep
from binance import ThreadedWebsocketManager

CH = logging.StreamHandler()
CH.setFormatter(logging.Formatter('%(asctime)s | %(levelname)s | %(message)s'))

SYMBOLS = ['XRP/BNB', 'ETH/BTC', 'LTC/BTC', 'XRP/BUSD', 
            'XRP/BTC', 'LTC/USDT', 'BNB/BUSD', 'XRP/USDT', 
            'ETH/USDT', 'LTC/BNB', 'LTC/BUSD', 'BNB/USDT', 
            'BTC/USDT', 'BTC/BUSD', 'BNB/BTC', 'ETH/BUSD']

class QueueManager():
    _log = logging.getLogger(__name__)
    _log.setLevel(logging.WARNING)
    _log.addHandler(CH)

    def __init__(self, symbols:list=[]) -> None:
        self._twm = ThreadedWebsocketManager()
        self._streams = list(map(lambda s: f"{s.replace('/','').lower()}@bookTicker", symbols))
        self._twm.start()
        self._log.warning(f"Start listening to {len(self._streams)} streams")
        self._listener:str = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)

    def _handle_socket_message(self, message):
        if ('e' in message):
          if (message['m']=='Queue overflow. Message not filled'):
            self._log.warning("Socket queue full. Resetting connection.")
            self.reset_socket()
            return
          else:
            self._log.error(f"Stream error: {message['m']}")
            exit(1)

        (u, s, b, B, a, A) = message['data'].values()
        # do something with the message
        self._log.debug(f"{s}: buy - {b}, bid - {a}")
        
        if False:
          # in case your internal logic invalidates the items in the queue
          # (e.g. your business logic ran too long and items in queue became "too old")
          self.reset_socket()

    def reset_socket(self):
        self._twm.stop_socket(self._listener)
        self._listener = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)
        if (self._log.isEnabledFor(logging.DEBUG)):
          self._log.debug("Reconnecting. Waiting for 5 seconds...")
          sleep(5)

    def join(self):
        self._twm.join()

def main():
    manager = QueueManager(SYMBOLS)
    manager.join()

if __name__ == "__main__":
    main()

kisszoltan avatar Sep 19 '22 10:09 kisszoltan

So... is there a way to avoid this error? What should I change?

import time
import threading

from binance import ThreadedWebsocketManager

class Stream():

    def start(self):
        self.bm = ThreadedWebsocketManager()
        self.bm.start()
        self.stream_error = False
        self.multiplex_list = list()

        # listOfPairings: all pairs with USDT (over 250 items in the list)
        for pairing in listOfPairings:
            self.multiplex_list.append(pairing.lower() + '@trade')
        self.multiplex = self.bm.start_multiplex_socket(callback=self.realtime, streams=self.multiplex_list)

        # monitor for the error in a separate thread
        stop_trades = threading.Thread(target=self.restart_stream, daemon=True)
        stop_trades.start()

    def realtime(self, msg):
        if 'data' in msg:
            pass  # your message handling here
        else:
            self.stream_error = True

    def restart_stream(self):
        while True:
            time.sleep(1)
            if self.stream_error:
                self.bm.stop_socket(self.multiplex)
                time.sleep(5)
                self.stream_error = False
                self.multiplex = self.bm.start_multiplex_socket(callback=self.realtime, streams=self.multiplex_list)

stream = Stream()
stream.start()
stream.bm.join()

And set MAX_QUEUE_SIZE = 10000. I found no other way to avoid this error. I get no more than 10-15 restarts of the websocket across all USDT pairs per day (and the restart completes within 5-7 seconds of the error).

Restarting the stream from an except block did not work for me either; only restarting from a separate monitoring thread did.

Dear,

Thank you so much for the code and your contribution! This has been absolutely helpful! However, could you please share a possible code snippet for the "your code" section, just so I can get an idea of how to use the variables correctly? The current code I have in that section is as follows:

    if 'data' in msg:
        async def main():
            async with self.bm as tscm:
                while True:
                    res = await tscm.recv()
                    if res:
                        frame = createframe(res)
                        frame.to_sql(frame.symbol[0], engine, if_exists='append', index=False)
    else:
        self.stream_error = True

I would be super grateful if you could tell me how to correct the code, since it is not actually adding anything to the database I created, which is what I am trying to do.

jujz23345 avatar Oct 01 '22 21:10 jujz23345

    if False:
        # in case your internal logic invalidates the items in the queue
        # (e.g. your business logic ran too long and items in queue became "too old")
        reset_socket()

How will execution ever reach this point if the condition is always False? It seems reset_socket will never run?

Vgeek356 avatar Nov 25 '22 10:11 Vgeek356

Can we get this fixed in a new version?

Karlheinzniebuhr avatar Mar 09 '24 00:03 Karlheinzniebuhr