
Support custom async response to socket.io events.

Open tabouassaleh opened this issue 12 years ago • 0 comments

The current socket.io implementation is synchronous: the raw_message function in session.py waits for the result from the event handler and sends that result as an ACK response to the event message.

This pull request makes that behaviour optional. The user of the library can instead dispatch processing to a background worker (for example celery+redis) and send the result back to the client at a later time, without blocking Tornado while the work runs.

For instance, using Brukva, the script starting the tornado service could include:

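import json
import logging

import brukva
from tornadio2 import TornadioRouter, SocketServer, proto
# Assumption: BaseCommand is Django's management command base class.
from django.core.management.base import BaseCommand

class Command(BaseCommand):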
    def handle(self, *args, **options):
        # ...
        router = TornadioRouter(BaseSocket, {
            # ...
        })
        self._server = router.urls[0][2]['server']
        # ...
        c = brukva.Client()
        c.connect()
        c.subscribe('event_result')
        c.subscribe('broadcast_user')
        c.listen(self._event_result_router)
        SocketServer(application, ssl_options=ssl_options)

    def _event_result_router(self, result):
        '''
        Brukva Redis router for dispatching socket.io event results.

        Message body is a JSON-encoded list containing:
          `session_id`: The session ID of the socket connection that sent the event.
          `msg_id`: The message sequence number for the event.
          `error`: The error to report to the client, or None if there was none.
          `response`: The result data to be sent back to the client.
        '''
        err, message = result
        if err:
            logging.error('Event result error: %r', err)
        elif message.channel != 'event_result':
            return
        else:
            session_id, msg_id, error, response = json.loads(message.body)
            session = self._server._sessions._items.get(session_id)

            # The session may have disconnected.
            if not session:
                return

            # We don't currently use msg_endpoint, so ignore it for now.
            msg_endpoint = None
            if msg_id and msg_id.endswith('+'):
                msg_id = msg_id[:-1]

            session.send_message(proto.ack(msg_endpoint, msg_id, (error, response)))
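
To make the routing step easier to follow in isolation, here is a minimal sketch of the same logic with no Tornado or Redis involved. `FakeSession`, `normalize_msg_id` and `deliver_event_result` are hypothetical names used only for illustration, and the tuple passed to `send_message` is a placeholder for the packet that `proto.ack` would build in the real router.

```python
import json


class FakeSession(object):
    """Hypothetical stand-in for a session; it only records what would be sent."""

    def __init__(self):
        self.sent = []

    def send_message(self, packet):
        self.sent.append(packet)


def normalize_msg_id(msg_id):
    """Strip a trailing '+' from a string message ID, as the router above does."""
    if isinstance(msg_id, str) and msg_id.endswith('+'):
        return msg_id[:-1]
    return msg_id


def deliver_event_result(sessions, body):
    """Decode an 'event_result' payload and pass the ACK data to its session.

    Returns True if the session was found, False if it has disconnected.
    """
    session_id, msg_id, error, response = json.loads(body)
    session = sessions.get(session_id)
    if session is None:
        # The session may have disconnected while the task was running.
        return False
    # Placeholder for proto.ack(msg_endpoint, msg_id, (error, response)).
    session.send_message((normalize_msg_id(msg_id), error, response))
    return True


sessions = {'abc': FakeSession()}
delivered = deliver_event_result(
    sessions, json.dumps(['abc', '7+', None, 'user@example.com']))
dropped = deliver_event_result(
    sessions, json.dumps(['gone', '1', None, 'ignored']))
```

A result that arrives for a session that has already gone away is dropped silently here, which matches the early return in the router.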

And the celery task could look something like this:

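import json

import brukva
from celery.task import task  # Import path used by Celery releases of that time.

@task
def process_event(session_id, message_id, user):
    '''Simple celery task that publishes the user's email address as the event result.'''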
    c = brukva.Client()
    c.connect()
    result = json.dumps([
        session_id,
        message_id,
        None,  # No error.
        user.email
    ])
    c.publish('event_result', result)
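
The worker side only has to agree with the router on the four-element payload. As a rough sketch, a hypothetical helper `build_event_result` (not part of tornadio2 or of this pull request) could produce both the success and the failure payload. The convention that a failed task sends an error value together with a `None` response is an assumption based on the error/response pair used above.

```python
import json


def build_event_result(session_id, message_id, response=None, error=None):
    """Serialize [session_id, message_id, error, response] for 'event_result'."""
    return json.dumps([session_id, message_id, error, response])


# Success: no error, the response carries the result.
ok = build_event_result('abc', '7+', response='user@example.com')

# Failure (assumed convention): an error value and no response.
failed = build_event_result('abc', '8+', error='user not found')
```

Either string could then be passed to `c.publish('event_result', ...)` in place of the inline `json.dumps` call.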

tabouassaleh, Oct 30 '12 17:10