go-rabbitmq
NotifyPublish ReconnectionCount - how is it intended to be used?
Similar to #66, I am trying to enable confirms using the NotifyPublish handler, manually keeping track of the DeliveryTag by incrementing a counter each time I successfully publish.
However, on reconnect there doesn't seem to be any mechanism for the publisher to know that a reconnect has happened, so this manual tracking of DeliveryTags falls over when the channel's DeliveryTag silently resets to 0.
For example:
// Publisher func
go func() {
	publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
		publisher_confirms <- p
	})
	// Initialise a delivery counter to keep track of the DeliveryTags assigned to the messages
	deliveryCounter := messagestore.DeliveryTag(1)
	for msg := range messages {
		err := publisher.PublishWithContext(
			context.Background(),
			msg.Body,
			[]string{msg.RoutingKey},
			rabbitmq.WithPublishOptionsExchange("ingress"),
			rabbitmq.WithPublishOptionsContentType(msg.ContentType),
			rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
			rabbitmq.WithPublishOptionsAppID(msg.AppId),
			rabbitmq.WithPublishOptionsPersistentDelivery,
		)
		if err != nil {
			logger.Errorf("error publishing message, %s", err)
			continue
		}
		store.AppendMessage(deliveryCounter, msg)
		deliveryCounter++
	}
}()
// Confirms func
go func() {
	for {
		select {
		case confirm := <-publisher_confirms:
			// Try to confirm.
			// Whatever I do here, I can't work out the delivery counter, because the
			// reconnection count isn't related to what the delivery counter was
			// _before_ it was reset.
			if confirm.Ack {
				store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag + uint64(confirm.ReconnectionCount)))
			} else {
				store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag+uint64(confirm.ReconnectionCount)), true)
			}
		}
	}
}()
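To make the mismatch concrete, here is a minimal standalone simulation with no RabbitMQ involved. It is only a sketch under two assumptions: that the channel's delivery tags restart at 1 after each reconnect, and that ReconnectionCount goes up by one per reconnect. The `confirmation`, `published`, `simulate` and `guess` names are hypothetical stand-ins for illustration, not library types.

```go
package main

import "fmt"

// confirmation mirrors only the two fields used in the confirms goroutine.
// It is a hypothetical stand-in, not the library's Confirmation type.
type confirmation struct {
	DeliveryTag       uint64
	ReconnectionCount int
}

// published pairs the publisher's local counter with the confirmation the
// broker would eventually send back for that message.
type published struct {
	localCounter uint64
	confirm      confirmation
}

// simulate publishes perConnection[i] messages on the i-th connection.
// Assumption: the channel's delivery tag restarts at 1 on every reconnect,
// while the local counter keeps incrementing across reconnects.
func simulate(perConnection []int) []published {
	var out []published
	local := uint64(1)
	for reconnects, n := range perConnection {
		for tag := uint64(1); tag <= uint64(n); tag++ {
			out = append(out, published{local, confirmation{tag, reconnects}})
			local++
		}
	}
	return out
}

// guess reproduces the lookup key used in the confirms goroutine.
func guess(c confirmation) uint64 {
	return c.DeliveryTag + uint64(c.ReconnectionCount)
}

func main() {
	// Five messages on the first connection, three after one reconnect.
	for _, p := range simulate([]int{5, 3}) {
		g := guess(p.confirm)
		fmt.Printf("local=%d tag=%d reconnects=%d guess=%d match=%v\n",
			p.localCounter, p.confirm.DeliveryTag, p.confirm.ReconnectionCount,
			g, g == p.localCounter)
	}
}
```

Under those assumptions the first five keys line up, but the three messages published after the reconnect resolve to 2, 3 and 4 instead of 6, 7 and 8, so the wrong entries in the store would be acked.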
Am I missing something about how this is supposed to be used? Is there an internal delivery tag that the publisher object could return, so that I could track that instead?