Difference between node.js

Open yunmanger1 opened this issue 12 years ago • 4 comments

Hi!

I have implementations of the same app using gevent-socketio and node.js.

I am trying to test my apps with JMeter. Referring to the socket.io-spec, I created a test plan in which I:

  1. connect to /socket.io/1/ and fetch the session ID from the response: 097468867931:15:10:websocket,xhr-multipart,htmlfile,jsonp-polling,flashsocket,xhr-polling
  2. POST to /socket.io/1/xhr-polling/097468867931?t=1343736013258 with the body 5:::{"name":"Auth","args":[{"userId":1,"key":"key"}]}
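For readers who want to reproduce the two steps outside JMeter, here is a minimal sketch of the string handling involved. It assumes the socket.io 0.9 handshake layout (session ID, heartbeat timeout, close timeout, transport list, separated by colons) and the event frame shape shown above. The helper names `parse_handshake`, `encode_event`, and `polling_path` are made up for illustration and are not part of gevent-socketio. No network calls are made.

```python
import json


def parse_handshake(body):
    """Split a handshake body into its four colon-separated fields."""
    sid, heartbeat, close, transports = body.strip().split(":", 3)
    return {
        "sid": sid,
        "heartbeat_timeout": int(heartbeat),
        "close_timeout": int(close),
        "transports": transports.split(","),
    }


def encode_event(name, *args):
    """Build an event frame (type 5) with an empty message id and endpoint."""
    payload = json.dumps({"name": name, "args": list(args)},
                         separators=(",", ":"))
    return "5:::" + payload


def polling_path(sid, transport="xhr-polling"):
    """Path for the follow-up request; resource and protocol version are fixed here."""
    return "/socket.io/1/{}/{}".format(transport, sid)


handshake = parse_handshake(
    "097468867931:15:10:websocket,xhr-multipart,htmlfile,"
    "jsonp-polling,flashsocket,xhr-polling"
)
frame = encode_event("Auth", {"userId": 1, "key": "key"})
path = polling_path(handshake["sid"])
```

With these helpers, `path` and `frame` are the URL path and request body for step 2.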

The problem is that this plan works with node.js, but does not work with gevent-socketio.

No matter what type of event I send, I get 1:: in the response and the event is not triggered on the server side. What's even more surprising is that sometimes it works and I get the expected 1 result.
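In the socket.io 0.9 framing, a frame has the form type:id:endpoint:data, and type 1 is a connect frame. A `1::` reply therefore suggests the server is answering as if a connection had just been opened, not acknowledging the event. The decoder below is a small sketch for inspecting such frames. `decode_frame` is a hypothetical helper, not a gevent-socketio function.

```python
# Frame type codes as listed in the socket.io 0.9 protocol description.
PACKET_TYPES = {
    "0": "disconnect", "1": "connect", "2": "heartbeat", "3": "message",
    "4": "json", "5": "event", "6": "ack", "7": "error", "8": "noop",
}


def decode_frame(frame):
    """Split a frame into type, message id, endpoint and data."""
    parts = frame.split(":", 3)
    parts += [""] * (4 - len(parts))  # short frames such as "1::" carry no data
    type_id, msg_id, endpoint, data = parts
    return {
        "type": PACKET_TYPES.get(type_id, "unknown"),
        "id": msg_id,
        "endpoint": endpoint,
        "data": data,
    }


reply = decode_frame("1::")
sent = decode_frame('5:::{"name":"Auth","args":[{"userId":1,"key":"key"}]}')
```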

The same test plan works with the node.js implementation. Both apps operate correctly in the browser.

What might be the problem? At first I suspected that the session was not fully created by the time of the second request. I tried adding a 3-second delay between the requests, but this did not help.

Thank you!

yunmanger1 avatar Aug 01 '12 04:08 yunmanger1

Could you provide a small sample app that shows the problem so I can fix it?

sontek avatar Aug 01 '12 04:08 sontek

Yes. Actually, it's based on the pyramid_backbone_redis_chat example:

import redis
import ujson as json
import urllib2
from socketio.namespace import BaseNamespace
from socketio.mixins import RoomsMixin, BroadcastMixin
from socketio import socketio_manage
from settings import settings as ns
import gevent
import os
from pyramid.response import FileResponse
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(msg)s")
log = logging.getLogger()


def index(request):
    """ Base view to load our template """
    return {}


pool = redis.ConnectionPool(
    host=ns.REDIS_HOST,
    port=ns.REDIS_PORT,
    db=ns.REDIS_DB, socket_timeout=None)


def login_required(f):
    def new_f(self, *args, **kwargs):
        if 'user_id' not in self.session:
            log.debug('unauthorized')
            return
        return f(self, *args, **kwargs)
    return new_f


def community_required(f):
    def new_f(self, data, *args, **kwargs):
        community_id = data.get('communityId', None)
        if not community_id:
            log.debug('community is not set')
            return
        kwargs['community_id'] = int(community_id)
        return f(self, data, *args, **kwargs)
    return new_f


class ChatNamespace(BaseNamespace, RoomsMixin, BroadcastMixin):

    def listener(self):
        log.debug('Starting Listener')
        try:
            while 'pubsub' in self.session:
                try:
                    r = self.session['pubsub']

                    for m in r.listen():
                        if m['type'] == 'message':
                            if ns.DEBUG:
                                log.debug('Got message:')
                                log.debug(m)
                            msg = json.loads(m['data'])
                            self.emit("notification", msg)
                            gevent.sleep()
                except AttributeError:
                    # there is some bug in py-redis
                    # NoneType has no attribute readline
                    pass
                gevent.sleep()
        except Exception as e:
            log.exception(e)
        finally:
            del self.session['pubsub']
            log.debug('Exiting Listener')

    def join(self, room):
        if not room in self.session['rooms']:
            log.debug('Join room: {}'.format(room))
            if 'pubsub' in self.session:
                self.session['pubsub'].subscribe(room)
            else:
                log.error('We are not listening: subscribe')
        super(ChatNamespace, self).join(room)

    def leave(self, room):
        if room in self.session['rooms']:  # only unsubscribe from rooms we actually joined
            log.debug('Leave room: {}'.format(room))
            if 'pubsub' in self.session:
                self.session['pubsub'].unsubscribe(room)
            else:
                log.error('We are not listening: unsubscribe')
        super(ChatNamespace, self).leave(room)

    @community_required
    @login_required
    def on_enterChat(self, data, community_id=None):
        self.join(str(community_id))
        self.spawn(self.get_last_messages, community_id)

    @community_required
    @login_required
    def on_exitChat(self, data, community_id=None):
        self.leave(str(community_id))

    @community_required
    @login_required
    def on_addChatMessage(self, data, community_id=None):
        message = data.get('message', None)
        log.debug(message)
        if not message:
            log.debug('message is not set')
            return
        user_id = self.session['user_id']
        self.spawn(self.send_message, community_id, user_id, message)

    def on_disconnect(self, data):
        del self.session['rooms']

    def send_message(self, community_id, user_id, message):
        r = redis.StrictRedis(connection_pool=pool)
        msg = json.dumps({
            'userId': user_id,
            'message': message,
            'type': 'chat'})
        key = 'c{}'.format(community_id)
        r.pipeline().rpush(key, msg)\
            .ltrim(key, -ns.CHAT_MAX_HISTORY_LENGTH, -1)\
            .publish(str(community_id), msg).execute()

    def get_last_messages(self, community_id):
        r = redis.StrictRedis(connection_pool=pool)
        result = map(json.loads, r.lrange('c{}'.format(community_id),
            -ns.CHAT_MAX_HISTORY_LENGTH,
            ns.CHAT_MAX_HISTORY_LENGTH))
        self.emit('chatMessages', result)

    def recv_connect(self):
        super(ChatNamespace, self).recv_connect()
        log.debug('Receive connect')

    def recv_error(self, *a, **kw):
        super(ChatNamespace, self).recv_error(*a, **kw)
        log.debug('error %s %s' % (str(a), str(kw)))

    def on_connect(self, *a, **kw):
        log.debug('connect %s %s' % (str(a), str(kw)))

    def on_setUserId(self, data):
        user_id = data.get('userId', None)
        key = data.get('key', None)
        status = True
        try:
            url = ns.AUTH_BY_KEY.format(
                    host=ns.CORE_HOST,
                    user_id=user_id,
                    key=key)
            log.debug('REQUEST: {}'.format(url))
            data = json.loads(urllib2.urlopen(url).read())
            status = data.get('status', None) == 'ok'
        except Exception as e:
            log.exception(e)
            status = False

        if status:
            self.session['user_id'] = user_id
            log.debug('user_{} authenticated'.format(user_id))
            r = redis.StrictRedis()
            r = r.pubsub()
            self.session['pubsub'] = r
            self.spawn(self.listener)
            self.join('user_{}'.format(user_id))
        else:
            log.debug('Wrong key for user {}'.format(user_id))
            self.emit('disconnect', {'message': 'wrong key'})


def socketio_service(request):
    if not 'socketio' in request.environ:
        return {}
    socketio_manage(request.environ,
        {
            '': ChatNamespace,
        }, request=request
    )
    return {}


def socketiojs_view(request):
    here = os.path.dirname(__file__)
    path = os.path.join(here, 'static', 'socket.io.min.js')
    return FileResponse(path, request=request)

yunmanger1 avatar Aug 01 '12 05:08 yunmanger1

Hello,

I think that client-side initiated transport selection is gone in 0.7+ (or 0.8+, or 0.9+?). It is now set on the server side, so you are probably tapping into a backwards-compatibility feature of the node.js version. It is not necessary to support it if you use the latest socket.io.js client library, and that's why we revamped gevent-socketio. Otherwise, the previous gevent-socketio should work with 0.6.

Does that shed any light?

By the way, it's cool that you're benchmarking the two implementations. I'd be interested in the results :)

thanks!

Alexandre

Reply to this email directly or view it on GitHub: https://github.com/abourget/gevent-socketio/issues/69

abourget avatar Aug 01 '12 12:08 abourget

I think I found the problem. I was running my app under gunicorn using GeventSocketIOWorker with 4 workers. The workers do not share socket sessions with each other, so the first and second requests can land on different workers. That would also explain why the plan only sometimes works.
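The failure mode can be illustrated with a toy model in which each worker keeps its sessions in its own process memory. This is only a sketch: the `Worker` class is invented for illustration, and the round-robin dispatch is a simplification, since in practice whichever worker process accepts the connection handles the request.

```python
import itertools


class Worker:
    """Stand-in for one server process with a private session table."""

    def __init__(self, name):
        self.name = name
        self.sessions = {}

    def handshake(self, sid):
        # The session exists only in this worker's memory.
        self.sessions[sid] = {"events": []}
        return sid

    def post_event(self, sid, frame):
        session = self.sessions.get(sid)
        if session is None:
            return "unknown session"
        session["events"].append(frame)
        return "event handled"


workers = [Worker("w{}".format(i)) for i in range(4)]
dispatch = itertools.cycle(workers)  # simplified request routing

first = next(dispatch)
sid = first.handshake("097468867931")

second = next(dispatch)  # the follow-up request reaches another worker
result_other = second.post_event(sid, '5:::{"name":"Auth","args":[]}')
result_same = first.post_event(sid, '5:::{"name":"Auth","args":[]}')
```

Here `result_other` is "unknown session" while `result_same` is "event handled", which mirrors a test plan that succeeds only when both requests happen to reach the same worker.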

Is there any way to run multiple workers?

"I think that the client-side initiated transport selection is gone in 0.7+"

https://github.com/abourget/gevent-socketio/blob/master/socketio/handler.py#L104 From this line I understood that the client can communicate via any transport it sets in the URL.
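To make the point about the URL concrete, here is a rough sketch of how a request path like the one in the test plan can be split into resource, protocol version, transport, and session ID. It is not the code from handler.py. The pattern `PATH_RE` and the function `parse_socketio_path` are assumptions written for this example.

```python
import re

# Hypothetical pattern: /<resource>/<protocol>/<transport>/<session id>
PATH_RE = re.compile(
    r"^/(?P<resource>[^/]+)/(?P<protocol>\d+)"
    r"/(?P<transport>[^/]+)/(?P<sid>[^/?]+)/?$"
)


def parse_socketio_path(path):
    """Return the path components as a dict, or None for a handshake-style path."""
    path = path.split("?", 1)[0]  # drop the query string, e.g. ?t=1343736013258
    match = PATH_RE.match(path)
    return match.groupdict() if match else None


parsed = parse_socketio_path(
    "/socket.io/1/xhr-polling/097468867931?t=1343736013258"
)
handshake_only = parse_socketio_path("/socket.io/1/")
```

In this sketch the transport is simply whatever segment the client puts in the path, which matches the reading that the client chooses the transport through the URL.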

yunmanger1 avatar Aug 02 '12 09:08 yunmanger1