
Returns are not always delivered before ACKs (but they should be)

Open gm42 opened this issue 6 years ago • 3 comments

Quoting RabbitMQ documentation (https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed):

When Will Published Messages Be Confirmed by the Broker? For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

From the above I deduce that a generic client can tell whether a message was returned, because the return is always received before the corresponding ACK.

However, this AMQP client library does not guarantee that this order is preserved, and I am in fact receiving returns after ACKs.

I can produce a test case, but can somebody please confirm whether this is by design or not?

My problem is (apparently) simple: send multiple messages concurrently and verify that each of them has been received and routed (with immediate=false, but durable=true). If the server-side guarantee that returns are ordered before ACKs is not preserved by the client, this becomes impossible.

I would also suggest adding an example of consistent (transactional) message delivery with a concurrent pattern. Without one, the usage and capabilities of all the channels involved, including proper cleanup, might be unclear. We can address that separately in a PR.

Edit: I also noticed that returns have no DeliveryTag, so it is perhaps impossible to associate a return with its confirmation directly. I am using MessageId (application-level) to detect the returns in my test.
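To make the correlation concrete, here is a minimal sketch of the detection logic, assuming the client sees returns and ACKs as one ordered stream. The `event` type and the `classify` function are hypothetical names made up for this illustration and are not part of the library. The sketch also assumes one ACK per message (no multiple=true ACKs).

```go
package main

import "fmt"

// event is a hypothetical, simplified view of the frames a publisher sees.
type event struct {
	isReturn    bool   // true for basic.return, false for basic.ack
	messageID   string // set on returns (application-level MessageId)
	deliveryTag uint64 // set on ACKs; returns carry no delivery tag
}

// classify walks an ordered frame stream and reports, per MessageId,
// whether the message was routed. published maps delivery tag to MessageId.
// Assumption: every ACK confirms exactly one message.
func classify(published map[uint64]string, stream []event) map[string]bool {
	returned := make(map[string]bool)
	routed := make(map[string]bool)
	for _, ev := range stream {
		if ev.isReturn {
			returned[ev.messageID] = true
			continue
		}
		id, ok := published[ev.deliveryTag]
		if !ok {
			continue // ACK for a tag we did not publish
		}
		// Only correct if a return, when there is one, was seen first.
		routed[id] = !returned[id]
	}
	return routed
}

func main() {
	published := map[uint64]string{1: "a", 2: "b"}

	// Return for "a" arrives before its ACK, as the broker guarantees.
	ordered := []event{
		{isReturn: true, messageID: "a"},
		{deliveryTag: 1},
		{deliveryTag: 2},
	}
	fmt.Println(classify(published, ordered))

	// Same frames, but the return is observed after the ACK.
	reordered := []event{
		{deliveryTag: 1},
		{isReturn: true, messageID: "a"},
		{deliveryTag: 2},
	}
	fmt.Println(classify(published, reordered))
}
```

With the ordered stream, "a" is reported as unroutable and "b" as routed. With the reordered stream, "a" is wrongly reported as routed, which is the failure described in this issue.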

gm42 avatar Sep 28 '18 14:09 gm42

This is my (dirty) fix:

diff --git a/channel.go b/channel.go
index dd2552c..f90d823 100644
--- a/channel.go
+++ b/channel.go
@@ -297,7 +297,12 @@ func (ch *Channel) dispatch(msg message) {
 		}
 		ch.notifyM.RUnlock()
 		ch.consumers.cancel(m.ConsumerTag)
-
+		/*
+		   # When Will Published Messages Be Confirmed by the Broker?
+		   For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue
+		   (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client
+		   before basic.ack. The same is true for negative acknowledgements (basic.nack).
+		*/
 	case *basicReturn:
 		ret := newReturn(*m)
 		ch.notifyM.RLock()
@@ -307,6 +312,14 @@ func (ch *Channel) dispatch(msg message) {
 		ch.notifyM.RUnlock()
 
 	case *basicAck:
+		// send the ACK also on the return channel, so that the sequence there is always preserved
+		ret := newAckReturn(true, m.DeliveryTag, m.Multiple, false)
+		ch.notifyM.RLock()
+		for _, c := range ch.returns {
+			c <- *ret
+		}
+		ch.notifyM.RUnlock()
+
 		if ch.confirming {
 			if m.Multiple {
 				ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
@@ -316,6 +329,14 @@ func (ch *Channel) dispatch(msg message) {
 		}
 
 	case *basicNack:
+		// send the NACK also on the return channel, so that the sequence there is always preserved
+		ret := newAckReturn(false, m.DeliveryTag, m.Multiple, m.Requeue)
+		ch.notifyM.RLock()
+		for _, c := range ch.returns {
+			c <- *ret
+		}
+		ch.notifyM.RUnlock()
+
 		if ch.confirming {
 			if m.Multiple {
 				ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
diff --git a/return.go b/return.go
index 10dcedb..feefad8 100644
--- a/return.go
+++ b/return.go
@@ -34,6 +34,23 @@ type Return struct {
 	AppId           string    // application use - creating application
 
 	Body []byte
+
+	// payload for ACK/NACK
+	ACKType     bool
+	ACK         bool
+	Multiple    bool
+	Requeue     bool
+	DeliveryTag uint64
+}
+
+func newAckReturn(isACK bool, deliveryTag uint64, multiple, requeue bool) *Return {
+	return &Return{
+		ACKType:     true,
+		ACK:         isACK,
+		DeliveryTag: deliveryTag,
+		Multiple:    multiple,
+		Requeue:     requeue,
+	}
 }
 
 func newReturn(msg basicReturn) *Return {

It may be dirty, but at least it makes returns and ACKs arrive in the correct order.

I think a proper solution should rework confirmations so that their order relative to RETURNs is preserved.
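For illustration, this is roughly how a caller might consume the merged channel produced by the patch. It is a sketch only: `merged` is a local stand-in that copies just the fields it needs from the patched Return struct, and `drain`, `outcome` and the `pending` map are hypothetical names, not library API. It assumes the caller recorded the delivery tag and MessageId of every publish.

```go
package main

import "fmt"

// merged is a local stand-in for the patched Return struct in the diff;
// it is not the library type.
type merged struct {
	MessageId   string // set on real returns
	ACKType     bool   // true when the value carries an ACK/NACK
	ACK         bool   // true for basic.ack, false for basic.nack
	Multiple    bool   // confirmation covers all tags up to DeliveryTag
	DeliveryTag uint64
}

type outcome int

const (
	routed outcome = iota
	unroutable
	nacked
)

// drain reads the merged channel until it is closed and settles every
// pending publish. pending maps delivery tag to MessageId and is
// modified in place: settled tags are removed.
func drain(ch <-chan merged, pending map[uint64]string) map[string]outcome {
	returned := make(map[string]bool)
	result := make(map[string]outcome)
	for m := range ch {
		if !m.ACKType {
			// A real return: remember it until the matching ACK arrives.
			returned[m.MessageId] = true
			continue
		}
		for tag, id := range pending {
			// multiple=true settles every outstanding tag up to DeliveryTag.
			if tag == m.DeliveryTag || (m.Multiple && tag < m.DeliveryTag) {
				switch {
				case !m.ACK:
					result[id] = nacked
				case returned[id]:
					result[id] = unroutable
				default:
					result[id] = routed
				}
				delete(pending, tag) // deleting while ranging is safe in Go
			}
		}
	}
	return result
}

func main() {
	ch := make(chan merged, 3)
	ch <- merged{MessageId: "b"}                                           // return for "b"
	ch <- merged{ACKType: true, ACK: true, Multiple: true, DeliveryTag: 2} // ACK for tags 1 and 2
	ch <- merged{ACKType: true, ACK: false, DeliveryTag: 3}                // NACK for tag 3
	close(ch)

	res := drain(ch, map[uint64]string{1: "a", 2: "b", 3: "c"})
	fmt.Println(res["a"] == routed, res["b"] == unroutable, res["c"] == nacked)
}
```

Because everything arrives on one channel, the return for "b" is always seen before the ACK that covers it, so a single loop is enough and no cross-channel synchronization is needed.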

gm42 avatar Dec 18 '18 15:12 gm42

Thanks, looks like you are onto something. I don't like how returns and acks are intertwined in your example but cannot think of a better solution without reviewing how return handlers are implemented.

michaelklishin avatar Feb 16 '19 21:02 michaelklishin

I don't like how returns and acks are intertwined in your example

Me neither! I was hoping that by showing where the pain is, you could come up with something more elegant? Although I don't see how it would be possible without some refactoring.

gm42 avatar Feb 19 '19 08:02 gm42