quamash
Quamash and pyzmq with asyncio
Hello,
I'm attempting to make a GUI client that communicates asynchronously with a server via zeromq. Pyzmq has support for asyncio (i.e. some of the blocking functions return futures). I wrote a naive server/client example based on some pyzmq examples, and I'm not sure whether it fails because I'm using something incorrectly, or because there's a fundamental reason that these two won't work together.
Server:
import asyncio
import zmq.asyncio
from zmq.utils.win32 import allow_interrupt

ctx = zmq.asyncio.Context()
loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(loop)
url = 'tcp://127.0.0.1:5555'

@asyncio.coroutine
def async_process(msg):
    print(msg)
    return msg

@asyncio.coroutine
def recv_and_process():
    sock = ctx.socket(zmq.REP)
    sock.bind(url)
    while True:
        print("waiting for client...")
        msg = yield from sock.recv_multipart()
        reply = yield from async_process(msg)
        print("processed! now sending...")
        yield from sock.send_multipart(reply)

loop.run_until_complete(recv_and_process())
Client:
import sys
import asyncio
import zmq
import zmq.asyncio
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QPushButton, QLineEdit
from quamash import QEventLoop

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
url = 'tcp://127.0.0.1:5555'
ctx = zmq.asyncio.Context()

class Window(QWidget):
    def __init__(self):
        super(Window, self).__init__()
        self.button = QPushButton('Send', self)
        self.button.clicked.connect(self.handleButton)
        self.path = QLineEdit()
        self.path.move(20, 20)
        self.path.resize(280,40)
        layout = QVBoxLayout(self)
        layout.addWidget(self.button)
        layout.addWidget(self.path)

    def handleButton(self):
        print('Sending text:', self.path.text())
        asyncio.ensure_future(get(self.path.text()), loop=loop)

window = Window()
window.show()

@asyncio.coroutine
def get(path):
    socket = ctx.socket(zmq.REQ)
    socket.connect(url)
    socket.send_multipart([path.encode('ascii')])
    # yield from asyncio.sleep(1)
    reply = yield from socket.recv_multipart()
    print(reply)
    socket.close()

with loop:
    loop.run_forever()
I'm using Python 3.4 here, since I'm not sure PyQt5 is supported in Python 3.5 yet.
The server simply echoes back whatever message it receives. The client has a textbox; clicking the "Send" button sends the textbox's text to the server. When I do this, I receive the following error:
File "/.../site-packages/quamash/__init__.py", line 377, in add_reader
notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read)
TypeError: QSocketNotifier(sip.voidptr, QSocketNotifier.Type, parent: QObject = None): argument 1 has unexpected type 'Socket'
The code works fine when I uncomment the asyncio.sleep line. I assume this allows enough time that a future is no longer returned.
It's worth pointing out that pyzmq uses its own event loop (see: http://pyzmq.readthedocs.io/en/latest/api/zmq.asyncio.html) called ZMQEventLoop(), as used in the server. I was hoping that the client wouldn't need to use this event loop.
Is there any simple way to integrate zmq.asyncio with quamash, or is this a much larger endeavor?
Thanks!
P.S. Zeromq seems to be mentioned on line 462 of that __init__.py file. What's this in reference to?
See #13 for more info on zeromq. I think @horazont or @aknuds1 had a patch out with the aiozmq project to get quamash working better, but it was rejected because they wanted to go a different way. (I searched the chat logs but couldn't find what I was looking for.) They wanted to remove the need for a custom event loop with zmq and just use the network primitives built into whichever event loop you are using.
You could try using zmq's event loop in the client, and then just doing:
while True:
    app.processEvents()
    yield from asyncio.sleep(0)
in a coroutine and not use quamash.
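To make the idea above concrete, here is a minimal sketch of such a "GUI pump" coroutine. It is only an illustration under a few assumptions: it uses async/await syntax (Python 3.5+) rather than the yield from style used elsewhere in this thread, StubApp is a hypothetical stand-in for QApplication so the snippet runs without PyQt5, and pump_gui, stop and interval are names made up for this example. With a real application you would pass the QApplication instance and run the pump on whichever loop zmq is using.

```python
import asyncio


class StubApp:
    """Hypothetical stand-in for QApplication; it only counts calls."""

    def __init__(self):
        self.calls = 0

    def processEvents(self):
        self.calls += 1


async def pump_gui(app, stop, interval=0.0):
    # Let the GUI handle its pending events, then hand control back
    # to the asyncio loop so other coroutines (e.g. zmq I/O) can run.
    while not stop.is_set():
        app.processEvents()
        await asyncio.sleep(interval)


async def main():
    app = StubApp()
    stop = asyncio.Event()
    pump = asyncio.ensure_future(pump_gui(app, stop))
    await asyncio.sleep(0.05)  # other work would happen here
    stop.set()                 # ask the pump to finish
    await pump
    return app.calls


calls = asyncio.run(main())
```

Note that sleeping for 0 seconds makes the loop spin as fast as it can, so a small non-zero interval (say 0.01) is probably kinder to the CPU, at the cost of slightly less responsive widgets.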
The error
TypeError: QSocketNotifier(sip.voidptr, QSocketNotifier.Type, parent: QObject = None): argument 1 has unexpected type 'Socket'
indicates that QSocketNotifier received an argument of an unexpected type. It seems to me that this should be fixed in the zeromq client by passing a file descriptor (an integer) to add_reader instead of a socket object. Luckily, this is an easy fix: pass socket.fileno() as the first argument, e.g. loop.add_reader(socket.fileno(), callback).
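As a rough illustration of registering a reader by file descriptor, here is a small sketch that uses only the standard library. It is not the pyzmq or quamash code: a socket.socketpair() stands in for the zmq socket, and on_readable and received are names invented for this example. It assumes a selector-based event loop (the default on Linux and macOS).

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    received = loop.create_future()

    def on_readable():
        # Called by the loop once the descriptor becomes readable.
        loop.remove_reader(reader.fileno())
        received.set_result(reader.recv(1024))

    # Register the integer file descriptor, not the socket object.
    loop.add_reader(reader.fileno(), on_readable)
    writer.send(b"ping")
    try:
        return await asyncio.wait_for(received, timeout=1.0)
    finally:
        reader.close()
        writer.close()


data = asyncio.run(main())
```

One caveat if you try this with a real zmq socket: as far as I know, the descriptor zmq exposes is edge-triggered, so the callback should check the socket's pending events rather than assume one wake-up means exactly one message.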
I ran into a very similar issue with asyncio-mqtt and asyncqt (the latter is a fork of quamash).