go-rabbitmq icon indicating copy to clipboard operation
go-rabbitmq copied to clipboard

AutoReconnect only reconnect the consumer to 1 queue instead of all the queues

Open FR19 opened this issue 1 year ago • 0 comments

Sorry I'm new to golang, I just created 1 consumer to consume multiple queues. But when the connection dropped, and the reconnect mechanism started, it successfully reconnect but only to 1 queue. Below is my code for POC

// main.go
package main

import (
	"log"
	"strings"
	"sync"

	. "poc-reconnect/worker/east"
	. "poc-reconnect/worker/west"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

var workerList = make(map[string]interface {
	Execute(wg *sync.WaitGroup, concurrent int)
})

func main() {
	amqpAddress := "amqp://guest:guest@localhost:5672/poc"

	// initialize consumer
	consumer, err := rabbitmq.NewConsumer(
		amqpAddress, rabbitmq.Config{},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Register the workers
	workerList["west"] = West{Consumer: consumer}
	workerList["east"] = East{Consumer: consumer}

	// worker concurrent configuration
	workerConcurrent := map[string]int{
		"west": 2,
		"east": 2,
	}

	wg := sync.WaitGroup{}

	// run all worker
	for worker, thread := range workerConcurrent {
		wg.Add(1)
		go workerList[strings.ToLower(worker)].Execute(&wg, thread)
	}
	wg.Wait()
}
// worker/east/east.go
package east

import (
	"log"
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

type East struct {
	Consumer rabbitmq.Consumer
}

const work_queue = "east_queue"

func (e East) Execute(wg *sync.WaitGroup, concurrent int) {
	defer wg.Done()

	forever := make(chan bool)
	// Subscribing to the queue
	err := e.Consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			return rabbitmq.Ack
		},
		work_queue,
		[]string{work_queue},
		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
	)

	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

	<-forever
}
// worker/west/west.go
package west

import (
	"log"
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

type West struct {
	Consumer rabbitmq.Consumer
}

const work_queue = "west_queue"

func (w West) Execute(wg *sync.WaitGroup, concurrent int) {
	defer wg.Done()

	forever := make(chan bool)
	// Subscribing to the queue
	err := w.Consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			return rabbitmq.Ack
		},
		work_queue,
		[]string{work_queue},
		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
	)

	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

	<-forever
}

On the first run, the consumer correctly initialized image image

But after I force close the connection through the RabbitMQ UI to trigger the reconnect mechanism, it only reconnect to 1 queue image image

Is this bug or did I do something wrong in my code?

Thanks

FR19 avatar Aug 22 '22 14:08 FR19