Handling prefetched messages upon lost connection?

Open jraede opened this issue 7 years ago • 3 comments

Awesome package, this helps us a lot. Just one question that's come up while testing various edge cases:

If prefetch is set to > 1, then you could conceivably have the following scenario:

  1. Multiple messages are published and delivered to the consumer, pending processing
  2. Connection is lost (server blip, let's say). Cony gracefully handles reconnection, BUT
  3. The amqp.Delivery values still waiting in the deliveries channel reference an AMQP channel that is now CLOSED.

How would you suggest handling this case? If we remake the deliveries channel would that imply a NACK on the rabbit server?
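
For context, with manual acks RabbitMQ requeues deliveries that were still unacknowledged when their channel closed, so a stale delivery should show up again on the new channel. One consumer-side way to tolerate that is sketched below. It assumes that acknowledging a stale delivery returns an error and that every message carries a unique ID; the `acker` interface, `fakeDelivery`, `handler`, and `errChannelClosed` are hypothetical names for illustration, not part of cony or the amqp package.

```go
package main

import "errors"

// errChannelClosed is a hypothetical stand-in for the error an AMQP client
// reports when acking on a channel that no longer exists.
var errChannelClosed = errors.New("channel closed")

// acker is the only part of a delivery this sketch needs.
type acker interface {
	Ack() error
}

// fakeDelivery models a delivery whose originating channel may be gone.
type fakeDelivery struct{ channelOpen bool }

func (d fakeDelivery) Ack() error {
	if !d.channelOpen {
		return errChannelClosed
	}
	return nil
}

// handler remembers which message IDs it has already processed so that a
// redelivered copy is acknowledged without repeating the work.
type handler struct {
	seen map[string]bool
	runs int // how many times the actual work was performed
}

func newHandler() *handler { return &handler{seen: make(map[string]bool)} }

// handle does the work at most once per ID and then tries to ack.
// A failed ack is returned to the caller; the broker is expected to
// redeliver the message on the new channel.
func (h *handler) handle(id string, d acker) error {
	if !h.seen[id] {
		h.runs++
		h.seen[id] = true
	}
	return d.Ack()
}
```

Calling `handle("m1", ...)` first with a stale delivery and then with the redelivered copy performs the work once; only the second call acks successfully. In a real service the `seen` set would need to be bounded or persisted.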

jraede avatar Apr 22 '17 22:04 jraede

@jraede hello, and thank you for the kind words!

It doesn't work quite that way, although the prefetch problem can still strike. Each Consumer has its own chan amqp.Delivery, which is not the channel that amqp.Channel.Consume() returns. Our Consumer copies messages from the per-session channel returned by amqp into its own persistent chan amqp.Delivery. Proof: https://github.com/assembla/cony/blob/master/consumer.go#L92 .
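
The copying described here can be sketched with plain Go channels. This is a minimal model, not cony's actual code: `delivery` is a hypothetical stand-in for amqp.Delivery, and the persistent channel is buffered only so the sketch can run in a single goroutine.

```go
package main

// delivery is a hypothetical stand-in for amqp.Delivery; session records
// which connection attempt produced the message.
type delivery struct {
	session int
	body    string
}

// forward copies every message from one per-session channel into the
// long-lived out channel and returns once the session channel is closed.
// out itself is never closed, so its readers survive reconnects.
func forward(session <-chan delivery, out chan<- delivery) {
	for d := range session {
		out <- d
	}
}

// simulateReconnect runs two sessions back to back against the same
// persistent channel and returns everything a reader would observe.
func simulateReconnect() []delivery {
	out := make(chan delivery, 4)
	for s := 1; s <= 2; s++ {
		session := make(chan delivery, 2)
		session <- delivery{session: s, body: "a"}
		session <- delivery{session: s, body: "b"}
		close(session) // simulated connection loss
		forward(session, out)
	}
	got := make([]delivery, 0, 4)
	for i := 0; i < 4; i++ {
		got = append(got, <-out)
	}
	return got
}
```

In this model the reader still receives the two deliveries from session 1 after that session has ended, which is the stale-delivery situation the question describes.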

Frankly, I don't know how to "fix" this. If you have ideas, please post them.

kron4eg avatar Apr 23 '17 09:04 kron4eg

Assuming I'm correct and RabbitMQ will redeliver those messages to another channel (even the same consumer on the new reconnected channel), could we just remake the cony.Consumer.deliveries in consumer.serve? e.g.:

func (c *Consumer) serve(client mqDeleter, ch mqChannel) {
	if c.reportErr(ch.Qos(c.qos, 0, false)) {
		return
	}

	// If prefetch > 1, c.deliveries might have messages in it
	// that are associated with a closed RabbitMQ channel (if e.g.
	// there was a reconnection event). Rabbit will already deliver
	// those messages to a new channel so we need to remove
	// references to them here.
	c.deliveries = make(chan amqp.Delivery)

	deliveries, err2 := ch.Consume(c.q.Name,
		c.tag,       // consumer tag
		c.autoAck,   // autoAck,
		c.exclusive, // exclusive,
		c.noLocal,   // noLocal,
		false,       // noWait,
		nil,         // args Table
	)
	if c.reportErr(err2) {
		return
	}

	for {
		select {
		case <-c.stop:
			client.deleteConsumer(c)
			ch.Close()
			return
		case d, ok := <-deliveries: // deliveries will be closed once channel is closed (disconnected from network)
			if !ok {
				return
			}
			c.deliveries <- d
		}
	}
}

jraede avatar Apr 23 '17 16:04 jraede

Assuming that the user's code follows the basic example:

for cli.Loop() {
	select {
	case msg := <-cns.Deliveries():
		handleMessage(msg)
	}
}

meaning that the user calls Consumer.Deliveries() on every iteration to get the channel, then yes, it's possible. But there is a problem :) The Consumer.Deliveries() documentation says:

Deliveries return deliveries shipped to this consumer this channel never closed, even on disconnects

Code like this, which fetches the channel once and keeps reading from it, will break.

messages := cns.Deliveries()
for cli.Loop() {
	select {
	case msg := <-messages:
		handleMessage(msg)
	}
}
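
The difference between the two loops can be reproduced without a broker. The sketch below assumes a reduced, hypothetical `consumer` type whose `reconnect` method replaces the channel in the way the proposed change to serve does. It is not cony's code, and a real implementation would also have to synchronise access to the field.

```go
package main

// consumer is a hypothetical, heavily reduced model of a consumer whose
// deliveries channel is replaced on every reconnect.
type consumer struct {
	deliveries chan string
}

// Deliveries returns whatever channel is current at the time of the call.
func (c *consumer) Deliveries() <-chan string { return c.deliveries }

// reconnect models the proposed change: a fresh channel per session.
func (c *consumer) reconnect() { c.deliveries = make(chan string, 1) }

// cachedSeesNewMessage publishes one message after a reconnect and reports
// whether it is visible through a channel obtained before the reconnect
// (cached) and through a channel obtained afterwards (fresh).
func cachedSeesNewMessage() (cached, fresh bool) {
	c := &consumer{deliveries: make(chan string, 1)}
	old := c.Deliveries() // what the second loop holds on to
	c.reconnect()
	c.deliveries <- "after reconnect"

	select {
	case <-old:
		cached = true
	default: // nothing will ever arrive on the old channel
	}
	select {
	case <-c.Deliveries():
		fresh = true
	default:
	}
	return cached, fresh
}
```

Here the cached channel never sees the message while a fresh call to `Deliveries()` does, so a loop that stored the channel once would silently stop receiving after the first reconnect.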

kron4eg avatar Apr 23 '17 18:04 kron4eg