
Cannot keep auto-reconnecting to a closed TCP server when ConnectionRefusedError is raised

Open perillaseed opened this issue 7 years ago • 2 comments

When a TCP client running on quamash tries to connect to a closed TCP server and is expected to keep retrying automatically, a ConnectionRefusedError is raised and the reconnect attempts stop. With the loop from asyncio.get_event_loop() there is no such problem: the client keeps reconnecting until the TCP server opens. This issue was also asked at https://stackoverflow.com/questions/51093428/different-behaviours-when-using-qeventloop-and-asyncio-default-loop-with-python3, and the simplified test code is as follows:

import sys
import time
import asyncio
import logging
from quamash import QEventLoop
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QThread, pyqtSignal, QRunnable, QDataStream, pyqtSlot
from PyQt5.QtNetwork import QTcpSocket
      
logger = logging.getLogger(__name__)

from PyQt5.QtWidgets import QMainWindow, QTextBrowser
class MainWindow(QMainWindow):
    sigMsgReceived = pyqtSignal(str)
    def __init__(self, vhost, vport):
        QMainWindow.__init__(self)
        self.vhost = vhost 
        self.vport = vport
        self.centralwidget = QTextBrowser(self)
        self.setCentralWidget(self.centralwidget)
        self.centralwidget.append("Starting...")
        self.sigMsgReceived.connect(self.centralwidget.append)

    
        
    async def update(self):
        while True:
            try:
                # Uses the running event loop; the explicit loop= argument was removed in Python 3.10.
                self.vreader, self.vwriter = await asyncio.open_connection(host=self.vhost, port=self.vport)
                async for msg in self.vreader:
                    self.sigMsgReceived.emit(msg.decode())
            except Exception as e:
                logger.exception(e)
                print("Try to connect after 1 seconds...")
                await asyncio.sleep(1) 


if __name__ == '__main__':
    vhost = "127.0.0.1"
    vport = 8848
        
    logger.setLevel(logging.DEBUG)

    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)  # NEW must set the event loop
    with loop:
        mw = MainWindow(vhost, vport)
        mw.show()
        asyncio.run_coroutine_threadsafe(mw.update(), loop)
        loop.run_forever()

Here is the exception raised:

Event callback failed
Traceback (most recent call last):
  File "D:\Zibo\Documents\SRC\workspace\QHub\venv\lib\site-packages\quamash\_windows.py", line 42, in _process_events
    value = callback(transferred, key, ov)
  File "D:\Python35\Lib\asyncio\windows_events.py", line 509, in finish_connect
    ov.getresult()
ConnectionRefusedError: [WinError 1225]

perillaseed avatar Jul 02 '18 01:07 perillaseed

On Windows, Quamash doesn't extend the default event loop (selector); instead it extends the Proactor event loop.

(This is because I find the Proactor more useful, since it supports subprocesses...)

This might be a problem with quamash itself, since the error is bubbling through quamash/_windows.py, but to really confirm that you'd need to see whether this works with a non-quamash proactor event loop.
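One way to run that check is a small script that repeats the reconnect loop on a plain asyncio loop without Qt or quamash. This is only a sketch under assumptions: the helper names (make_loop, try_connect, free_port, run_check) are made up for illustration, the retry count is bounded so the script terminates, and on non-Windows platforms it falls back to the default loop because ProactorEventLoop only exists on Windows.

```python
import asyncio
import socket
import sys


def make_loop():
    """Return a plain (non-quamash) loop; a ProactorEventLoop on Windows."""
    if sys.platform == "win32":
        return asyncio.ProactorEventLoop()
    return asyncio.new_event_loop()


async def try_connect(host, port, attempts, delay):
    """Try to connect up to `attempts` times; return the number of failures."""
    failures = 0
    for _ in range(attempts):
        try:
            reader, writer = await asyncio.open_connection(host, port)
        except OSError:
            # ConnectionRefusedError is a subclass of OSError.
            failures += 1
            await asyncio.sleep(delay)
        else:
            writer.close()
            break
    return failures


def free_port():
    """Hypothetical helper: pick a local port that nothing is listening on."""
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def run_check(attempts=3, delay=0.01):
    """Run the bounded reconnect loop against a closed port."""
    loop = make_loop()
    try:
        return loop.run_until_complete(
            try_connect("127.0.0.1", free_port(), attempts, delay)
        )
    finally:
        loop.close()
```

If run_check(attempts=3) returns 3 on Windows, every refused connection reached the except clause and the retry loop kept going, which would point at quamash rather than the proactor loop itself.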

harvimt avatar Jul 03 '18 06:07 harvimt

This seems like the same issue as #91. It appears to be related to the implementation of the _process_events method of the _ProactorEventLoop. By modifying the OSError except clause to set the exception on f instead of logging, I believe you can achieve the desired result.

     def _process_events(self, events):
         """Process events from proactor."""
         for f, callback, transferred, key, ov in events:
             try:
                 self._logger.debug('Invoking event callback {}'.format(callback))
                 value = callback(transferred, key, ov)
-            except OSError:
-                self._logger.warning('Event callback failed', exc_info=sys.exc_info())
+            except OSError as e:
+                f.set_exception(e)
             else:
                 f.set_result(value)

Setting the exception on f is what the standard library asyncio implementation of the IocpProactor does.
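To illustrate why this matters for the reconnect loop, here is a minimal sketch using only plain asyncio futures, not quamash's actual code. The names process_event, refuse, and demo are hypothetical. When the OSError is only swallowed (standing in for the logging branch), the future is never completed and the awaiting coroutine hangs; when it is set on the future, the awaiter receives the exception and can retry.

```python
import asyncio


def process_event(fut, callback, propagate):
    """Mimic one iteration of an event-processing loop for a single future."""
    try:
        value = callback()
    except OSError as exc:
        if propagate:
            # Patched behaviour: hand the error to whoever awaits the future.
            fut.set_exception(exc)
        # Otherwise the error is dropped here and fut never completes.
    else:
        fut.set_result(value)


def refuse():
    """Stand-in for a connect callback that fails."""
    raise ConnectionRefusedError("connection refused")


async def demo(propagate):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    process_event(fut, refuse, propagate)
    try:
        # The timeout only exists so the hanging case terminates in this demo.
        await asyncio.wait_for(fut, timeout=0.1)
    except ConnectionRefusedError:
        return "raised"
    except asyncio.TimeoutError:
        return "hung"
    return "ok"
```

In the reporter's code there is no timeout around open_connection, so the "hung" case corresponds to update() waiting forever after the first refused connection.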

russellwinstead avatar Sep 06 '18 15:09 russellwinstead