
SAC not working properly

Open laststem opened this issue 1 year ago • 10 comments

[Issue]

I have two consumers (A and B) on a queue with single active consumer (SAC) enabled.

A, the first consumer, was active; B, the second, was waiting.

Then I killed A, the active consumer, while another process was publishing 10,000 messages.

I expected the active consumer to switch from A to B, but it did not.

[Reproduce]

  1. run rabbitmq server
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmqtest rabbitmq:3.10.6-management
  1. run consumer
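package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

func main() {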
	conn, err := amqp091.DialConfig("amqp://guest:guest@localhost:5672/", amqp091.Config{
		Heartbeat: time.Second * 30,
	})
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	if err := ch.Qos(200, 0, false); err != nil {
		panic(err)
	}

	queueArgs := make(amqp091.Table)
	queueArgs["x-single-active-consumer"] = true
	_, err = ch.QueueDeclare("queue",
		true,      // durable
		false,     // auto delete
		false,     // exclusive
		false,     // noWait
		queueArgs, // queue args
	)
	if err != nil {
		panic(err)
	}

	msgs, err := ch.Consume("queue", "consumer", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	d := make(chan bool)
	go func() {
		for msg := range msgs {
			fmt.Println(string(msg.Body))
			_ = msg.Ack(true)
		}
		d <- true
	}()

	sigs := make(chan os.Signal, 1)
	// SIGKILL cannot be caught, so only SIGINT and SIGTERM are registered.
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	if err := ch.Cancel("consumer", false); err != nil {
		panic(err)
	}
	fmt.Println("cancel consume")
	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	<-d
	fmt.Println("terminate")
}
go run consumer.go (A session)
go run consumer.go (B session)
  1. create producer (publisher.go)
package main

import (
	"fmt"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10000; i++ {
		err := ch.Publish("", "queue", false, false, amqp091.Publishing{
			DeliveryMode: 0,
			ContentType:  "text/plain",
			Body:         []byte(fmt.Sprintf("%d", i)),
		})
		if err != nil {
			panic(err)
		}
	}

	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	fmt.Println("terminate")
}
  1. check active consumer
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ...
queue_name	channel_pid	consumer_tag	ack_required	prefetch_count	active	arguments
queue	<[email protected]>	consumer	true	200	false	[]
queue	<[email protected]>	consumer	true	200	true	[]
  1. run publisher
go run publisher.go
  1. kill the active consumer while it is still receiving messages. It is important that the active consumer has not yet received all messages; it must be killed mid-stream.
# active consumer log
1
2
3
4
...
^C (the consumer then prints "terminate")
  1. check active consumer
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ..

I expected the active consumer to switch from A to B, but it did not: all consumers are gone.

However, I found error logs from the RabbitMQ server. They are too long to paste here, so I am attaching them as a file: rabbit-error-logs.txt

It doesn't seem to be a problem with amqp091-go.

laststem avatar Jul 26 '22 05:07 laststem

Hey there :wave:

~~I can't reproduce this issue with the code you provided. Make sure that the queue is declared with x-single-active-consumer set to true.~~

I added the following lines to your consumer code to declare the queue. Please note that Channel.QueueDeclare() returns an error if the queue exists and its parameters don't match the queue declaration; it's a good safety check to make sure your queues are as your app expects.

	queueArgs := make(amqp091.Table)
	queueArgs["x-single-active-consumer"] = true

	queue, err := ch.QueueDeclare("queue",
		true,      // durable
		false,     // auto delete
		false,     // exclusive
		false,     // noWait
		queueArgs, // queue args
	)
	if err != nil {
		panic(err)
	}

Edit: I can reproduce the issue, see https://github.com/rabbitmq/amqp091-go/issues/106#issuecomment-1206633097

Zerpet avatar Aug 02 '22 17:08 Zerpet

Thanks for the reply.

I tried again and got the same problem. I edited the description to add more detail.

laststem avatar Aug 03 '22 06:08 laststem

I can confirm that the problem seems to be in RabbitMQ. The queue crashes for some reason, and the consumer does not take over.

Edit: I can reproduce the queue crash when both consumers are attached to the queue before the producer starts publishing. If the producer is already running, there's no queue crash. This is a classic queue (CQ). I verified this on RabbitMQ 3.10.6, the current rabbitmq:3-management image on Docker Hub.

Zerpet avatar Aug 05 '22 16:08 Zerpet

Thank you.

Are there any plans to fix it? I want to use this feature in production.

laststem avatar Aug 08 '22 05:08 laststem

@laststem You can try a quorum queue (the SAC implementation differs between classic queues and quorum queues). If you can't reproduce the issue there and a quorum queue is an acceptable queue type for your use case, you can stick with it.

acogoluegnes avatar Aug 08 '22 06:08 acogoluegnes

@Zerpet Can you reference or create an issue on the broker repository so we can keep track of this issue? Thanks.

acogoluegnes avatar Aug 08 '22 06:08 acogoluegnes

Thanks for the reply, @acogoluegnes.

I can't use a quorum queue because we have a constraint that the queue has only one consumer.

Currently we use a classic queue with the exclusive flag (1 queue, 1 consumer), but this model can't provide high availability: what happens if the consumer dies?

If we use a quorum queue, it provides high availability but doesn't meet the constraint (1 queue, N consumers). If we use a classic queue with SAC, it provides high availability (by switching the active consumer) and meets the constraint (1 queue, N consumers, but only one active).

laststem avatar Aug 08 '22 06:08 laststem

@acogoluegnes done! https://github.com/rabbitmq/rabbitmq-server/issues/5460

Zerpet avatar Aug 08 '22 09:08 Zerpet

@laststem Quorum queues support single active consumer, so you can make sure that only one consumer consumes at a time. You enable it just like with a classic queue. Sorry I was not clear in my first comment; I meant "try with a quorum queue with single active consumer enabled".
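As a rough illustration of that suggestion, the queue arguments could be built as below. This is a sketch, not code from the thread: the helper name `quorumSACArgs` and the queue name `queue-quorum` are hypothetical. It relies on `amqp091.Table` being a `map[string]interface{}`, so a plain map can be passed as the `args` parameter of `QueueDeclare`, and it uses only the standard library so it runs without a broker.

```go
package main

import "fmt"

// quorumSACArgs builds the optional queue arguments for a quorum queue with
// single active consumer enabled. The result can be passed as the args
// parameter of ch.QueueDeclare in the consumer from the issue description.
func quorumSACArgs() map[string]interface{} {
	return map[string]interface{}{
		"x-queue-type":             "quorum", // select the quorum queue type
		"x-single-active-consumer": true,     // same flag as for a classic queue
	}
}

func main() {
	args := quorumSACArgs()
	fmt.Println(args["x-queue-type"], args["x-single-active-consumer"])

	// Intended use (not executed here, since it needs a running broker):
	//   _, err = ch.QueueDeclare("queue-quorum",
	//       true,  // durable: quorum queues must be durable
	//       false, // auto delete
	//       false, // exclusive: quorum queues cannot be exclusive
	//       false, // noWait
	//       args,
	//   )
}
```

A different queue name is used because the type of an existing queue cannot be changed; redeclaring the classic queue `queue` with `x-queue-type` set to `quorum` would fail the declaration check.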

acogoluegnes avatar Aug 08 '22 13:08 acogoluegnes

Good! I will try it. Thank you, @acogoluegnes.

laststem avatar Aug 08 '22 13:08 laststem

https://github.com/rabbitmq/rabbitmq-server/issues/5460 has been closed and is targeting RabbitMQ server 3.11.5. I'll mark this issue as complete for now. Feel free to re-open if you encounter this issue in 3.11.5+.

Zerpet avatar Dec 07 '22 09:12 Zerpet