
NotifyPublish ReconnectCount - how is it intended to be used?

Open jamoflaw opened this issue 1 month ago • 2 comments

Similar to #66, I am trying to enable publisher confirms using the NotifyPublish handler. I track the DeliveryTag manually by incrementing a counter each time a publish succeeds.

However, on reconnect the publisher has no way to know that a reconnect happened, so this manual tracking of DeliveryTags breaks when the channel's DeliveryTag silently resets to 0.

For example:

	// Publisher func
	go func() {

		publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
			publisher_confirms <- p
		})

		// Initialise a delivery counter to keep track of the DeliveryTags assigned to the messages
		deliveryCounter := messagestore.DeliveryTag(1)
		for msg := range messages {

			err := publisher.PublishWithContext(
				context.Background(),
				msg.Body,
				[]string{msg.RoutingKey},

				rabbitmq.WithPublishOptionsExchange("ingress"),
				rabbitmq.WithPublishOptionsContentType(msg.ContentType),
				rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
				rabbitmq.WithPublishOptionsAppID(msg.AppId),
				rabbitmq.WithPublishOptionsPersistentDelivery,
			)
			if err != nil {
				logger.Errorf("error publishing message, %s", err)
				continue
			}

			store.AppendMessage(deliveryCounter, msg)
			deliveryCounter++

		}
	}()

	// Confirms func
	go func() {

		for {
			select {
			case confirm := <-publisher_confirms:
				// Try to confirm
				if confirm.Ack {
					// Whatever I do here, I can't work out the delivery counter: the reconnection
					// count isn't related to what the delivery counter was _before_ it was reset.
					store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag + uint64(confirm.ReconnectionCount)))
				} else {
					store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag+uint64(confirm.ReconnectionCount)), true)
				}
			}
		}
	}()
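The mismatch can be reproduced without a broker. The following is a minimal simulation, not behaviour verified against a live server: `confirmation`, `simulateConfirms`, and `naiveKey` are hypothetical names, the message counts are made up, and it assumes delivery tags restart at 1 on each new channel while ReconnectionCount goes up by one per reconnect.

```go
package main

import "fmt"

// confirmation mirrors only the two fields used in the snippet above.
// It is a stand-in for illustration, not the library's type.
type confirmation struct {
	DeliveryTag       uint64
	ReconnectionCount int
}

// simulateConfirms models a channel whose delivery tag restarts at 1 after
// each reconnect (an assumption). perConnection[i] is the number of messages
// published on the i-th connection.
func simulateConfirms(perConnection []int) []confirmation {
	var out []confirmation
	for reconnects, n := range perConnection {
		for tag := 1; tag <= n; tag++ {
			out = append(out, confirmation{
				DeliveryTag:       uint64(tag),
				ReconnectionCount: reconnects,
			})
		}
	}
	return out
}

// naiveKey is the arithmetic used in the confirms goroutine above.
func naiveKey(c confirmation) uint64 {
	return c.DeliveryTag + uint64(c.ReconnectionCount)
}

func main() {
	// Three messages before a reconnect, two after it.
	confirms := simulateConfirms([]int{3, 2})
	for i, c := range confirms {
		// The value deliveryCounter held when this message was published.
		publisherCounter := uint64(i + 1)
		fmt.Printf("counter=%d tag=%d reconnects=%d naiveKey=%d\n",
			publisherCounter, c.DeliveryTag, c.ReconnectionCount, naiveKey(c))
	}
}
```

Under these assumptions, the fourth and fifth messages (counter 4 and 5) come back with keys 2 and 3, which already belong to earlier messages, so the sum cannot identify what was stored.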

Am I missing something about how it is supposed to be used? Is there an internal delivery tag that the publisher object could return instead, so that it can be tracked?

jamoflaw · May 09 '24 21:05