Message sending is blocked by receiving
Problem description:
I am trying to write a bot that receives direct messages and responds to them. To do this, I keep a subscription open and save events continuously, then parse the messages and respond to them in a different thread. Unfortunately, the relay.connect() function contains this loop:
while True:
    if self.outgoing_messages.qsize() > 0:
        message = self.outgoing_messages.get()
        self.num_sent_events += 1
        self.ws.write_message(message)
    message = yield self.ws.read_message()
    if message is None:
        break
    self._on_message(message)
    if not self.connected:
        break
This means the loop blocks in read_message() on every iteration. If a response is queued after the send check has already run, it is skipped for now and the loop waits for a new incoming message. The end result is that we only send messages when we receive one, so sending depends on receiving.
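The behaviour described above can be reproduced without a relay. The following is a minimal sketch using only asyncio. FakeSocket, blocking_loop and demo are hypothetical names made up for this illustration, and the fake socket only mimics the two method names used in the loop, not Tornado's real websocket class.

```python
import asyncio
import queue


class FakeSocket:
    """Hypothetical stand-in for the websocket (not Tornado's class)."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []

    def write_message(self, message):
        self.sent.append(message)

    async def read_message(self):
        # Blocks until something arrives, like a real read would.
        return await self.incoming.get()


async def blocking_loop(ws, outgoing):
    # Same shape as the loop in connect(): one send check, then a blocking read.
    while True:
        if outgoing.qsize() > 0:
            ws.write_message(outgoing.get())
        message = await ws.read_message()
        if message is None:
            break


async def demo():
    ws, outgoing = FakeSocket(), queue.Queue()
    task = asyncio.create_task(blocking_loop(ws, outgoing))
    await asyncio.sleep(0.01)          # the loop is now parked in read_message()
    outgoing.put("reply")              # the bot queues a response
    await asyncio.sleep(0.05)
    sent_while_idle = list(ws.sent)    # snapshot while no event has arrived
    await ws.incoming.put("event")     # an incoming message wakes the loop
    await asyncio.sleep(0.01)
    sent_after_event = list(ws.sent)   # snapshot after the loop ran again
    await ws.incoming.put(None)        # simulate the connection closing
    await task
    return sent_while_idle, sent_after_event


sent_while_idle, sent_after_event = asyncio.run(demo())
print(sent_while_idle, sent_after_event)
```

In this sketch the queued reply only goes out after the simulated incoming event, which matches what I see with the real relay.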
Proposal step 1:
Give the read a timeout, so that if there is no message to read for a certain time, the loop proceeds to send.
Proposal step 2:
Tornado's gen.with_timeout() function does not cancel the future it waits for when the timeout expires. This is good practice when futures are reused, but I don't see them being reused here, so I would propose using asyncio.wait_for(), which is similar to gen.with_timeout() but cancels the future on timeout. Using asyncio.wait_for() needs Python's async/await syntax, so connect() would have to move from @gen.coroutine and yield to async and await.
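The cancellation behaviour of asyncio.wait_for() can be checked with asyncio alone. This is a small sketch, not Tornado code: asyncio.wait() with a timeout is used here only as a stand-in for a timeout that leaves the future untouched, and compare_timeouts is a hypothetical name.

```python
import asyncio


async def compare_timeouts():
    loop = asyncio.get_running_loop()

    # asyncio.wait_for() cancels the awaited future when the timeout fires.
    fut = loop.create_future()
    try:
        await asyncio.wait_for(fut, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    cancelled_by_wait_for = fut.cancelled()

    # asyncio.wait() with a timeout leaves the future pending,
    # so it could still be awaited again later.
    fut2 = loop.create_future()
    done, pending = await asyncio.wait([fut2], timeout=0.05)
    left_pending = fut2 in pending and not fut2.done()
    fut2.cancel()  # tidy up before the loop closes

    return cancelled_by_wait_for, left_pending


cancelled_by_wait_for, left_pending = asyncio.run(compare_timeouts())
print(cancelled_by_wait_for, left_pending)
```

Since each loop iteration calls read_message() again anyway, having the timed-out read cancelled looks like the cleaner option here.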
Proposal step 3:
Here is example code that I tested locally. It uses asyncio.wait_for() so that read_message() no longer blocks the loop indefinitely.
import asyncio

async def connect(self):
    [...]
    while True:
        if self.outgoing_messages.qsize() > 0:
            message = self.outgoing_messages.get()
            self.num_sent_events += 1
            self.ws.write_message(message)
        try:
            # Wait for read_message(), giving up after a 2 s timeout
            message = await asyncio.wait_for(self.ws.read_message(), timeout=2)
            # If no timeout occurred and the message is not None, parse it
            if message is not None:
                self._on_message(message)
        except asyncio.TimeoutError:
            # Nothing to read within the timeout; loop back and check the send queue
            pass
        # Exit loop if socket is closed
        if not self.connected:
            print("Stopping", self.url)
            break
    [...]
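To illustrate the effect of the timeout in isolation, here is a self-contained sketch with the same loop shape, run against a fake socket whose peer never sends anything. IdleSocket, timeout_loop and the stop event are hypothetical names for this illustration. The stop event stands in for the self.connected check, and the timeout is shortened so the demo finishes quickly.

```python
import asyncio
import queue


class IdleSocket:
    """Hypothetical websocket stand-in whose peer never sends anything."""

    def __init__(self):
        self.sent = []

    def write_message(self, message):
        self.sent.append(message)

    async def read_message(self):
        await asyncio.Event().wait()  # never set, so the read never completes


async def timeout_loop(ws, outgoing, stop, timeout=0.02):
    while not stop.is_set():
        if outgoing.qsize() > 0:
            ws.write_message(outgoing.get())
        try:
            await asyncio.wait_for(ws.read_message(), timeout=timeout)
        except asyncio.TimeoutError:
            continue  # nothing to read; go back and check the send queue


async def demo():
    ws, outgoing, stop = IdleSocket(), queue.Queue(), asyncio.Event()
    task = asyncio.create_task(timeout_loop(ws, outgoing, stop))
    await asyncio.sleep(0.01)
    outgoing.put("reply")        # queued while the loop is waiting on a read
    await asyncio.sleep(0.2)     # several timeout periods, no incoming traffic
    stop.set()
    await task
    return ws.sent


sent = asyncio.run(demo())
print(sent)
```

With the timeout in place the reply is sent even though nothing is ever received. The trade-off is that a queued message can wait up to one timeout period before it goes out, so the 2 s value in the proposal sets the worst-case send delay.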