amqplib
Race condition when acking and closing connection
I'm experiencing a race condition that prevents my service from shutting down gracefully.
My worker pulls from the queue and runs long-running jobs.
When Kubernetes sends SIGTERM, I cancel the consumer to stop further messages from arriving and wait for the current task to finish.
However, when the last task ends and ack() has been called, my server closes the RabbitMQ connection and shuts down.
What I'm seeing is that the connection is closed before the ack reaches RabbitMQ, so the message gets requeued.
Since ack() doesn't return a promise or take a callback, I'm unable to wait for the RabbitMQ server to confirm the acknowledgement.
Can you please advise how I can wait for all the acks to be received by RabbitMQ before shutting down?
channel.consume(queueName, (message) => {
  // ... do work
  channel.ack(message);
}, { noAck: false }).then(({ consumerTag }) => {
  // called as soon as SIGTERM is received
  processManager.onTerminating(async () => {
    await channel.cancel(consumerTag);
  });
});

// called after the last task calls `ack()`
processManager.beforeExit(() => {
  // I'd want to wait for all the acks to be received before closing the connection.
  return connection.close();
});
Adding a 2 second sleep before closing the connection makes it work, which confirms my theory:
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

processManager.beforeExit(async () => {
  await sleep(2000);
  // I'd want to wait for all the acks to be received before closing the connection.
  return connection.close();
});
However, I think this is a workaround, and there must be a better way of doing this that doesn't rely on a timer.
Struggling to reproduce. Is processManager a third-party library or something custom? Will it wait for connection.close() to resolve?
I've run the following, and every message was acknowledged:
for (( c=1; c<=10000; c++ )); do node index.js; done
const amqp = require('amqplib');

(async () => {
  const connection = await amqp.connect();
  const channel = await connection.createConfirmChannel();
  await channel.assertQueue('test');

  const message = await channel.get('test');
  if (message) {
    throw new Error('previous message was not acknowledged');
  }

  const { consumerTag } = await channel.consume('test', async (message) => {
    if (!message) return;
    channel.ack(message);
    await channel.cancel(consumerTag);
    await connection.close();
    process.exit(0);
  }, { noAck: false });

  channel.sendToQueue('test', Buffer.from('foo'));
})();
Before closing the connection, it may also be worth closing the channel you used to acknowledge the message. My script doesn't, but I've seen other users report that doing so forces the acknowledgements to be flushed.
TL;DR
Explicitly close all channels before closing the connection
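A minimal sketch of that advice, assuming amqplib's promise API, where channel.close() and connection.close() both return promises. closeChannelsThenConnection is a hypothetical helper name, not part of amqplib. It closes the channels concurrently, waits for every close to settle (a channel that is already closed rejects, which shouldn't block the shutdown), and only then closes the connection. The stand-in objects at the bottom just record call order so the snippet runs without a broker.

```javascript
// Hypothetical helper for amqplib's promise API: close every channel, waiting
// for the broker's reply to Channel.Close, before closing the connection.
async function closeChannelsThenConnection(connection, channels) {
  // Each Channel.Close is queued behind that channel's buffered acks.
  // allSettled: a channel that is already closed rejects, but that should not
  // stop the connection from being closed.
  const results = await Promise.allSettled(channels.map(async (ch) => ch.close()));
  await connection.close();
  // Report how many channels failed to close cleanly.
  return results.filter((r) => r.status === 'rejected').length;
}

// Stand-in objects (no broker needed) that record the order of close() calls.
const calls = [];
const fakeChannel = (name) => ({ close: async () => { calls.push(`${name}.close`); } });
const fakeConnection = { close: async () => { calls.push('connection.close'); } };

closeChannelsThenConnection(fakeConnection, [fakeChannel('ch1'), fakeChannel('ch2')])
  .then(() => console.log(calls.join(' -> '))); // ch1.close -> ch2.close -> connection.close
```

With a real amqplib connection, the beforeExit hook from the question could return closeChannelsThenConnection(connection, [channel]) instead of calling connection.close() directly.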
The Long Version
I'm able to reproduce it with the following script:
const amqp = require('amqplib');

(async () => {
  const connection = await amqp.connect();
  const channel = await connection.createConfirmChannel();
  await channel.assertQueue('test');

  for (let i = 0; i < 10; i++) {
    channel.sendToQueue('test', Buffer.from('foo'));
  }
  await channel.waitForConfirms();

  let acknowledged = 0;
  const { consumerTag } = await channel.consume('test', async (message) => {
    if (!message) return;
    await doWork(message);
    channel.ack(message);
    acknowledged++;
    if (acknowledged === 10) {
      // await channel.close();
      await connection.close();
      process.exit(0);
    }
  }, { noAck: false });

  async function doWork(message) {
    return new Promise((resolve) => {
      setTimeout(resolve, 1000);
    });
  }
})();
Here's the Wireshark trace:
65 17.218520 127.0.0.1 127.0.0.1 AMQP 569 Connection.Start
67 17.221783 127.0.0.1 127.0.0.1 AMQP 378 Connection.Start-Ok
69 17.222576 127.0.0.1 127.0.0.1 AMQP 76 Connection.Tune
71 17.222984 127.0.0.1 127.0.0.1 AMQP 76 Connection.Tune-Ok
73 17.223108 127.0.0.1 127.0.0.1 AMQP 72 Connection.Open vhost=/
75 17.223795 127.0.0.1 127.0.0.1 AMQP 69 Connection.Open-Ok
77 17.227607 127.0.0.1 127.0.0.1 AMQP 69 Channel.Open
79 17.228586 127.0.0.1 127.0.0.1 AMQP 72 Channel.Open-Ok
81 17.229133 127.0.0.1 127.0.0.1 AMQP 69 Confirm.Select
83 17.229794 127.0.0.1 127.0.0.1 AMQP 68 Confirm.Select-Ok
85 17.230318 127.0.0.1 127.0.0.1 AMQP 80 Queue.Declare q=test
87 17.231058 127.0.0.1 127.0.0.1 AMQP 81 Queue.Declare-Ok q=test
89 17.233134 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
91 17.233161 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
93 17.233176 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
95 17.233195 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
97 17.233201 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
99 17.233210 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
101 17.233228 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
103 17.233235 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
105 17.233239 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
107 17.233245 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
109 17.234312 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
111 17.235296 127.0.0.1 127.0.0.1 AMQP 81 Basic.Consume q=test
113 17.236397 127.0.0.1 127.0.0.1 AMQP 1060 Basic.Consume-Ok Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body
117 18.241993 127.0.0.1 127.0.0.1 AMQP 89 Connection.Close reply=Cheers, thanks
119 18.242433 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
121 18.242504 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
123 18.242550 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
125 18.242573 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
127 18.242609 127.0.0.1 127.0.0.1 AMQP 98 Basic.Ack Basic.Ack
129 18.242632 127.0.0.1 127.0.0.1 AMQP 98 Basic.Ack Basic.Ack
131 18.242684 127.0.0.1 127.0.0.1 AMQP 98 Basic.Ack Basic.Ack
133 18.243721 127.0.0.1 127.0.0.1 AMQP 68 Connection.Close-Ok
After running the script, all 10 messages remained on the queue. I suspect this is because the Connection.Close instruction (117) was sent before the message acknowledgements (119-131), so the broker had already started closing its side of the connection when the acks arrived. Basic.Ack has no reply, so no warning is given. Finally, the broker's Connection.Close-Ok (133) is received.
If I re-run the test with the await channel.close() line uncommented, all messages are consumed from the queue:
7 0.002565 127.0.0.1 127.0.0.1 AMQP 569 Connection.Start
9 0.005111 127.0.0.1 127.0.0.1 AMQP 378 Connection.Start-Ok
11 0.006047 127.0.0.1 127.0.0.1 AMQP 76 Connection.Tune
13 0.006412 127.0.0.1 127.0.0.1 AMQP 76 Connection.Tune-Ok
15 0.006534 127.0.0.1 127.0.0.1 AMQP 72 Connection.Open vhost=/
17 0.007320 127.0.0.1 127.0.0.1 AMQP 69 Connection.Open-Ok
19 0.010707 127.0.0.1 127.0.0.1 AMQP 69 Channel.Open
21 0.011820 127.0.0.1 127.0.0.1 AMQP 72 Channel.Open-Ok
23 0.012325 127.0.0.1 127.0.0.1 AMQP 69 Confirm.Select
25 0.012986 127.0.0.1 127.0.0.1 AMQP 68 Confirm.Select-Ok
27 0.013508 127.0.0.1 127.0.0.1 AMQP 80 Queue.Declare q=test
29 0.014161 127.0.0.1 127.0.0.1 AMQP 81 Queue.Declare-Ok q=test
31 0.016229 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
33 0.016256 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
35 0.016272 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
37 0.016279 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
39 0.016300 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
41 0.016342 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
43 0.016366 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
45 0.016385 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
47 0.016393 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
49 0.016422 127.0.0.1 127.0.0.1 AMQP 114 Basic.Publish x= rk=test Content-Header Content-Body
51 0.017558 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
53 0.018452 127.0.0.1 127.0.0.1 AMQP 81 Basic.Consume q=test
55 0.019444 127.0.0.1 127.0.0.1 AMQP 1060 Basic.Consume-Ok Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body
65 1.026862 127.0.0.1 127.0.0.1 AMQP 77 Basic.Ack
67 1.026986 127.0.0.1 127.0.0.1 AMQP 98 Basic.Ack Basic.Ack
69 1.027041 127.0.0.1 127.0.0.1 AMQP 119 Basic.Ack Basic.Ack Basic.Ack
71 1.027090 127.0.0.1 127.0.0.1 AMQP 166 Basic.Ack Basic.Ack Basic.Ack Basic.Ack Channel.Close reply=Goodbye
73 1.028741 127.0.0.1 127.0.0.1 AMQP 68 Channel.Close-Ok
75 1.034298 127.0.0.1 127.0.0.1 AMQP 89 Connection.Close reply=Cheers, thanks
77 1.035784 127.0.0.1 127.0.0.1 AMQP 68 Connection.Close-Ok
The Wireshark trace above shows that this time the Channel.Close instruction (the last method in 71) was queued behind all the message acknowledgements (65-71). Because the script waits for the broker to acknowledge the channel close (73), the Connection.Close instruction is not sent until the very end.
I think the underlying reason for all this is that when you call connection.close, amqplib writes the ConnectionClose operation to the specially reserved channel 0. Other operations such as acknowledging messages (Basic.Ack) are sent using channels created by the client.
Under the hood, the amqplib muxer decides which channel to read from before forwarding frames to the broker. Roughly speaking, it round-robins between channels in order of creation, oldest first, with the caveat that a newly created channel gets priority.
This means that channel 0 operations such as Connection.Close will often be sent to the broker before those written to other channels. The solution is to explicitly close all channels (remembering to await the result) before closing the connection. The Channel.Close operation is sent on the same channel as the message acknowledgements, so it is queued behind whatever has already been buffered. Because the broker must reply to Channel.Close, channel.close() can be awaited, and doing so ensures that all pending channel operations issued before the close have completed.
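To make the ordering argument concrete, here is a toy model of a round-robin muxer. It is a simplification written for illustration, not amqplib's actual muxer: it assumes each pass takes one frame from every non-empty channel buffer, lowest channel number first, and it ignores the priority given to newly created channels. muxRoundRobin and acksAfterConnectionClose are hypothetical helpers.

```javascript
// Toy model only: NOT amqplib's muxer. Each pass takes one frame from every
// non-empty channel buffer, lowest channel number (oldest) first.
function muxRoundRobin(buffers) {
  const queues = buffers.map((frames) => [...frames]); // copy; leave input intact
  const wire = [];
  while (queues.some((q) => q.length > 0)) {
    for (let ch = 0; ch < queues.length; ch++) {
      if (queues[ch].length > 0) wire.push(`${ch}:${queues[ch].shift()}`);
    }
  }
  return wire;
}

// Count Basic.Ack frames that reach the wire after Connection.Close.
function acksAfterConnectionClose(wire) {
  const closeAt = wire.indexOf('0:Connection.Close');
  return wire.slice(closeAt + 1).filter((frame) => frame.endsWith(':Basic.Ack')).length;
}

const acks = Array(10).fill('Basic.Ack');

// Without channel.close(): Connection.Close (channel 0) and ten buffered acks
// (channel 1) are muxed in the same passes.
const unsafe = muxRoundRobin([['Connection.Close'], acks]);
console.log(acksAfterConnectionClose(unsafe)); // 10

// With await channel.close(): Channel.Close is queued behind the acks on
// channel 1, and Connection.Close is only written after Channel.Close-Ok.
const safe = [
  ...muxRoundRobin([[], [...acks, 'Channel.Close']]),
  ...muxRoundRobin([['Connection.Close']]),
];
console.log(acksAfterConnectionClose(safe)); // 0
```

The first result mirrors the first trace, where all ten Basic.Ack frames follow Connection.Close; the second mirrors the trace with channel.close() uncommented.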
The remaining question is whether amqplib should formally close channels (i.e. send a ChannelClose to the broker) when the parent connection is closed.
I took a look at this over the weekend. It looks like it's possible to close the channels (and wait for the replies) before sending the ConnectionClose command in closeBecause. Something like:
C.closeBecause = function(reason, code, k) {
  const closeChannels = this.channels.map(function(ch, i) {
    // Skip channel 0 (used for connection-level methods) and released channel slots
    if (i === 0 || !ch) return Promise.resolve();
    // We're about to close the connection, so if a channel errors while closing there's not much we can do
    return ch.channel.close().catch(() => {});
  });
  Promise.all(closeChannels).then(() => {
    this.sendMethod(0, defs.ConnectionClose, {
      replyText: reason,
      replyCode: code,
      methodId: 0, classId: 0
    });
    var s = stackCapture('closeBecause called: ' + reason);
    this.toClosing(s, k);
  });
};
It would still leave the library open to a narrower race condition in which the user creates a channel at around the same time as calling connection.close(). We could narrow this further by adding an isClosing check to the connection and rejecting (or yielding an error from) the createChannel and createConfirmChannel methods, but I suspect there may still be a small gap.
It may also take a significant amount of time in applications with a large number of channels, which could slow shutdown enough that whatever is managing it sends a kill. I'm not overly worried about this, as the alternative is to lose messages.
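The isClosing check suggested above can be approximated in application code. Below is a hypothetical ClosingGuard wrapper, not part of amqplib, assuming a connection whose createChannel() and close() return promises (as in amqplib's promise API). It rejects new channels once shutdown starts, waits for channel creations already in flight, closes every channel it created, and only then closes the connection.

```javascript
// Hypothetical wrapper, NOT part of amqplib. Assumes `connection` has
// promise-returning createChannel() and close(), as amqplib's promise API does.
class ClosingGuard {
  constructor(connection) {
    this.connection = connection;
    this.isClosing = false;
    this.channels = new Set(); // channels created through this wrapper
    this.pending = new Set();  // createChannel() calls that have not settled yet
  }

  async createChannel() {
    if (this.isClosing) {
      throw new Error('Connection is closing; refusing to create a channel');
    }
    const creation = this.connection.createChannel();
    this.pending.add(creation);
    try {
      const channel = await creation;
      this.channels.add(channel);
      return channel;
    } finally {
      this.pending.delete(creation);
    }
  }

  async close() {
    this.isClosing = true; // from here on, createChannel() rejects
    // Wait for creations that started before close(), so their channels are closed too.
    await Promise.allSettled([...this.pending]);
    // Each Channel.Close is queued behind that channel's buffered acks.
    await Promise.allSettled([...this.channels].map(async (ch) => ch.close()));
    this.channels.clear();
    await this.connection.close();
  }
}
```

Applications would call guard.createChannel() instead of connection.createChannel() and guard.close() on shutdown. This only covers channels created through the wrapper; a createConfirmChannel() variant would follow the same pattern.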
Any news on this, @cressie176? I'm having the same problem: in my case, after an ack I need to close the connection and kill the process, so I need a way to make sure the ack has been processed before closing the connection.
Hi @rubenmorim, make sure you close all channels before closing the connection and you should be fine.
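Applied to the worker from the original question, that advice looks roughly like the sketch below. It assumes amqplib's promise API (consume, cancel, ack, nack, close); startWorker and handleJob are hypothetical names, and the in-flight tracking stands in for whatever processManager did. The important part is the order inside shutdown(): cancel the consumer, let running jobs call ack(), await channel.close(), and only then close the connection.

```javascript
// Sketch for amqplib's promise API; startWorker and handleJob are hypothetical.
function startWorker(connection, channel, queue, handleJob) {
  const inFlight = new Set(); // jobs that have not yet acked or nacked
  let consumerTag;

  const ready = channel.consume(queue, (message) => {
    if (message === null) return; // the broker cancelled the consumer
    const job = (async () => {
      try {
        await handleJob(message);
        channel.ack(message); // buffered locally; may not have reached the broker yet
      } catch (err) {
        channel.nack(message); // requeue (the default) so the job can be retried
      }
    })();
    inFlight.add(job);
    job.then(() => inFlight.delete(job), () => inFlight.delete(job));
  }, { noAck: false }).then((ok) => { consumerTag = ok.consumerTag; });

  async function shutdown() {
    await ready;
    await channel.cancel(consumerTag);       // stop new deliveries
    await Promise.allSettled([...inFlight]); // let running jobs ack or nack
    await channel.close();                   // Channel.Close is queued behind the acks
    await connection.close();                // the broker has replied to Channel.Close
  }

  return { shutdown };
}
```

Called from the SIGTERM handler, shutdown() would replace the two-second sleep in the beforeExit hook.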
Going to close this. I'm not confident about catching all the race conditions, and it's documented that you need to close channels before closing the connection.