
Support Auto-reconnection

Open skeggse opened this issue 11 years ago • 40 comments

At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only throw and there's no reconnect support.

Even in a data center, connections can fail or time out, and reconnection would allow the connection to fail and recover from the failure, instead of simply giving up.

This might need to happen at the connect.js level given the current setup, but that would leave messages that have been sent but not acked in an uncertain state, where it's unclear whether a message has reached the server or been lost. I know from experience that getting reconnection to work well can be tricky. For a counterexample, see node-amqp's reconnection, which appears to leak everywhere.

skeggse, Oct 02 '13 21:10

Recovery of AMQP 0-9-1 connections involves more than simply reconnecting. You may want to get familiar with what other clients do. Fortunately, there are good examples of how consumer recovery can work with no effort on the developer's end.

It's a lot more involved for producers, as socket issues may take several seconds to detect. As such, publishers that cannot afford to lose a message not only need to use publisher confirms, they also need to keep a write-ahead log of messages that can be replayed if the publisher fails before outstanding confirms arrive.
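A minimal sketch of such a disk-based write-ahead log, assuming an append-only file of JSON lines is durable enough: each message is logged (and fsynced) before it is published, an ack record is appended when its publisher confirm arrives, and whatever was never acked is replayed after a restart. The `WriteAheadLog` class and its record format are hypothetical, not part of amqplib, Bunny or Hutch; bodies are stored as strings, and a real log would also need compaction and protection against a torn last line.

```javascript
const fs = require('fs');

// Hypothetical append-only write-ahead log, one JSON record per line:
//   {"op":"pub","id":...,"routingKey":...,"body":...} before publishing
//   {"op":"ack","id":...} once the broker has confirmed the message
class WriteAheadLog {
  constructor(path) {
    this.path = path;
    this.nextId = 1;
    // Recover the id counter from an existing log so ids stay unique.
    for (const rec of this.records()) {
      if (rec.id >= this.nextId) this.nextId = rec.id + 1;
    }
  }

  // All records currently in the log (empty if the file does not exist yet).
  records() {
    if (!fs.existsSync(this.path)) return [];
    return fs.readFileSync(this.path, 'utf8')
      .split('\n')
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line));
  }

  // Durably record a message before handing it to the channel; returns its id.
  append(routingKey, body) {
    const id = this.nextId++;
    const line = JSON.stringify({ op: 'pub', id, routingKey, body }) + '\n';
    const fd = fs.openSync(this.path, 'a');
    try {
      fs.writeSync(fd, line);
      fs.fsyncSync(fd); // make sure the record is on disk before publishing
    } finally {
      fs.closeSync(fd);
    }
    return id;
  }

  // Record that the broker confirmed message `id`.
  ack(id) {
    fs.appendFileSync(this.path, JSON.stringify({ op: 'ack', id }) + '\n');
  }

  // Messages logged but never confirmed: publish these again after a restart.
  pending() {
    const open = new Map();
    for (const rec of this.records()) {
      if (rec.op === 'pub') open.set(rec.id, rec);
      else if (rec.op === 'ack') open.delete(rec.id);
    }
    return [...open.values()];
  }
}
```

In use, `append` would run before each publish on a confirm channel and `ack(id)` in the confirm callback. Ack records are not fsynced here, so a crash can cause a confirmed message to be replayed once more, which still fits "at least once".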

michaelklishin, Oct 02 '13 21:10

For example, Hutch adds a disk-based write-ahead log on top of Bunny among other things.

michaelklishin, Oct 02 '13 21:10

Would it not make sense to roll that into the parent library? Maybe not disk-based, but a write-ahead log?

skeggse, Oct 02 '13 21:10

At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only throw and there's no reconnect support.

When the connection drops you are pretty much hosed, and usually the best thing to do is start from scratch -- assert all your exchanges and queues again and go from there. There are some things the client can do for you (like Michael's Ruby client); but I am rather wary of second-guessing the application code. You might want such-and-such a queue to disappear with the connection.

Regarding messages, yes you can use confirmations (now with callbacks!), which can change your guarantee from "at most once" (a message might not make it, and you won't know) to "at least once" (you might not get a confirmation, and so resend a message).
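Because resending after a missing confirmation can deliver the same message twice, consumers under "at least once" usually have to tolerate duplicates. A minimal sketch, assuming publishers set a unique `messageId` property on each message (amqplib exposes it to consumers as `msg.properties.messageId`); the `Deduplicator` and `dedupingHandler` names are hypothetical, and the in-memory window forgets ids once it fills up.

```javascript
// Hypothetical consumer-side de-duplication for at-least-once delivery.
// Remembers the last `capacity` message ids and reports repeats.
class Deduplicator {
  constructor(capacity = 10000) {
    this.capacity = capacity;
    this.seen = new Set(); // Sets iterate in insertion order: oldest id first
  }

  // True the first time an id is seen, false for a duplicate.
  firstTime(id) {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    if (this.seen.size > this.capacity) {
      // Evict the oldest remembered id to bound memory use.
      this.seen.delete(this.seen.values().next().value);
    }
    return true;
  }
}

// Wrap a (synchronous) handler so redelivered duplicates are acked but not reprocessed.
// Messages without a messageId are always handled.
function dedupingHandler(dedup, ch, handle) {
  return function (msg) {
    const id = msg.properties.messageId;
    if (id === undefined || dedup.firstTime(id)) handle(msg);
    ch.ack(msg); // acknowledge only after handling succeeded
  };
}
```

This would be passed to `ch.consume(queue, dedupingHandler(new Deduplicator(), ch, handle))` with manual acknowledgements.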

(Aside: Do you guys get github emails from the future or something? You always have about three posts back and forth before I manage to comment ..)

squaremo, Oct 02 '13 21:10

RAM-based write-ahead log is not an option for people who cannot afford to lose a single message (such as a couple of payment processors that are known to use Bunny). Others often simply don't need a WAL.

In any case, I'm not at all convinced it should be part of client libraries. amqplib should try to provide about as much as Ruby clients do. Those provide over 2 years of automatic connection recovery experience to learn from.

michaelklishin, Oct 02 '13 21:10

Would it not make sense to roll that into the parent library? Maybe not disk-based, but a write-ahead log?

It'd make an interesting example, at least -- put a buffer in front of writes, and recover from that in the case of connection drops.

The usual example for guaranteed delivery is moving rows from a database (or files from a filesystem), so the "reliable storage" bit is implicit in the scenario.

squaremo, Oct 02 '13 21:10

When the connection drops you are pretty much hosed, and usually the best thing to do is start from scratch -- assert all your exchanges and queues again and go from there.

In every case I can think of, starting from scratch can be taken care of by the library. If you want a transient queue or exchange, you define it as such with autoDelete and let the library or RabbitMQ take care of arbitrary naming. Then, on reconnect, the library or RabbitMQ simply creates a new queue/exchange in the same manner. If you bound it to something specific, the library would re-establish the binding to that entity, and if it was bound to another transient entity, it would recreate that binding.

Those steps, however, would quickly become convoluted and difficult to get right in an abstract enough manner to suit all users.

RAM-based write-ahead log is not an option for people who cannot afford to lose a single message (such as a couple of payment processors that are known to use Bunny). Others often simply don't need a WAL.

That's a good point. For us, we need nearly as much reliability, so RAM-based might not make sense (but what happens if the hard drive fails and causes the system to go down? Then you're really hosed.)

That said, I feel like it might be beneficial for those who don't need quite as much reliability.

It'd make an interesting example, at least -- put a buffer in front of writes, and recover from that in the case of connection drops.

That's more or less the extended version of what I was attempting to communicate in our other discussion. Instead of just keeping a buffer of unsent commands, keep a buffer of unacknowledged commands. This would allow one to publish transparently through the library and assume that it will reach its destination--provided the producer doesn't crash.
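A sketch of that "buffer of unacknowledged commands", assuming an amqplib confirm channel (from `conn.createConfirmChannel()`), whose `publish(exchange, routingKey, content, options, callback)` calls back with an error on nack and without one on ack. `ReplayingPublisher`, `attach` and `detach` are hypothetical names. Unconfirmed messages live in memory and are republished on the next channel, so they survive a connection drop but not a producer crash.

```javascript
// Hypothetical publisher that keeps every unconfirmed message and replays it
// onto a fresh confirm channel after a reconnect (at-least-once delivery).
class ReplayingPublisher {
  constructor(exchange) {
    this.exchange = exchange;
    this.nextSeq = 1;
    this.unconfirmed = new Map(); // seq -> { routingKey, content, options }
    this.channel = null;          // null while disconnected
  }

  publish(routingKey, content, options = {}) {
    const seq = this.nextSeq++;
    this.unconfirmed.set(seq, { routingKey, content, options });
    if (this.channel) this.send(this.channel, seq);
  }

  // Hand one buffered message to a channel; forget it once the broker confirms.
  send(ch, seq) {
    const m = this.unconfirmed.get(seq);
    try {
      ch.publish(this.exchange, m.routingKey, m.content, m.options, (err) => {
        // Nacked messages (err set) stay buffered and are retried on the next attach().
        if (!err) this.unconfirmed.delete(seq);
      });
    } catch (err) {
      // Channel already closed: the message stays buffered for the next attach().
    }
  }

  // Call with each new confirm channel; replays everything still unconfirmed, in order.
  attach(ch) {
    this.channel = ch;
    for (const seq of [...this.unconfirmed.keys()]) this.send(ch, seq);
  }

  // Call when the connection or channel closes.
  detach() {
    this.channel = null;
  }
}
```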

skeggse, Oct 02 '13 21:10

I don't know the right answer. Auto-reconnection is a feature many database libraries support, though this isn't a database library. Regardless, in my ideal world the end-developer wouldn't need to worry about reliability as much. Reconnections are common enough that it seems a useful feature.

(Aside: Do you guys get github emails from the future or something? You always have about three posts back and forth before I manage to comment ..)

Yup, on every single one. Make sure you're watching the thread--it's a button at the bottom of issue pages.

skeggse, Oct 02 '13 21:10

Nobody is saying that reconnection should not be supported, just that it is not as trivial as recovering a TCP connection.

michaelklishin, Oct 02 '13 22:10

Oh, absolutely. Who would you rather be in charge of recovery, the library or every developer using the library? Better to get it right once.

skeggse, Oct 02 '13 22:10

Who would you rather be in charge of recovery, the library or every developer using the library?

If there was a definitive way to recover, the library; without that, every developer.

squaremo, Oct 02 '13 22:10

Exactly.

skeggse, Oct 02 '13 23:10

I'll provide my point of view as a developer who is considering switching from node-amqp to amqp.node: The lack of a reconnection feature is a barrier to me making the switch. If it was just a matter of changing the library, it would be an easy decision-- but currently it also means I need to design and implement all the reconnection logic too-- and that's enough to dissuade me from switching right now. I suspect there are other node-amqp users like me.

Ideally, amqp.node would support a common-sense reconnection strategy that works for 80% of its users, and provide a way to turn it off for the other 20% who want to implement it in their own applications. That's what I'd love to see... Of course, it's easy to say that when I'm not the one who has to code it up. ;-)

cmoesel, Oct 03 '13 12:10

@cmoesel So the reconnection in node-amqp works for you?

squaremo, Oct 03 '13 12:10

@squaremo It works well enough. I think there are leaks in there, but so far it has not negatively impacted us.

cmoesel, Oct 03 '13 12:10

Ideally, amqp.node would support a common-sense reconnection strategy that works for 80% of its users, and provide a way to turn it off for the other 20% who want to implement it in their own applications.

This. You could take it a step further and provide a reconnect hook for, say, the 18% who have a specific reconnect strategy that fits into the reconnect envelope. Basically, on reconnect at a socket level, by default try to reconstruct the old state. If the user would prefer, give them a way to easily tap into the same information the default reconnection mechanism uses and set up the new connection themselves. For the last, say, 2%, who need something very specialized, just let them disable it. It's not strictly necessary, but providing a common structure for reconnection could help with maintaining that part of the project.
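The three tiers described above could be expressed as a single option. This is a hypothetical sketch (amqplib has no such option): `recover` defaults to redeclaring the recorded state, may be a user function that receives the same recorded information, or may be `false` to disable recovery. The shape of `recorded` is an assumption; the channel calls (`assertExchange`, `assertQueue`, `bindQueue`, `close`) are amqplib's promise API.

```javascript
// Hypothetical recovery dispatcher for the "80% / 18% / 2%" split.
// `recorded` is whatever the library tracked before the drop, assumed to look like
// { exchanges: [{ name, type, options }], queues: [{ name, options }],
//   bindings: [{ queue, exchange, pattern }] }.
async function recover(conn, recorded, options = {}) {
  const strategy = options.recover === undefined ? 'default' : options.recover;

  if (strategy === false) {
    return { recovered: false }; // the 2%: the application handles everything itself
  }
  if (typeof strategy === 'function') {
    // The 18%: a custom strategy, handed the same information as the default.
    await strategy(conn, recorded);
    return { recovered: true, custom: true };
  }
  if (strategy === 'default') {
    // The 80%: redeclare what was declared before, on a short-lived channel.
    const ch = await conn.createChannel();
    for (const ex of recorded.exchanges || []) await ch.assertExchange(ex.name, ex.type, ex.options);
    for (const q of recorded.queues || []) await ch.assertQueue(q.name, q.options);
    for (const b of recorded.bindings || []) await ch.bindQueue(b.queue, b.exchange, b.pattern);
    await ch.close();
    return { recovered: true, custom: false };
  }
  throw new Error('unknown recover option: ' + String(strategy));
}
```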

That was a little convoluted...let me know if I can clarify something.

skeggse, Oct 03 '13 20:10

Basically, on reconnect at a socket level, by default try to reconstruct the old state.

This is a bad idea. It assumes that the desired state is always the same. What if, for example, queues are constructed according to received data?

I would use something like a fixture; that is, a procedure that creates a known base state at each connection. You could do, for example,

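var amqp = require('amqplib');

var URL = 'amqp://localhost'; // placeholder broker URL
var TASKS = 'tasks';          // placeholder queue name
var PUBSUB = 'pubsub';        // placeholder exchange name

// Create a known base state on every (re)connection.
function fixture(conn) {
  conn.on('error', function (err) { console.error(err.message); }); // 'close' follows
  conn.on('close', reconnectAtSomePoint); // start from scratch when the connection goes away
  return conn.createChannel().then(function (ch) {
    return ch.assertQueue(TASKS)
      .then(function () { return ch.assertExchange(PUBSUB, 'fanout'); }) // exchange type assumed
      .then(function () { return ch.close(); })
      .then(function () { return conn; });
  });
}

function reconnect() {
  amqp.connect(URL).then(function (conn) {
    return fixture(conn).then(main);
  }, reconnectAtSomePoint); // retry if the connection attempt itself fails
}

// Placeholder strategy: try again after one second.
function reconnectAtSomePoint() {
  setTimeout(reconnect, 1000);
}

function main(conn) {
   // set up consumers etc.
}

reconnect();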

There's nothing extra needed in amqplib in order to accomplish that yourself. Helpers for, e.g., reconnection strategies (reconnectAtSomePoint) would make a nice addition at some point, I agree.
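One possible shape for such a helper, standing in for `reconnectAtSomePoint`: retry a connect function with capped exponential backoff and "full jitter". The names `retryWithBackoff` and `backoffDelay` and their default limits are assumptions; `connect` would be something like `() => amqp.connect(URL).then(fixture)`.

```javascript
// Hypothetical reconnection helper: capped exponential backoff with full jitter.
// `random` is injectable so the delay can be checked deterministically.
function backoffDelay(attempt, baseMs, maxMs, random = Math.random) {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling); // uniform in [0, ceiling)
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// `connect` is any function returning a promise, e.g. one wrapping amqp.connect.
async function retryWithBackoff(connect, { baseMs = 100, maxMs = 30000, maxAttempts = Infinity } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await connect();
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err; // give up and surface the failure
      await sleep(backoffDelay(attempt, baseMs, maxMs));
    }
  }
}
```

Jitter keeps many clients that lost the same broker from reconnecting in lockstep; capping the delay bounds how long recovery can lag once the broker is back.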

squaremo, Oct 04 '13 11:10

Bunny tries to reconstruct the state it knows about with a couple of exceptions:

  • Server-named queues always get a new name from the server
  • Channel ids are not guaranteed to be the same (nobody should rely on them anyway)
  • Predefined entities (e.g. amq.fanout) are not redeclared

This works very well in practice, but it also relies on the fact that exchanges and queues in Bunny are objects, so we can track what the client itself has declared.
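Tracking what the client itself has declared, with the exceptions listed above, could look roughly like this in JavaScript. A minimal sketch assuming amqplib's promise API, where `assertQueue('')` asks the server to choose a name and resolves to an object with a `queue` field; the `Topology` class is hypothetical and is not how Bunny is implemented. Server-named queues are redeclared with `''` to get a fresh name, bindings that referred to the old name are rewritten, and `amq.*` exchanges are never recorded.

```javascript
// Hypothetical recorder of client-declared topology, replayed after reconnect.
class Topology {
  constructor() {
    this.exchanges = new Map(); // name -> { type, options }
    this.queues = new Map();    // current queue name -> { serverNamed, options }
    this.bindings = [];         // { queue, exchange, pattern }
  }

  async declareExchange(ch, name, type, options = {}) {
    await ch.assertExchange(name, type, options);
    // Predefined exchanges (amq.*) always exist; never redeclare them.
    if (!name.startsWith('amq.')) this.exchanges.set(name, { type, options });
  }

  async declareQueue(ch, name, options = {}) {
    const ok = await ch.assertQueue(name, options);
    this.queues.set(ok.queue, { serverNamed: name === '', options });
    return ok.queue;
  }

  async bind(ch, queue, exchange, pattern = '') {
    await ch.bindQueue(queue, exchange, pattern);
    this.bindings.push({ queue, exchange, pattern });
  }

  // Redeclare everything on a fresh channel; returns a map of old -> new queue names.
  async replay(ch) {
    for (const [name, ex] of this.exchanges) await ch.assertExchange(name, ex.type, ex.options);
    const renamed = new Map();
    const queues = new Map();
    for (const [name, q] of this.queues) {
      // Server-named queues get a new name; redeclaring the old one would fail.
      const ok = await ch.assertQueue(q.serverNamed ? '' : name, q.options);
      renamed.set(name, ok.queue);
      queues.set(ok.queue, q);
    }
    this.queues = queues;
    this.bindings = this.bindings.map((b) => ({ ...b, queue: renamed.get(b.queue) || b.queue }));
    for (const b of this.bindings) await ch.bindQueue(b.queue, b.exchange, b.pattern);
    return renamed;
  }
}
```

Callers holding an old server-generated name would use the returned map to find the new one, for example to restart a consumer.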

michaelklishin, Oct 04 '13 11:10

This is a bad idea. It assumes that the desired state is always the same. What if, for example, queues are constructed according to received data?

In our case, they are constructed according to received data. And once I've created them, I expect them to stay the same. It looks like this is related to the strategy node-amqp uses, except that node-amqp doesn't care about what's on the channels. All it does for reconnection is reconnect the underlying transport socket, then reconnect each channel. Channel "reconnection" is basically just connection: it uses the same strategy it starts with.

Even this simple approach seems like it might work in many cases. You'd lose autoDelete entities, but you'd keep persistent ones.

I would use something like a fixture; that is, a procedure that creates a known base state at each connection.

That was sorta my idea for the 18% case, except that the fixture would know what the previous configuration was, if any. Not strictly required, and not necessarily useful, but could hint at some things.

Server-named queues always get a new name from the server

That's what node-amqp lacks, and it's a pretty big loss. On reconnect, your code might not continue to operate.

Channel ids are not guaranteed to be the same (nobody should rely on them anyway)

Agreed, but if you just keep your Channel objects, why not use the channel ids? They already exist, just keep them! If the last state was valid--no overlapping channel ids, the new state will be too, right?

skeggse, Oct 04 '13 16:10

Agreed, but if you just keep your Channel objects, why not use the channel ids? They already exist, just keep them! If the last state was valid--no overlapping channel ids, the new state will be too, right?

Yes and that's how it works most of the time, but if you open and close channels all the time (not necessary but some do that to get very fine-grained error handling), at the time of recovery there may be concurrent channel id allocations. For similar cases, there are ways to "recover" a channel manually: keep the object but allocate a new number and open it. So they are not guaranteed to stay the same between connection recoveries, even though almost always they do.
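The rule described above (keep the old channel number when it is still free, otherwise allocate a new one) can be sketched as a small allocator. `ChannelIdAllocator` is hypothetical; channel 0 is reserved for the connection itself in AMQP 0-9-1, and `channelMax` stands in for the negotiated `channel_max`.

```javascript
// Hypothetical channel-number allocator used during recovery.
class ChannelIdAllocator {
  constructor(channelMax = 2047) {
    this.channelMax = channelMax; // stand-in for the negotiated channel_max
    this.inUse = new Set();
  }

  // Prefer the channel's previous number; otherwise take the lowest free one.
  allocate(preferred) {
    if (preferred !== undefined && preferred >= 1 && preferred <= this.channelMax &&
        !this.inUse.has(preferred)) {
      this.inUse.add(preferred);
      return preferred;
    }
    for (let id = 1; id <= this.channelMax; id++) { // 0 belongs to the connection
      if (!this.inUse.has(id)) {
        this.inUse.add(id);
        return id;
      }
    }
    throw new Error('no free channel numbers');
  }

  release(id) {
    this.inUse.delete(id);
  }
}
```

During recovery each tracked channel object would call `allocate(previousId)`: the number almost always comes back unchanged, but a channel opened concurrently may already hold it.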

michaelklishin, Oct 04 '13 17:10

Since you're talking reconnections... Thoughts on supporting multiple hosts?

gjohnson, May 08 '14 08:05

I implemented a really simple reconnection strategy in my app: https://gist.github.com/benmoss/e93125d1fb3561be9276 (excuse the CoffeeScript :smile:)

For my use case just dropping messages is ok until reconnection is possible, though it's not hard to imagine how you could implement a buffer.

benmoss, May 08 '14 22:05

I implemented a really simple reconnection strategy in my app: https://gist.github.com/benmoss/e93125d1fb3561be9276 (excuse the CoffeeScript :smile:)

It reads quite nicely in CoffeeScript, I reckon.

For my use case just dropping messages is ok until reconnection is possible, though it's not hard to imagine how you could implement a buffer.

Exactly, if dropped messages did matter, you could extend it to use confirmations and a replay buffer. Although, more moving parts = more failure modes.

squaremo, May 09 '14 14:05

Since you're talking reconnections... Thoughts on supporting multiple hosts?

That would certainly go hand-in-hand with reconnection. A typical scheme is to supply a collection of connection points (URLs, it would be here) for a cluster and try each in turn.
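Trying each connection point in turn might look like the following minimal sketch. `connectToCluster` is a hypothetical helper, `connect` is assumed to behave like `amqp.connect` (take a URL, return a promise), and the broker URLs in any call would be placeholders for real cluster nodes.

```javascript
// Hypothetical helper: try each broker URL in order until one accepts a connection.
// `start` rotates the starting point so repeated reconnects spread across nodes.
async function connectToCluster(urls, connect, start = 0) {
  const errors = [];
  for (let i = 0; i < urls.length; i++) {
    const url = urls[(start + i) % urls.length];
    try {
      const conn = await connect(url);
      return { conn, url };
    } catch (err) {
      errors.push(url + ': ' + err.message);
    }
  }
  // Surface every failure instead of hiding which nodes were tried.
  throw new Error('all brokers failed:\n' + errors.join('\n'));
}
```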

By the way, this is the kind of thing I worry about with automatic reconnection: http://status.cloudamqp.com/incidents/56bhzt813hg9 (more info: https://twitter.com/CloudAMQP/status/455806520370798592; underlying problem: https://github.com/postwait/node-amqp/issues/262). This particular issue was pretty straightforward -- trying to redeclare queues with server-generated names will always fail -- but AMQP is full of corners like this, and that's not taking into account the idiosyncrasies of a given application. Making assumptions about the reconnection properties of queues is dangerous.

That's not to say it can't be figured out. The RabbitMQ team certainly seem to think it's possible, since they (well, @michaelklishin for the most part, as I understand it) added reconnection to the Java client, and that is based, I would think, on how Bunny does it -- which presumably has worked very well in practice.

I'm not comfortable with all the things the RabbitMQ Java client does for its recovery -- rebasing delivery tags, for example -- which come rather close to second-guessing the server behaviour (or, to put it another way, are tightly coupled with the server).

I would prefer to require applications to be deliberate about recovery, help them where it is possible to do so without making additional assumptions, and to surface failures where it is not.

squaremo, May 09 '14 14:05

I am not familiar with CoffeeScript, so I tried to convert https://gist.github.com/benmoss/e93125d1fb3561be9276 to JavaScript.

Here it is: https://gist.github.com/richzw/57177f3fecbeb921819c

@squaremo and @benmoss, could you please help me review it? Thanks in advance.

I would also like to know what the createFakeChannel_ function is for.

richzw, Feb 09 '15 12:02

@richzw, coffeescript compiles into javascript, so if you need javascript, you don't need to manually convert it-- you can just take the result of compiling the coffeescript. Here is the result of the coffeescript you referenced above: http://pastebin.com/REPR1zP2

cmoesel, Feb 09 '15 14:02

@richzw the createFakeChannel function was there so that if publish is called while the server is disconnected it will be a no-op, rather than throwing an exception. It allows the other parts of the program to work while just dropping messages instead of sending them to RabbitMQ. This may not be acceptable for all programs, but it worked well enough for mine.
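The same idea can be sketched in plain JavaScript: while disconnected, publishes go to an object with the same `publish` method that drops the message instead of throwing. This `createFakeChannel` and this `MessageBus` are hypothetical re-creations, not the gist's exact code; the fake counts dropped messages so the loss is at least visible, and the real channel is assumed to be an amqplib channel, which emits 'close'.

```javascript
// Hypothetical stand-in channel used while disconnected: same publish()
// signature as an amqplib channel, but it drops the message and counts it.
function createFakeChannel(stats) {
  return {
    publish(exchange, routingKey, content, options) {
      stats.dropped += 1;
      return false; // signal that nothing was written
    },
  };
}

class MessageBus {
  constructor(exchange) {
    this.exchange = exchange;
    this.stats = { dropped: 0 };
    this.channel = createFakeChannel(this.stats); // start out disconnected
  }

  // Swap in the real channel once connected; swap the fake back when it closes.
  useChannel(ch) {
    this.channel = ch;
    ch.on('close', () => {
      if (this.channel === ch) this.channel = createFakeChannel(this.stats);
    });
  }

  publish(routingKey, payload) {
    const content = Buffer.from(JSON.stringify(payload));
    return this.channel.publish(this.exchange, routingKey, content, { persistent: true });
  }
}
```

Callers never see an exception while RabbitMQ is down; the trade-off is exactly the one described above, since those messages are gone.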

benmoss, Feb 09 '15 18:02

@cmoesel and @benmoss thanks for your help.

@benmoss, I want to publish messages to AMQP if the channel is OK; otherwise, keep reconnecting until the connection is OK. So:

when(amqp.connect( this.addr_ ))
    .with( this )
    .then( this.createChannel_ )
    .then( this.createExchange_ )
    .then( this.handleUnrouteableMessages_ )
    .then( this.publish_, this.handleDisconnections_ )
    .catch( this.reconnect_ )

It seems to work well.

Now I want to establish one permanent connection to AMQP when my app starts, then call the publish function whenever there are messages to be sent. Is that doable? Is it possible with your code?

richzw, Feb 10 '15 02:02

@richzw yeah, just create an instance of the MessageBus at app startup and use it as a global object. I've seen this done with a globals module that holds all the global objects, which you then require wherever you need to publish from the rest of your app:

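// globals.js
var MessageBus = require("./message_bus"); // hypothetical path to the MessageBus class from the gist
module.exports = {
  messageBus: new MessageBus()
};

// somewhere_else.js
var messageBus = require("./globals").messageBus; // local modules need a relative path

messageBus.publish("order.created", {orderNumber: 555});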
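If the connection itself should be created once and shared, one common approach is to memoize the connection promise and clear it when the connection closes, so the next caller triggers a fresh connect. A minimal sketch; `SharedConnection` is hypothetical, and `connect` stands in for something like `() => amqp.connect(url)`. It relies on amqplib connections emitting 'close' after any 'error'.

```javascript
// Hypothetical lazily created, shared connection: concurrent callers await the
// same promise, which is reset on close so the next call reconnects.
class SharedConnection {
  constructor(connect) {
    this.connect = connect;
    this.pending = null;
  }

  get() {
    if (!this.pending) {
      this.pending = this.connect().then(
        (conn) => {
          conn.on('error', () => {}); // handled via 'close'; prevents an uncaught 'error'
          conn.on('close', () => { this.pending = null; });
          return conn;
        },
        (err) => {
          this.pending = null; // let a later call retry after a failed connect
          throw err;
        }
      );
    }
    return this.pending;
  }
}
```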

benmoss, Feb 10 '15 16:02

hi @benmoss and @squaremo

In this gist https://gist.github.com/richzw/6b4d348c6b8abdc8176e, I want the Sender class to recover from connection failures by reconnecting. Here is the test code:

var CronJob = require('cron').CronJob;
var Sender = require('./sender.js');

// amqpAddress, key and message are defined elsewhere in the test
var sender = new Sender( amqpAddress, 3000);

function sendMessage( key, msg ) {
    var byteBuffer = msg.encode().toBuffer();
    sender.deliverMessage( key, byteBuffer );
}

// send a message every 5 seconds; `start: true` already starts the job,
// so no separate send_message_job.start() call is needed
var send_message_job = new CronJob({
    cronTime: '*/5 * * * * *',
    onTick: function(){
        sendMessage( key, message );
    },
    start: true
});

Then I stop and start the AMQP service at random. Recovery is sometimes handled well, but sometimes it fails with this error:

Error: read ECONNRESET
    at errnoException (net.js:905:11)
    at TCP.onread (net.js:559:19)

Can anyone help me figure out how to handle it?
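One possible cause, offered as a guess rather than a diagnosis of the gist: in Node, an EventEmitter that emits 'error' with no 'error' listener throws, which kills the process with a raw stack like the one above. A minimal guard, assuming an amqplib-style connection that emits 'error' and then 'close'; `watchConnection` and `onLost` are hypothetical names.

```javascript
// Hypothetical guard for an amqplib-style connection (any EventEmitter that
// emits 'error' followed by 'close'): handles socket errors such as ECONNRESET
// and calls onLost exactly once per lost connection.
function watchConnection(conn, onLost) {
  let lost = false;
  const lose = (err) => {
    if (lost) return; // 'error' is normally followed by 'close'; recover only once
    lost = true;
    onLost(err);
  };
  // Without an 'error' listener, Node throws the error and the process exits.
  conn.on('error', lose);
  conn.on('close', () => lose(null));
}
```

In a Sender-like class this would be called as soon as `amqp.connect` resolves, with `onLost` scheduling the next connection attempt. The connect promise also needs its own rejection handler, because a failed connect rejects rather than emitting 'error'.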

richzw, Feb 11 '15 06:02