amqplib

Race condition when acking and closing connection

Open fenos opened this issue 3 years ago • 7 comments

I'm experiencing a race condition that prevents my service from shutting down gracefully.

My worker pulls from the queue and starts long-running jobs. When Kubernetes sends SIGTERM, I cancel the consumer to stop further messages from being delivered, then wait for the current task to finish.

However, when the last task ends and the ack method has been called, my server closes the RabbitMQ connection and shuts down.

What I'm seeing is that the connection is closed before the ack reaches RabbitMQ, so the message gets re-queued.

Since ack doesn't return a promise or accept a callback, I'm unable to wait for the acknowledgement to be received by the RabbitMQ server.

Can you please advise how I can wait for all the acks to be received by RabbitMQ before shutting down?


channel.consume(queueName, (message) => {
   // ... do work
   channel.ack(message);
}, { noAck: false }).then((queue) => {

   // called as soon as SIGTERM is received
   processManager.onTerminating(async () => {
      await channel.cancel(queue.consumerTag);
   });
});

// called after the last task calls `ack()`
processManager.beforeExit(() => {
   // I'd want to wait for all the acks to be received before closing the connection.
   return connection.close();
})

fenos avatar Jul 14 '21 09:07 fenos

Adding a 2-second sleep before closing the connection makes it work, which confirms my theory.

processManager.beforeExit(async () => {
   await sleep(2000);
   // I'd want to wait for all the acks to be received before closing the connection.
   return connection.close();
})

However, I think this is a workaround, and there must be a better way of doing this without relying on a timer.

fenos avatar Jul 15 '21 11:07 fenos

Struggling to reproduce. Is processManager a 3rd-party library or something custom? Will it wait for connection.close() to resolve?

cressie176 avatar May 05 '22 09:05 cressie176

I've run the following, and every message was acknowledged.

for (( c=1; c<=10000; c++ )); do node index.js; done

const amqp = require('amqplib');

(async () => {
  const connection = await amqp.connect();
  const channel = await connection.createConfirmChannel();
  await channel.assertQueue('test');

  const message = await channel.get('test');
  if (message) {
    throw new Error('previous message was not acknowledged');
  }

  const { consumerTag } = await channel.consume('test', async (message) => {
    if (!message) return;
    channel.ack(message);
    await channel.cancel(consumerTag);
    await connection.close();
    process.exit(0);
  }, { noAck: false });

  channel.sendToQueue('test', Buffer.from('foo'));
})();

cressie176 avatar May 06 '22 16:05 cressie176

Prior to closing the connection, it may also be worth closing the channel you use to acknowledge the message. My script doesn't, but I've seen other users comment that this forced acknowledgements to be flushed.

cressie176 avatar May 06 '22 17:05 cressie176

TL;DR

Explicitly close all channels before closing the connection

The Long Version

I'm able to reproduce with the following script:

const amqp = require('amqplib');

(async () => {
  const connection = await amqp.connect();
  const channel = await connection.createConfirmChannel();
  await channel.assertQueue('test');

  for (let i = 0; i < 10; i++) {
    channel.sendToQueue('test', Buffer.from('foo'));
  }
  await channel.waitForConfirms();

  let acknowledged = 0;
  const { consumerTag } = await channel.consume('test', async (message) => {
    if (!message) return;

    await doWork(message);

    channel.ack(message);
    acknowledged++;

    if (acknowledged === 10) {
      // await channel.close();
      await connection.close();
      process.exit(0);
    }
  }, { noAck: false });

  async function doWork(message) {
    return new Promise(resolve => {
      setTimeout(resolve, 1000);
    })
  }
})();

Here's the Wireshark trace:

65	17.218520	127.0.0.1	127.0.0.1	AMQP	569	Connection.Start 
67	17.221783	127.0.0.1	127.0.0.1	AMQP	378	Connection.Start-Ok 
69	17.222576	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune 
71	17.222984	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune-Ok 
73	17.223108	127.0.0.1	127.0.0.1	AMQP	72	Connection.Open vhost=/ 
75	17.223795	127.0.0.1	127.0.0.1	AMQP	69	Connection.Open-Ok 
77	17.227607	127.0.0.1	127.0.0.1	AMQP	69	Channel.Open 
79	17.228586	127.0.0.1	127.0.0.1	AMQP	72	Channel.Open-Ok 
81	17.229133	127.0.0.1	127.0.0.1	AMQP	69	Confirm.Select 
83	17.229794	127.0.0.1	127.0.0.1	AMQP	68	Confirm.Select-Ok 
85	17.230318	127.0.0.1	127.0.0.1	AMQP	80	Queue.Declare q=test 
87	17.231058	127.0.0.1	127.0.0.1	AMQP	81	Queue.Declare-Ok q=test 
89	17.233134	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
91	17.233161	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
93	17.233176	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
95	17.233195	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
97	17.233201	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
99	17.233210	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
101	17.233228	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
103	17.233235	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
105	17.233239	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
107	17.233245	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
109	17.234312	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
111	17.235296	127.0.0.1	127.0.0.1	AMQP	81	Basic.Consume q=test 
113	17.236397	127.0.0.1	127.0.0.1	AMQP	1060	Basic.Consume-Ok Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body 
117	18.241993	127.0.0.1	127.0.0.1	AMQP	89	Connection.Close reply=Cheers, thanks 
119	18.242433	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
121	18.242504	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
123	18.242550	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
125	18.242573	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
127	18.242609	127.0.0.1	127.0.0.1	AMQP	98	Basic.Ack Basic.Ack 
129	18.242632	127.0.0.1	127.0.0.1	AMQP	98	Basic.Ack Basic.Ack 
131	18.242684	127.0.0.1	127.0.0.1	AMQP	98	Basic.Ack Basic.Ack 
133	18.243721	127.0.0.1	127.0.0.1	AMQP	68	Connection.Close-Ok 

After running the script, all 10 messages remained on the queue. I suspect this is because the connection close instruction (117) was sent before the messages were acknowledged (119-131), hence the broker closed its side of the connection. The broker does not reply to a Basic.Ack, so no warning is given. Finally, the connection close acknowledgement (133) is received from the broker.

If I re-run the test with the await channel.close() line uncommented, all messages are consumed from the queue.

7	0.002565	127.0.0.1	127.0.0.1	AMQP	569	Connection.Start 
9	0.005111	127.0.0.1	127.0.0.1	AMQP	378	Connection.Start-Ok 
11	0.006047	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune 
13	0.006412	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune-Ok 
15	0.006534	127.0.0.1	127.0.0.1	AMQP	72	Connection.Open vhost=/ 
17	0.007320	127.0.0.1	127.0.0.1	AMQP	69	Connection.Open-Ok 
19	0.010707	127.0.0.1	127.0.0.1	AMQP	69	Channel.Open 
21	0.011820	127.0.0.1	127.0.0.1	AMQP	72	Channel.Open-Ok 
23	0.012325	127.0.0.1	127.0.0.1	AMQP	69	Confirm.Select 
25	0.012986	127.0.0.1	127.0.0.1	AMQP	68	Confirm.Select-Ok 
27	0.013508	127.0.0.1	127.0.0.1	AMQP	80	Queue.Declare q=test 
29	0.014161	127.0.0.1	127.0.0.1	AMQP	81	Queue.Declare-Ok q=test 
31	0.016229	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
33	0.016256	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
35	0.016272	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
37	0.016279	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
39	0.016300	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
41	0.016342	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
43	0.016366	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
45	0.016385	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
47	0.016393	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
49	0.016422	127.0.0.1	127.0.0.1	AMQP	114	Basic.Publish x= rk=test Content-Header Content-Body 
51	0.017558	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
53	0.018452	127.0.0.1	127.0.0.1	AMQP	81	Basic.Consume q=test 
55	0.019444	127.0.0.1	127.0.0.1	AMQP	1060	Basic.Consume-Ok Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body Basic.Deliver x= rk=test Content-Header Content-Body 
65	1.026862	127.0.0.1	127.0.0.1	AMQP	77	Basic.Ack 
67	1.026986	127.0.0.1	127.0.0.1	AMQP	98	Basic.Ack Basic.Ack 
69	1.027041	127.0.0.1	127.0.0.1	AMQP	119	Basic.Ack Basic.Ack Basic.Ack 
71	1.027090	127.0.0.1	127.0.0.1	AMQP	166	Basic.Ack Basic.Ack Basic.Ack Basic.Ack Channel.Close reply=Goodbye 
73	1.028741	127.0.0.1	127.0.0.1	AMQP	68	Channel.Close-Ok 
75	1.034298	127.0.0.1	127.0.0.1	AMQP	89	Connection.Close reply=Cheers, thanks 
77	1.035784	127.0.0.1	127.0.0.1	AMQP	68	Connection.Close-Ok 

The above Wireshark trace shows that this time the Channel.Close instruction (the last frame in 71) was queued behind all the message acknowledgements (65-71). Because the script waits for the channel close instruction to be acknowledged (73), the connection close instruction is not sent until the very end.

I think the underlying reason for all this is that when you call connection.close, amqplib writes the ConnectionClose operation to the specially reserved channel 0. Other operations such as acknowledging messages (Basic.Ack) are sent using channels created by the client.

Under the hood, the amqplib muxer decides which channel to read from before forwarding the packets to the broker. Roughly speaking, it round-robins between channels in order of creation, oldest first, with the caveat that when a channel is first created it will get priority.
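
To illustrate the ordering effect, here is a toy model of a round-robin multiplexer. It is not amqplib's actual muxer: the drain function, the one-frame-per-visit rule, and the buffer layout are all assumptions made for the sketch. It only shows how a frame written to channel 0 can overtake frames already buffered on another channel.

```javascript
// Toy model only: NOT amqplib's Mux. Each channel has its own buffer of
// pending frames, and the multiplexer visits channels oldest first,
// taking at most one frame per visit (an assumption for this sketch).
function drain(buffers) {
  const sent = [];
  let pending = true;
  while (pending) {
    pending = false;
    for (const [channelId, frames] of buffers) {
      if (frames.length > 0) {
        sent.push(`ch${channelId}:${frames.shift()}`);
        pending = true;
      }
    }
  }
  return sent;
}

// Channel 0 carries connection-level methods; channel 1 holds buffered acks.
const withoutChannelClose = drain(new Map([
  [0, ['Connection.Close']],
  [1, ['Basic.Ack', 'Basic.Ack', 'Basic.Ack']],
]));

console.log(withoutChannelClose);
```

In this model Connection.Close is the first frame out, which mirrors the shape of the first trace: the close at 117 precedes the acks at 119-131.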

This means that channel 0 operations such as Connection.Close will often be sent to the broker before those written to other channels. The solution is to ensure you explicitly close all channels (remembering to await) before closing the connection. The Channel.Close operation is sent using the same channel as the message acknowledgements, and so will be queued behind whatever has previously been buffered. Because the broker must reply to Channel.Close, the channel.close() function can be awaited. Doing so means that you can be sure that all pending channel operations prior to closing the channel have completed.
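
Applied to the original question, the shutdown order could be sketched as follows. The shutdown function and the inFlight set of job promises are hypothetical names for illustration; channel.cancel(), channel.close() and connection.close() are the amqplib promise API calls already used in this thread.

```javascript
// Sketch of a shutdown order that keeps acks ahead of Connection.Close.
// `inFlight` is a hypothetical Set of promises, one per job still running;
// each job is assumed to call channel.ack(message) before it settles.
async function shutdown({ connection, channel, consumerTag, inFlight }) {
  // 1. Stop new deliveries; messages already delivered keep processing.
  await channel.cancel(consumerTag);

  // 2. Wait for the running jobs (and therefore their ack() calls).
  await Promise.allSettled([...inFlight]);

  // 3. Channel.Close is queued behind the buffered acks on the same
  //    channel, and the broker replies to it, so it can be awaited.
  await channel.close();

  // 4. Only now send Connection.Close on channel 0.
  await connection.close();
}
```

With the processManager from the original post, this would presumably be called from the termination hook in place of the bare connection.close(), assuming the hook waits for the returned promise.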

cressie176 avatar May 07 '22 08:05 cressie176

The remaining question is whether amqplib should formally close channels (i.e. send a ChannelClose to the broker) when the parent connection is closed.

cressie176 avatar May 07 '22 08:05 cressie176

I took a look at this over the weekend. It looks like it's possible to close the channels (and wait for the replies) before sending the ConnectionClose command, here. Something like...

C.closeBecause = function(reason, code, k) {
  const closeChannels = this.channels.map(function(ch, i) {
    if (i === 0) return Promise.resolve();
    // We're about to close the connection, so if a channel errors while closing there's not much we can do
    return ch.channel.close().catch(() => {});
  });

  Promise.all(closeChannels).then(() => {
    this.sendMethod(0, defs.ConnectionClose, {
      replyText: reason,
      replyCode: code,
      methodId: 0, classId: 0
    });
    var s = stackCapture('closeBecause called: ' + reason);
    this.toClosing(s, k);
  })
};

It would still leave the library open to a narrower race condition where the user creates a channel around the same time as calling connection.close(). We can minimise this further by adding an "isClosing" check to the connection and rejecting / yielding an error from the createChannel and createConfirmChannel methods, but I suspect there may still be a small gap.
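
The same idea can be sketched in user code as a thin wrapper. GuardedConnection is a hypothetical class, not part of amqplib; it assumes the wrapped object exposes createChannel() and close() as the amqplib promise API does.

```javascript
// Hypothetical wrapper, not amqplib code: refuse new channels once the
// connection has started closing, and close tracked channels first.
class GuardedConnection {
  constructor(connection) {
    this.connection = connection;
    this.isClosing = false;
    this.channels = new Set();
  }

  async createChannel() {
    if (this.isClosing) throw new Error('Connection is closing');
    const channel = await this.connection.createChannel();
    this.channels.add(channel);
    return channel;
  }

  async close() {
    this.isClosing = true;
    // A channel that errors while closing is ignored; the connection
    // is going away regardless.
    await Promise.all(
      [...this.channels].map((ch) => ch.close().catch(() => {}))
    );
    await this.connection.close();
  }
}
```

A gap remains even here: a createChannel() call that passed the check but is still waiting on the broker when close() runs is added to the set too late to be closed.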

It may also take a significant amount of time for applications with a high number of channels, which could lead to slower application shutdowns, resulting in whatever is managing the shutdown sending a kill. Not overly worried about this, as the alternative is to lose messages.

cressie176 avatar May 16 '22 10:05 cressie176

Any news on this, @cressie176? I'm having the same problem: in my case, after an ack I need to close the connection and kill the process. I need a way to make sure the ack is done before closing the connection.

rubenmorim avatar Apr 04 '24 10:04 rubenmorim

Hi @rubenmorim, make sure you close all channels before closing the connection and you should be fine.

cressie176 avatar Apr 04 '24 14:04 cressie176

Going to close this. I'm not confident about catching all the race conditions. It's documented that you need to close channels before the connection.

cressie176 avatar Apr 11 '24 20:04 cressie176