rabbit.js icon indicating copy to clipboard operation
rabbit.js copied to clipboard

REQ/REP Socket - Reply queue utilisation problem

Open AlPatsino opened this issue 11 years ago • 5 comments

Hi guys! I have a weird problem. I'm trying to write a simple test for REQ/REP sockets.

Here is client.js:

var rabbit = require('rabbit.js');

// Client side of the REQ/REP test: opens ten REQ sockets and sends one JSON
// request on each of them.
var context = rabbit.createContext();

// Sockets are only created inside the context's "ready" handler.
context.on("ready", function() {
    for (var i = 0; i < 10; i++) {
        // The IIFE gives each iteration its own `i`; with `var` the callbacks
        // below would otherwise all share the single loop variable.
        (function(i) {
            var request = context.socket("REQ");

            // Log the parsed reply, then close this socket.
            // NOTE(review): close() is called synchronously from inside the
            // "data" handler; the report below associates this placement with
            // the "IllegalOperationError: Channel closing" failure.
            request.on("data", function(message) {
                console.log('%s-st Handler received message - %j', i, JSON.parse(message));
                request.close();
            });

            // The callback passed to connect() builds the payload, logs it and
            // writes it to the socket as a UTF-8 JSON string.
            request.connect("my_app.user_profile.authenticate_user", function() {
                var data = { username: 'Alex-' + i};

                console.log('Queued message: %j', data);

                var message = JSON.stringify(data);
                request.write(message, "utf8");
            });

        })(i);
    }
});

Here is server.js:

var rabbit = require('rabbit.js');

// Server side of the REQ/REP test: answers every request with a success
// record that echoes the username it was sent.
var context = rabbit.createContext();

context.on("ready", onContextReady);

// Opens the REP socket and starts answering requests once it is connected.
function onContextReady() {
    var reply = context.socket("REP");

    // Parses one incoming request, logs the response and writes it back as JSON.
    function answerRequest(inMessage) {
        var parsedRequest = JSON.parse(inMessage);
        var response = { success: true, username: parsedRequest.username };

        console.log('Processed message: %j', response);

        reply.write(JSON.stringify(response), "utf8");
    }

    // The "data" listener is attached only from within the connect callback.
    reply.connect("my_app.user_profile.authenticate_user", function() {
        reply.on("data", answerRequest);
    });
}

When I start client.js, I see that 10 messages are queued into the durable queue my_app.user_profile.authenticate_user. Then 10 temporary queues (exclusive, autodelete=true) are created for the replies. That is nice.

Then I start server.js. I see that server.js processes all 10 queued messages, and then client.js shows the replies from server.js. Very nice. But here is the problem: the 10 temporary queues are still alive. As I understand it, they will disappear when the REQ socket is shut down. That is why I am trying to change the client.js code to shut down the REQ socket after it receives the reply message from server.js. I found two methods on Socket: Socket#close() and Socket#end(). I have inserted calls to these methods here:

client.js:

// Reply handler for one REQ socket (`request` and `i` come from the enclosing
// loop in client.js).
request.on("data", function(message) {
    console.log('%s-st Handler received message - %j', i, JSON.parse(message));
    // The socket is shut down synchronously, while still inside the handler.
    request.end(); // or request.close();
});

When using either request.close() or request.end(), I get this:

$ node client.test.js
Queued message: {"username":"Alex-0"}
Queued message: {"username":"Alex-3"}
Queued message: {"username":"Alex-1"}
Queued message: {"username":"Alex-4"}
Queued message: {"username":"Alex-2"}
Queued message: {"username":"Alex-5"}
Queued message: {"username":"Alex-9"}
Queued message: {"username":"Alex-8"}
Queued message: {"username":"Alex-7"}
Queued message: {"username":"Alex-6"}
0-st Handler received message - {"success":true,"username":"Alex-0"}

events.js:72
        throw er; // Unhandled 'error' event
              ^
IllegalOperationError: Channel closing
    at Channel.<anonymous> (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:143:11)
    at Channel.C.closeBecause (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:195:8)
    at Channel.C.closeWithError (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:209:8)
    at Channel.C.acceptMessageFrame (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:232:12)
    at Channel.C.accept (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:377:17)
    at Connection.mainAccept [as accept] (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:448:48)
    at Socket.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)

If I wrap the call to request.close() in setTimeout(), like this:

// Variant of the reply handler that postpones the close (`request` and `i`
// come from the enclosing loop in client.js).
request.on("data", function(message) {
    console.log('%s-st Handler received message - %j', i, JSON.parse(message));
    // Close the socket 5000 ms later instead of from inside the handler.
    setTimeout(function() {
        request.close();
    }, 5000);
});

The temporary queues are removed successfully without exceptions. But using setTimeout() does not seem like a very good solution. So where is the correct place to call Socket#close() if I want to free resources properly after I have received the message from the server? Why does it throw an exception if I call it in the "on data" event handler?

Is there something I'm doing wrong, or is there some other problem? Could you please explain what is wrong here?

AlPatsino avatar Jul 31 '14 01:07 AlPatsino

It seems that this variant:

// Variant of the reply handler that defers the close without a timer
// (`request` and `i` come from the enclosing loop in client.js).
request.on("data", function(message) {
    console.log('%s-st Handler received message - %j', i, JSON.parse(message));
    // process.nextTick queues the close so it is no longer called synchronously
    // from within this handler.
    process.nextTick(function() {
        request.close();
    });
});

is more appropriate. Could you please explain whether this approach is correct?

AlPatsino avatar Jul 31 '14 17:07 AlPatsino

Hi there, thank you for the bug report.

It is reasonable to expect your code to work. The reason it's not -- I suspect (I'll investigate further) -- is that the request socket wants to acknowledge the message after the handler has run, and since you're closing it, it tries to do the ack on an invalid channel.

Using nextTick or setImmediate is a good work-around for now. Probably #end and #close ought to do this themselves, to avoid this situation.

squaremo avatar Aug 01 '14 07:08 squaremo

subscribing

+1 for automatically closing the socket

vespakoen avatar Mar 07 '15 03:03 vespakoen

Seems like this is still an issue?

Removed-5an avatar Jun 15 '15 09:06 Removed-5an

Thanks @AlPatsino your solution works well.

minghk avatar Jan 03 '16 08:01 minghk