
Possible bug in NotifyConfirm

Open tomforge opened this issue 5 years ago • 3 comments

I was using the NotifyConfirm function like so:

ack, nack := channel.NotifyConfirm(make(chan uint64, numMaxPendingAcks), make(chan uint64, numMaxPendingNacks))

But I noticed that the confirms chan in the implementation has buffer size len(ack)+len(nack).

len returns the number of unread messages in the buffer, which is zero in my case since I'm passing in newly created chans.

I would have expected cap to be used instead.
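To make the difference concrete, here is a minimal sketch using only built-in channels. The helper name confirmBufferSizes is hypothetical and is not part of the amqp package; it just evaluates both expressions for a pair of channels.

```go
package main

import "fmt"

// confirmBufferSizes is a hypothetical helper (not part of the amqp package).
// It returns the buffer size that len(ack)+len(nack) would produce, and the
// size that cap(ack)+cap(nack) would produce, for the given channels.
func confirmBufferSizes(ack, nack chan uint64) (byLen, byCap int) {
	// len counts the elements currently queued in each buffer.
	byLen = len(ack) + len(nack)
	// cap reports the buffer capacity each channel was created with.
	byCap = cap(ack) + cap(nack)
	return byLen, byCap
}

func main() {
	// Freshly created channels: nothing queued yet, so len is 0 for both.
	ack := make(chan uint64, 10)
	nack := make(chan uint64, 5)

	byLen, byCap := confirmBufferSizes(ack, nack)
	fmt.Println("len-based size:", byLen) // 0, i.e. an unbuffered channel
	fmt.Println("cap-based size:", byCap) // 15
}
```

With freshly created channels the len-based size is always zero, so make(chan Confirmation, len(ack)+len(nack)) yields an unbuffered channel regardless of the capacities passed in.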

Am I missing something here?

For convenience, here is the current function from channel.go:

func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
	confirms := ch.NotifyPublish(make(chan Confirmation, len(ack)+len(nack)))

	go func() {
		for c := range confirms {
			if c.Ack {
				ack <- c.DeliveryTag
			} else {
				nack <- c.DeliveryTag
			}
		}
		close(ack)
		if nack != ack {
			close(nack)
		}
	}()

	return ack, nack
}

tomforge avatar Jul 12 '18 04:07 tomforge

This makes sense. Would you provide complete test code or a PR with tests demonstrating that the change works?

lukebakken avatar Jul 12 '18 13:07 lukebakken

I wrote some tests, but while testing I realized that since confirms is drained into the ack and nack chans, it's not exactly correct to give confirms the sum of their capacities either. That would allow at least twice as many pending confirms as intended.

E.g.

ack, nack := channel.NotifyConfirm(make(chan uint64, 10), make(chan uint64, 10))

If we had used confirms := ch.NotifyPublish(make(chan Confirmation, cap(ack)+cap(nack))), the program would only start blocking once ack is full with 10 pending confirms and confirms is full with 20 more pending confirms.

What I don't get now is why len is used at all for the capacity of confirms. I would find it more intuitive to use an unbuffered channel (confirms := ch.NotifyPublish(make(chan Confirmation))) instead.
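The buffering argument above can be checked without a broker. The following is a rough simulation, not the library's code: pendingBeforeBlock is a hypothetical helper, plain uint64 channels stand in for the Confirmation stream, every confirmation is treated as an ack, and nothing reads from ack. A send that has not completed within 200ms is assumed to be blocked.

```go
package main

import (
	"fmt"
	"time"
)

// pendingBeforeBlock is a hypothetical helper that counts how many
// confirmations a producer can hand over before a send stalls, when
// nobody is reading from the ack channel.
func pendingBeforeBlock(ackCap, confirmsCap int) int {
	ack := make(chan uint64, ackCap)
	confirms := make(chan uint64, confirmsCap)

	// Forwarding goroutine, shaped like the one in NotifyConfirm
	// (acks only, for simplicity).
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tag := range confirms {
			ack <- tag
		}
		close(ack)
	}()

	accepted := 0
loop:
	for tag := uint64(1); ; tag++ {
		select {
		case confirms <- tag:
			accepted++
		case <-time.After(200 * time.Millisecond):
			// Assumption: a send that takes this long is blocked.
			break loop
		}
	}

	// Clean up: let the forwarder finish and exit.
	close(confirms)
	for range ack {
	}
	<-done
	return accepted
}

func main() {
	// cap(ack)+cap(nack) sizing from the example: 10 + 10 = 20.
	fmt.Println("buffered confirms:", pendingBeforeBlock(10, 20))
	// Unbuffered confirms, as suggested above.
	fmt.Println("unbuffered confirms:", pendingBeforeBlock(10, 0))
}
```

In this simulation the count should come to the ack capacity plus the confirms capacity, plus one confirmation held by the forwarding goroutine while it is blocked on the send to ack. So the cap-based sizing lets roughly three times the intended 10 confirms pile up, while the unbuffered variant stays close to the capacity the caller asked for.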

Does this make sense? Seems like there could be a use case I'm missing.

tomforge avatar Jul 13 '18 04:07 tomforge

It's hard to remember if it was len() because of possible in-flight message deadlocks, or should have been cap().

The design of NotifyConfirm had the problem that the scheduler could change the relative order of Acks and Nacks, which made this API useless for most cases, such as at-least-once publish semantics.

NotifyPublish should be used in almost all cases, so to be conservative, cap() could be used instead of len() and the docs could make it clearer that this API is only useful for telemetry/instrumentation.
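As a rough illustration of why a single stream is easier to reason about, the sketch below reads confirmations from one channel, where acks and nacks arrive in one sequence. The Confirmation type here is a local stand-in that mirrors the DeliveryTag and Ack fields used in the function quoted earlier in this thread, and describeInOrder is a hypothetical helper; no broker or amqp import is involved.

```go
package main

import "fmt"

// Confirmation is a local stand-in mirroring the fields used by
// NotifyConfirm above (c.Ack and c.DeliveryTag).
type Confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// describeInOrder is a hypothetical helper that drains a single
// confirmation stream and records each outcome in arrival order.
func describeInOrder(confirms <-chan Confirmation) []string {
	var out []string
	for c := range confirms {
		if c.Ack {
			out = append(out, fmt.Sprintf("ack %d", c.DeliveryTag))
		} else {
			out = append(out, fmt.Sprintf("nack %d", c.DeliveryTag))
		}
	}
	return out
}

func main() {
	// Simulated stream; with the real library this channel would be the
	// one returned by ch.NotifyPublish.
	confirms := make(chan Confirmation, 3)
	confirms <- Confirmation{DeliveryTag: 1, Ack: true}
	confirms <- Confirmation{DeliveryTag: 2, Ack: false}
	confirms <- Confirmation{DeliveryTag: 3, Ack: true}
	close(confirms)

	for _, line := range describeInOrder(confirms) {
		fmt.Println(line)
	}
}
```

Because there is only one channel, the reader sees each ack and nack in the order it was queued, which is the property that is lost once the stream is split into separate ack and nack channels.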

streadway avatar Feb 25 '19 11:02 streadway