REQ/REP Socket - Reply queue utilisation problem
Hi guys! I have a weird problem. I'm trying to write simple test for REQ/REP sockets.
Here is client.js:
var rabbit = require('rabbit.js');
var context = rabbit.createContext();
context.on("ready", function() {
for (var i = 0; i < 10; i++) {
(function(i) {
var request = context.socket("REQ");
request.on("data", function(message) {
console.log('%s-st Handler received message - %j', i, JSON.parse(message));
request.close();
});
request.connect("my_app.user_profile.authenticate_user", function() {
var data = { username: 'Alex-' + i};
console.log('Queued message: %j', data);
var message = JSON.stringify(data);
request.write(message, "utf8");
});
})(i);
}
});
Here is server.js:
var rabbit = require('rabbit.js');
var context = rabbit.createContext();
context.on("ready", function() {
var reply = context.socket("REP");
reply.connect("my_app.user_profile.authenticate_user", function() {
reply.on("data", function(inMessage) {
var inData = JSON.parse(inMessage);
var outData = {
success: true,
username: inData.username
};
console.log('Processed message: %j', outData);
var outMessage = JSON.stringify(outData);
reply.write(outMessage, "utf8");
});
});
});
When I start client.js. I see that 10 messages are queued into my_app.user_profile.authenticate_user durable queue. Then 10 temporal exclusive, autodelete=true queues (for replies) are created. It is nice.
Then I'm starting server.js. I see that server.js processes all 10 queued messages. Then on client.js I see replies from server.js. Very nice. But here is a problem. I see that 10 temporal queues are still alive. As I understand they will disappear with socket (REQ) shutdown. That is why I'm trying to correct client.js code to shutdown REQ socket, after it receives reply message from server.js. I found two methods on Socket: Socket#close() and Socket#end(). I have inserted calls of this methods here:
client.js:
request.on("data", function(message) {
console.log('%s-st Handler received message - %j', i, JSON.parse(message));
request.end(); // or request.close();
});
In case of using request.close() or request.end() I'm getting this:
$ node client.test.js
Queued message: {"username":"Alex-0"}
Queued message: {"username":"Alex-3"}
Queued message: {"username":"Alex-1"}
Queued message: {"username":"Alex-4"}
Queued message: {"username":"Alex-2"}
Queued message: {"username":"Alex-5"}
Queued message: {"username":"Alex-9"}
Queued message: {"username":"Alex-8"}
Queued message: {"username":"Alex-7"}
Queued message: {"username":"Alex-6"}
0-st Handler received message - {"success":true,"username":"Alex-0"}
events.js:72
throw er; // Unhandled 'error' event
^
IllegalOperationError: Channel closing
at Channel.<anonymous> (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:143:11)
at Channel.C.closeBecause (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:195:8)
at Channel.C.closeWithError (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:209:8)
at Channel.C.acceptMessageFrame (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:232:12)
at Channel.C.accept (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:377:17)
at Connection.mainAccept [as accept] (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:63:33)
at Socket.go (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:448:48)
at Socket.EventEmitter.emit (events.js:92:17)
at emitReadable_ (_stream_readable.js:408:10)
If I wrap up call of request.close() into setTimeout() function like this:
request.on("data", function(message) {
console.log('%s-st Handler received message - %j', i, JSON.parse(message));
setTimeout(function() {
request.close();
}, 5000);
});
Temporal queues are closed successfully without exceptions. But using setTimeout() seems not very good solution. So where it is correct to call Socket#close(), if I want correctly free resources after I have received message from server? Why does it throw exception if I call it in "on data" event handler?
Is there something I'm doing wrong, or here is some other problem? Could you, please, explain me, what is wrong here?
Seems that variant:
request.on("data", function(message) {
console.log('%s-st Handler received message - %j', i, JSON.parse(message));
process.nextTick(function() {
request.close();
});
});
Is more appropriate. Could you please explain whether this approach is correct?
Hi there, thank you for bug report.
It is reasonable to expect your code to work. The reason it's not -- I suspect (I'll investigate further) -- is that the request socket wants to acknowledge the message after the handler has run, and since you're closing it, it tries to do the ack on an invalid channel.
Using nextTick or setImmediate is a good work-around for now. Probably #end and #close ought to do this themselves, to avoid this situation.
subscribing
+1 for automatically closing the socket
Seems like this is still an issue?
Thanks @AlPatsino your solution works well.