rabbit.js icon indicating copy to clipboard operation
rabbit.js copied to clipboard

Cant get resume_name to work

Open henrikjn opened this issue 11 years ago • 3 comments

I'm trying to implement resume_nameso that i don't risk losing messages

I'm getting the error:

Error: Channel closed by server: 403 (ACCESS-REFUSED) with message "ACCESS_REFUSED - queue 'subs.*' in vhost '/' in exclusive use"

Any ideas what I'm doing wrong? Also seems that the pub never submits any message.

The code i run is:

var encoding = "utf8";
var context = require("rabbit.js").createContext(queueSocket); 

context.on("ready", function () {
    console.log(" [x] ready");

    var sub = context.socket('SUB', {resume_name: 'subs.*', persistent: true});
    sub.connect('events');
    sub.setsockopt('persistent', true);
    sub.setEncoding(encoding);

    sub.on('data', function (msg) {
        console.log("(1) MSG --> " + msg);
    }); 
    sub.close();


    var pub = context.socket('PUB');
    pub.connect('events');    
    pub.publish('subs.hello', 'test message', encoding);



    var sub2 = context.socket('SUB', {resume_name: 'subs.*', persistent: true});
    sub2.connect('events');
    sub2.setsockopt('persistent', true);
    sub2.setEncoding(encoding);

    sub2.on('data', function (msg) {
        console.log("(2) MSG --> " + msg);
    });
});

henrikjn avatar Mar 10 '14 19:03 henrikjn

There's some non-determinism working against you here. Specifically, the server hasn't necessarily finished closing the first sub socket by the time you're opening the second. This means two things: it hasn't released its "exclusive" lock on the queue (that which is named by "subs.*"), and it hasn't necessarily stopped delivering messages to the first socket.

With regard to that latter: I think, though I'll have to check, that amqplib stops processing any deliveries once you've closed the channel (quite reasonably, and in fact it's a requirement of the AMQP specification), which is why the 'data' event doesn't fire. It's also the case that amqplib multiplexes "fairly" among channels -- more apparent non-determinism -- which might be how the PUB socket's message can seemingly overtake the SUB socket's close.

You can see it working by introducing a delay -- it can be small delay, but perhaps not as small as setImmediate -- after sub.close(). It's enough time for the server to release the lock on the subscription queue and for the first socket's 'consume' to be torn down.

Oh, and there's another thing at work here, which is a breaking change I've introduced in master (but not in a release, yet): you have to call e.g., sub.subscribe() on a SUB socket to receive messages. Technically it's sub.subscribe(topic) for some topic(s), but a falsey value works fine in the default case.

To be honest I am still working on these features, and actually I'm a bit undecided about them, or at least how they work at present.

squaremo avatar Mar 10 '14 22:03 squaremo

By the way, the SUB socket uses 'exclusive' to avoid (by means of loudly exploding) the situation in which you have two or more live sockets using the same subscription -- which, for admittedly implementation reasons, would lead to each socket getting only some of the messages.

The feature as a whole is more for the scenario in which your whole program exits, exceptionally or otherwise, and you want to pick up where you left off. There is still some message loss possible -- in fact probable, if there's a fairly constant message flow. I could use acks to work around that, but it does make things rather slower (it could be an option I suppose).

squaremo avatar Mar 11 '14 00:03 squaremo

(NB I've taken "resume*" onto its own branch for the time being -- I wasn't very happy with the implementation or semantics, and it might interact poorly with other options. In other words I want to think it over again)

squaremo avatar Mar 28 '14 17:03 squaremo