cony
Handling prefetched messages upon lost connection?
Awesome package, this helps us a lot. Just one question that's come up while testing various edge cases:
If prefetch is set to > 1, then you could conceivably have the following scenario:
- Multiple messages are published and delivered to the consumer, pending processing
- Connection is lost (server blip, let's say). Cony gracefully handles reconnection, BUT
- The amqp.Messages in the deliveries channel reference a channel that is now CLOSED.
How would you suggest handling this case? If we remake the deliveries channel would that imply a NACK on the rabbit server?
@jraede hello, and thank you for the kind words!
It doesn't work quite that simply, although the prefetch problem can still strike. Each Consumer has its own chan amqp.Delivery, which is not a reference to the channel that amqp.Channel.Consume() returns. Our Consumer copies messages from the "per session" channel returned by amqp into its own persistent chan amqp.Delivery. Proof: https://github.com/assembla/cony/blob/master/consumer.go#L92 .
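To make the copying described above concrete, here is a minimal sketch using plain Go channels only. It is not cony's actual code: the Delivery type, the consumer struct, and runSessions are hypothetical stand-ins, and a disconnect is simulated by closing the per-session channel.

```go
package main

import "fmt"

// Delivery is a hypothetical stand-in for amqp.Delivery.
type Delivery struct {
	Body string
}

// consumer mimics the shape described above: one persistent channel
// that outlives any single broker session.
type consumer struct {
	deliveries chan Delivery
}

// serve copies messages from a per-session channel into the persistent
// channel and returns once the session channel is closed (a disconnect).
func (c *consumer) serve(session <-chan Delivery) {
	for d := range session {
		c.deliveries <- d
	}
}

// runSessions simulates two consecutive sessions separated by a disconnect
// and returns the bodies received on the persistent channel.
func runSessions() []string {
	c := &consumer{deliveries: make(chan Delivery)}
	persistent := c.deliveries // obtained once, reused across sessions

	go func() {
		for _, batch := range [][]string{{"a", "b"}, {"c"}} {
			session := make(chan Delivery, len(batch))
			for _, body := range batch {
				session <- Delivery{Body: body}
			}
			close(session) // simulated connection loss
			c.serve(session)
		}
		// Closed only so this demo terminates; per the discussion,
		// the real persistent channel is never closed.
		close(c.deliveries)
	}()

	var got []string
	for d := range persistent {
		got = append(got, d.Body)
	}
	return got
}

func main() {
	fmt.Println(runSessions())
}
```

The reader of the persistent channel never notices that the session channel was replaced. That is also why messages copied before a disconnect can still reach user code after the session they belong to is gone.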
Frankly, I don't know how to "fix" this. If you have ideas — please post them.
Assuming I'm correct and RabbitMQ will redeliver those messages to another channel (even the same consumer on the new reconnected channel), could we just remake the cony.Consumer.deliveries in consumer.serve? e.g.:
	func (c *Consumer) serve(client mqDeleter, ch mqChannel) {
		if c.reportErr(ch.Qos(c.qos, 0, false)) {
			return
		}
		// If prefetch > 1, c.deliveries might have messages in it
		// that are associated with a closed RabbitMQ channel (if e.g.
		// there was a reconnection event). Rabbit will already deliver
		// those messages to a new channel so we need to remove
		// references to them here.
		c.deliveries = make(chan amqp.Delivery)
		deliveries, err2 := ch.Consume(c.q.Name,
			c.tag,       // consumer tag
			c.autoAck,   // autoAck,
			c.exclusive, // exclusive,
			c.noLocal,   // noLocal,
			false,       // noWait,
			nil,         // args Table
		)
		if c.reportErr(err2) {
			return
		}
		for {
			select {
			case <-c.stop:
				client.deleteConsumer(c)
				ch.Close()
				return
			case d, ok := <-deliveries: // deliveries will be closed once channel is closed (disconnected from network)
				if !ok {
					return
				}
				c.deliveries <- d
			}
		}
	}
Assuming the user's code uses it as in the basic example:
	for cli.Loop() {
		select {
		case msg := <-cns.Deliveries():
			handleMessage(msg)
		}
	}
meaning that the user calls Consumer.Deliveries() on every iteration to get a channel, then yes, it's possible. But there is a problem :) The Consumer.Deliveries() documentation says:
"Deliveries return deliveries shipped to this consumer this channel never closed, even on disconnects"
Code like this will break:
	messages := cns.Deliveries()
	for cli.Loop() {
		select {
		case msg := <-messages:
			handleMessage(msg)
		}
	}
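The breakage can be reproduced without RabbitMQ. The sketch below is only an illustration under stated assumptions: consumer, reconnect, and tryReceive are hypothetical names, and reconnect stands in for a serve that remakes the deliveries channel as proposed earlier in the thread.

```go
package main

import "fmt"

// consumer is a hypothetical stand-in for cony.Consumer.
type consumer struct {
	deliveries chan string
}

// Deliveries returns whatever channel the consumer currently holds.
func (c *consumer) Deliveries() <-chan string { return c.deliveries }

// reconnect replaces the channel, as the proposal would do in serve.
// The channel is buffered only so the demo can run on one goroutine.
func (c *consumer) reconnect() { c.deliveries = make(chan string, 1) }

// tryReceive does a non-blocking receive and reports whether a message arrived.
func tryReceive(ch <-chan string) (string, bool) {
	select {
	case m := <-ch:
		return m, true
	default:
		return "", false
	}
}

// demo returns whether a channel cached before the reconnect, and a channel
// fetched after it, see a message sent after the reconnect.
func demo() (cachedOK, freshOK bool) {
	c := &consumer{deliveries: make(chan string, 1)}
	cached := c.Deliveries() // taken once, before the loop, as in the breaking example

	c.reconnect()
	c.deliveries <- "after reconnect"

	_, cachedOK = tryReceive(cached)
	_, freshOK = tryReceive(c.Deliveries())
	return cachedOK, freshOK
}

func main() {
	cachedOK, freshOK := demo()
	fmt.Println("cached channel got message:", cachedOK)
	fmt.Println("fresh channel got message:", freshOK)
}
```

The cached channel never receives anything again, because nothing writes to the old channel after the swap. In a real consumer the field would also be replaced from a different goroutine than the one calling Deliveries(), so the swap would need synchronisation as well.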