amqp
Possible bug in NotifyConfirm
I was using the NotifyConfirm function like so:
ack, nack := channel.NotifyConfirm(make(chan uint64, numMaxPendingAcks), make(chan uint64, numMaxPendingNacks))
But I noticed that the confirms chan in the implementation has buffer size len(ack)+len(nack). len returns the number of unread elements currently queued in a channel's buffer, which is zero in my case since I'm passing in newly created chans.
I would have expected cap to be used instead.
Am I missing something here?
For convenience, here is the current function from channel.go:
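```go
func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
	// The confirms buffer is sized with len(), i.e. the number of elements
	// currently queued in ack and nack, not their capacity.
	confirms := ch.NotifyPublish(make(chan Confirmation, len(ack)+len(nack)))

	// Forward each confirmation to ack or nack until confirms is closed.
	go func() {
		for c := range confirms {
			if c.Ack {
				ack <- c.DeliveryTag
			} else {
				nack <- c.DeliveryTag
			}
		}
		close(ack)
		// Close nack separately only if the caller passed a distinct channel.
		if nack != ack {
			close(nack)
		}
	}()
	return ack, nack
}
```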
This makes sense. Would you provide complete test code or a PR with tests demonstrating that the change works?
I wrote some tests, but I realized while testing that since confirms is drained into the ack and nack chans, it's not exactly correct to give confirms the sum of the capacities either, because that would allow roughly twice as many buffered confirms as intended.
For example, given:
ack, nack := channel.NotifyConfirm(make(chan uint64, 10), make(chan uint64, 10))
If we had used confirms := ch.NotifyPublish(make(chan Confirmation, cap(ack)+cap(nack))), the program would only start blocking once ack is full with 10 pending confirms and confirms is full with 20 more.
What I don't understand now is why len is used at all for the capacity of confirms. I would find it more intuitive to use an unbuffered channel instead: confirms := ch.NotifyPublish(make(chan Confirmation)).
Does this make sense? Seems like there could be a use case I'm missing.
It's hard to remember whether len() was chosen deliberately to avoid possible deadlocks with in-flight messages, or whether it should have been cap().
The design of NotifyConfirm has a problem: because acks and nacks arrive on separate channels, the scheduler can change the order in which a consumer observes them, which makes this API useless for most cases, such as at-least-once publish semantics.
NotifyPublish should be used in almost all cases, so to be conservative, cap() could be used instead of len(), and the docs could make it clearer that this API is only useful for telemetry/instrumentation.