Message sending is blocked by receiving

Open milefashell opened this issue 2 years ago • 0 comments

Problem description:

I am trying to write a bot that receives direct messages and responds to them. To do that, I keep a subscription open and save events continuously, while a separate thread parses the messages and responds to them. Unfortunately, the relay.connect() function contains this loop:

    while True:
        if self.outgoing_messages.qsize() > 0:
            message = self.outgoing_messages.get()
            self.num_sent_events += 1
            self.ws.write_message(message)
        message = yield self.ws.read_message()
        if message is None:
            break
        self._on_message(message)
        if not self.connected:
            break

This means that after each read, if the response is not already queued when the loop checks the outgoing queue, the send is skipped and the loop blocks waiting for the next incoming message. As a result, messages are only sent when a new message arrives: sending depends on receiving.
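The behaviour can be reproduced outside pynostr with a minimal sketch. `FakeSocket`, `blocking_loop` and `demo` are hypothetical names invented for this illustration (they are not part of pynostr or Tornado), and the loop is written with async/await instead of `@gen.coroutine` only to keep the example self-contained:

```python
import asyncio
import queue


class FakeSocket:
    """Hypothetical stand-in for the websocket; not part of pynostr or Tornado."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []

    async def read_message(self):
        # Blocks until something arrives, like a websocket read.
        return await self.incoming.get()

    def write_message(self, message):
        self.sent.append(message)


async def blocking_loop(ws, outgoing):
    # Same shape as the loop quoted above: one send check, then a blocking read.
    while True:
        if outgoing.qsize() > 0:
            ws.write_message(outgoing.get())
        message = await ws.read_message()
        if message is None:
            break


async def demo():
    ws, outgoing = FakeSocket(), queue.Queue()
    task = asyncio.create_task(blocking_loop(ws, outgoing))
    await asyncio.sleep(0.05)        # the loop is now parked in read_message()
    outgoing.put("reply")            # queued by the "other thread"
    await asyncio.sleep(0.05)
    sent_before = list(ws.sent)      # still empty: nothing woke the loop
    await ws.incoming.put("event")   # an incoming message wakes the loop
    await asyncio.sleep(0.05)
    sent_after = list(ws.sent)
    await ws.incoming.put(None)      # simulate the connection closing
    await task
    return sent_before, sent_after


sent_before, sent_after = asyncio.run(demo())
print(sent_before, sent_after)  # [] ['reply']
```

The queued reply only goes out after the unrelated "event" arrives, which is the dependency described above.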

Proposal step 1:

Give message reading a timeout, so that if there is no message to read for a certain time, the loop proceeds to send.

Proposal step 2:

Tornado's gen.with_timeout() does not cancel the future it waits on when the timeout expires, so the future can be reused. That is good practice when futures are reused, but I don't see that we reuse them here, so I propose using asyncio.wait_for() instead, which is similar to gen.with_timeout() but cancels the future on timeout. Using asyncio.wait_for() this way needs Python's async/await syntax, so the code would have to move from @gen.coroutine and yield to async and await.
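The cancellation behaviour of asyncio.wait_for() can be checked with a small sketch. `slow_read` and `demo` are hypothetical names used only for this illustration; `slow_read` stands in for a read that does not finish before the timeout:

```python
import asyncio


async def slow_read():
    # Hypothetical stand-in for a read that never completes in time.
    await asyncio.sleep(10)
    return "message"


async def demo():
    task = asyncio.ensure_future(slow_read())
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        pass  # the timeout is expected here
    # wait_for() cancelled the wrapped task when the timeout expired.
    return task.cancelled()


was_cancelled = asyncio.run(demo())
print(was_cancelled)  # True
```

Whether cancelling a pending read_message() is harmless for the real websocket connection is not shown by this sketch and would need to be verified against Tornado itself.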

Proposal step 3:

Here is example code, which I tested locally, that uses asyncio.wait_for() to keep read_message() from blocking the loop.

    # Requires `import asyncio` at module level.
    async def connect(self):
        [...]
            while True:
                if self.outgoing_messages.qsize() > 0:
                    message = self.outgoing_messages.get()
                    self.num_sent_events += 1
                    self.ws.write_message(message)
                try:
                    # Wait for read_message(), giving up after a 2 s timeout
                    message = await asyncio.wait_for(self.ws.read_message(), timeout=2)

                    # If no timeout occurred and message is not None, parse the message
                    if message is not None:
                        self._on_message(message)
                except asyncio.TimeoutError:
                    # Nothing arrived in time; loop again so queued messages get sent
                    pass
                # Exit loop if socket is closed
                if not self.connected:
                    print("Stopping", self.url)
                    break
        [...]
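For anyone who wants to try the idea without a relay, the same loop shape can be run against a fake socket. This is a sketch under stated assumptions: `FakeSocket`, `timed_loop` and `demo` are hypothetical names that do not exist in pynostr or Tornado, the timeout is shortened to 0.02 s so the demo finishes quickly, and a `None` message is treated as "connection closed" as in the original loop:

```python
import asyncio
import queue


class FakeSocket:
    """Hypothetical stand-in for the websocket; not part of pynostr or Tornado."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []

    async def read_message(self):
        return await self.incoming.get()

    def write_message(self, message):
        self.sent.append(message)


async def timed_loop(ws, outgoing, read_timeout):
    while True:
        if outgoing.qsize() > 0:
            ws.write_message(outgoing.get())
        try:
            message = await asyncio.wait_for(ws.read_message(), timeout=read_timeout)
        except asyncio.TimeoutError:
            continue  # nothing to read; go back and check the outgoing queue
        if message is None:
            break  # treat None as "connection closed"


async def demo():
    ws, outgoing = FakeSocket(), queue.Queue()
    task = asyncio.create_task(timed_loop(ws, outgoing, read_timeout=0.02))
    await asyncio.sleep(0.05)
    outgoing.put("reply")          # queued while no incoming message is pending
    await asyncio.sleep(0.1)       # several read timeouts elapse
    sent_without_incoming = list(ws.sent)
    await ws.incoming.put(None)    # simulate the connection closing
    await task
    return sent_without_incoming


sent_without_incoming = asyncio.run(demo())
print(sent_without_incoming)  # ['reply']
```

With the timed read, the reply is sent within one timeout period even though no incoming message arrived. The timeout value trades send latency against how often the loop wakes up with nothing to do.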

milefashell · Mar 24 '23 06:03