amqp-ts icon indicating copy to clipboard operation
amqp-ts copied to clipboard

message.ack() takes 40+ milliseconds to actually ack

Open justinrush opened this issue 4 years ago • 5 comments

I'm troubleshooting some slow performance in my application that I've narrowed down to the amqp-ts consumer component. I have a go-lang based sender that sends messages to a queue and a typescript/node component that listens on that queue (using amqp-ts) and sends the response back on a different queue. A basic test script is at the bottom.

It seems like message.ack() returns almost immediately (<1ms), but instrumenting the various parts of my application has shown that the go-lang receiver gets the message and sends a new message before the first message is actually shown as acknowledged in rabbitmq. Said differently:

  1. publish message-1
  2. consume message-1
  3. receive response for message-1
  4. publish message-2
  5. ack finishes for message-1

With prefetch = 1, this results in about 40ms of latency being added to the response. If I bump prefech to 2, then every other message is +40ms with other messages only being 10ms..

What is message.ack() doing? is it acknowledging receipt in the queue (e.g. RabbitMQ) or in the consumer of the queue (e.g. my go-lang based component)?

I'm using RabbitMQ 3.8.8

Code:

import * as Amqp from "amqp-ts";

const conn = new Amqp.Connection(
	"amqp://user:pass@rabbitmq:5672/",
	{ timeout: 1000 },
	{ retries: 1, interval: 1000 },
);

conn.completeConfiguration().then(() => {
	const recvQueue = conn.declareQueue("rpc-ugabuga", {
		durable: true,
		autoDelete: false,
		prefetch: 1,
	});

	const respQueue = conn.declareQueue("ugabuga", {
		durable: true,
		autoDelete: false,
		prefetch: 1,
	});

	console.log("consumer activating...");
	recvQueue.activateConsumer(
		(message: Amqp.Message) => {
			const req = message.getContent();
			const resp = {
				correlationId: req.correlationId,
				timestamps: req.timestamps,
			};
			const msg = new Amqp.Message(resp);
			msg.properties.correlationId = "correlation-id-1";
			respQueue.send(msg);

			message.ack();
		},
		{ consumerTag: "my-consumer", manualAck: true },
	);
});

justinrush avatar Oct 13 '20 17:10 justinrush

When you call message.ack(), you cause an acknowledgement message to be sent to RabbitMQ. The ack() function returns after the acknowledgement message has been successfully sent. But that doesn't mean RabbitMQ has received the message yet. It will take some time before RabbbitMQ receives and processes the acknowledgement message.

austin-beer avatar Oct 13 '20 18:10 austin-beer

is there a way to know when that is done? Trying to figure out what is different with the typescript code because the same client code using the streadway/amqp go-lang library doesn't have the same issue.

justinrush avatar Oct 13 '20 18:10 justinrush

I don't believe so. The only way to know would be for RabbitMQ to send you a message to acknowledge that it had received your acknowledgement message. RabbitMQ doesn't send acknowledgement messages for receiving acknowledgement messages.

The prefetch setting kind of tells you. With a prefetch set to 1 RabbitMQ won't send you the next message until it's received the acknowledgement from you for the previous message it sent.

austin-beer avatar Oct 13 '20 18:10 austin-beer

do other people see this taking 40+ms? there isn't really another library to try with typescript for comparisons... rhea requires amqp 1.0.

justinrush avatar Oct 14 '20 15:10 justinrush

I'm honestly not sure. I haven't done any measurements of the ack latency in my own system.

austin-beer avatar Oct 14 '20 15:10 austin-beer