rabbitmq-cli-consumer

Limit on concurrency

Open estahn opened this issue 6 years ago • 13 comments

What's the limit on concurrency? If we have 10k messages in the queue, will this spin up 10k PHP processes at the same time?

estahn, Feb 22 '19 01:02

The rabbitmq-cli-consumer does not deal with concurrency. It processes one message at a time. If you need concurrency, you can spin up as many instances as you need.

corvus-ch, Feb 22 '19 06:02

@corvus-ch Would you be willing to accept a PR that adds concurrency handling? We currently have a PHP version of what rabbitmq-cli-consumer does, which also runs sequentially. We scale up our Kubernetes pods based on the queue length, but that all seems a bit wasteful.

estahn, Feb 22 '19 06:02

Dealing with concurrency is a complexity I tried to keep out of the project. It is designed to delegate concurrency to a suitable process supervisor. With Kubernetes (and similar platforms, for that matter) in mind, I can see why this might be helpful from an orchestration point of view.

For me to accept such a PR, the following requirements must be met:

  1. The default must be the current behaviour.
  2. It must be possible to keep the log output of different messages apart. Here I refer to the STDOUT and STDERR captured from the script processing the message.

With those two things in mind, feel free to go ahead and submit a PR.
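Requirement 2 could be met roughly as follows. This is a minimal sketch, assuming (for illustration only) that the message body is handed to the script on STDIN: every message gets its own exec.Cmd and its own STDOUT/STDERR buffers, so concurrent scripts cannot interleave their output, and the captured lines are tagged with a message ID before being logged. runIsolated, prefixed and the `[msg N]` prefix are hypothetical names, and `cat` stands in for the PHP script.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
	"strings"
	"sync"
)

// result holds the output captured for one message. Each message gets its
// own buffers, so concurrently running scripts never share a writer.
type result struct {
	ID     int
	Stdout string
	Stderr string
	Err    error
}

// runIsolated (hypothetical) runs the command with the message body on STDIN
// and captures STDOUT and STDERR into buffers owned by this call only.
func runIsolated(id int, body []byte, name string, args ...string) result {
	cmd := exec.Command(name, args...) // fresh *exec.Cmd for every message
	cmd.Stdin = bytes.NewReader(body)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	return result{ID: id, Stdout: stdout.String(), Stderr: stderr.String(), Err: err}
}

// prefixed tags every line of out with the message ID before it is logged.
func prefixed(id int, out string) string {
	var b strings.Builder
	for _, line := range strings.Split(strings.TrimRight(out, "\n"), "\n") {
		fmt.Fprintf(&b, "[msg %d] %s\n", id, line)
	}
	return b.String()
}

func main() {
	bodies := []string{"hello", "world", "foo"}
	results := make([]result, len(bodies))
	var wg sync.WaitGroup
	for i, body := range bodies {
		wg.Add(1)
		go func(i int, body string) {
			defer wg.Done()
			// "cat" stands in for the consumer script; it echoes STDIN.
			results[i] = runIsolated(i, []byte(body), "cat")
		}(i, body)
	}
	wg.Wait()
	// Output is emitted per message, after the fact, so it cannot interleave.
	for _, r := range results {
		fmt.Print(prefixed(r.ID, r.Stdout))
	}
}
```

Buffering per message trades live streaming for clean separation; a line-prefixing io.Writer per message would be an alternative if output must appear while the script is still running.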

corvus-ch, Feb 22 '19 19:02

@corvus-ch Working through the code, I'm wondering why func (p *processor) Process receives the message as a delivery.Delivery instead of just the plain message.

Shouldn't the responsibility for ack/nack'ing lie with the consumer? I would imagine the processor returning success/failure and the consumer acking or nacking based on that.

estahn, Feb 26 '19 23:02

@corvus-ch I have something that is working to some extent, but I'm not sure where that exception is coming from.

2019/02/27 14:41:50 Connecting RabbitMQ...
2019/02/27 14:41:50 Connected.
2019/02/27 14:41:50 Opening channel...
2019/02/27 14:41:50 Done.
2019/02/27 14:41:50 Setting QoS...
2019/02/27 14:41:50 Succeeded setting QoS.
2019/02/27 14:41:50 Declaring queue "foobar"...
2019/02/27 14:41:50 Registering consumer...
2019/02/27 14:41:50 Succeeded registering consumer.
2019/02/27 14:41:50 Processing messages with 3 workers.
2019/02/27 14:41:50 Waiting for messages...
2019/02/27 14:41:50 [Worker 2] Processing message...
2019/02/27 14:41:50 [Worker 0] Processing message...
2019/02/27 14:41:50 [Worker 1] Processing message...
hellohello2019/02/27 14:42:00 [Worker 0] Processed!
2019/02/27 14:42:00 [Worker 0] Processing message...
hello2019/02/27 14:42:00 [Worker 2] Processed!
2019/02/27 14:42:00 [Worker 2] Processing message...
2019/02/27 14:42:00 [Worker 1] Processed!
2019/02/27 14:42:00 Exception (406) Reason: "PRECONDITION_FAILED - unknown delivery tag 1"

estahn, Feb 27 '19 03:02

@estahn Can you do a pull request with your changes so I can have a look?

Searching the web for that error suggests that your code might be mixing up messages and acknowledgments, and/or that sharing channels across threads (goroutines?) is an issue. Without looking at the code, I cannot tell what the issue is.

corvus-ch, Feb 27 '19 06:02

@corvus-ch Will do shortly. I think the issue is that the message is acked twice (https://github.com/streadway/amqp/issues/83).

estahn, Feb 27 '19 11:02

Just some comments:

  • I have removed the mutex from the processor, as I think it is not necessary if you move the variables into local scope. If you want to keep the mutex, the current solution doesn't work.
  • There are sporadic errors manifesting as "unknown delivery tag 1", possibly because the message is acked twice.
  • I would even go as far as to remove the responsibility for ack/nack from the processor.

estahn, Feb 27 '19 11:02

To find the cause of the unknown delivery tag error, I have hacked together a minimal example.

package main

import (
	"flag"
	"log"

	"github.com/streadway/amqp"
)

var (
	uri   = flag.String("uri", "amqp://guest:guest@localhost", "AMQP URI")
	queue = flag.String("queue", "myqueue", "Ephemeral AMQP queue name")
	ctag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
)

func main() {
	flag.Parse()

	conn, err := amqp.Dial(*uri)
	if err != nil {
		log.Fatalf("dial: %v", err) // Fatalf exits; no return needed
	}
	defer conn.Close()

	channel, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel: %v", err)
	}

	q, err := channel.QueueDeclare(*queue, true, false, false, false, nil)
	if err != nil {
		log.Fatalf("queue declare: %v", err)
	}

	// Allow up to 5 unacknowledged deliveries in flight, so several
	// workers hold messages at the same time.
	if err := channel.Qos(5, 0, true); err != nil {
		log.Fatalf("qos: %v", err)
	}

	// autoAck=false: every delivery must be acknowledged explicitly.
	deliveries, err := channel.Consume(q.Name, *ctag, false, false, false, false, nil)
	if err != nil {
		log.Fatalf("queue consume: %v", err)
	}

	done := make(chan error)

	// Ten workers share the same deliveries channel (and AMQP channel).
	for i := 0; i < 10; i++ {
		go handle(i, deliveries, done)
	}

	if err := <-done; err != nil {
		log.Fatalf("error during message consumption: %v", err)
	}
}

func handle(i int, deliveries <-chan amqp.Delivery, done chan<- error) {
	for d := range deliveries {
		log.Printf("[%d] got %dB delivery: [%v] %q", i, len(d.Body), d.DeliveryTag, d.Body)
		// multiple=false acknowledges only this delivery. With true, every
		// prior unacknowledged delivery on the channel is acked as well,
		// including ones other workers are still processing.
		if err := d.Ack(false); err != nil {
			done <- err
			return
		}
	}
	done <- nil
}

The issue lies with the message acknowledgement. If the boolean passed to Ack() is true, the current message and all prior unacknowledged messages on the same channel are acknowledged. With concurrency in place, this can lead to a worker trying to acknowledge a message that another worker has already acknowledged. This happens when worker A is processing message 1, worker B is processing message 2, and B finishes before A: B's ack also covers message 1, so once worker A tries to acknowledge its message, the error occurs.

To fix this, the booleans in https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L28 and https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L33 need to be changed to false.
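To make the race concrete, here is a toy model of a channel's set of unacknowledged delivery tags. It is not the streadway/amqp API and not how RabbitMQ is implemented; it only follows the documented rule that an ack with multiple=true also acknowledges all prior unacknowledged deliveries on the channel. channelState and ack are hypothetical names, and a real broker would additionally close the channel on this error, which the model ignores. Worker A holds tag 1, worker B holds tag 2, and B finishes first.

```go
package main

import "fmt"

// channelState is a toy model of one channel's unacknowledged delivery tags.
type channelState struct {
	unacked map[uint64]bool
}

func newChannelState(tags ...uint64) *channelState {
	s := &channelState{unacked: make(map[uint64]bool)}
	for _, t := range tags {
		s.unacked[t] = true
	}
	return s
}

// ack removes tag and, if multiple is true, every lower unacked tag too.
// Acking a tag that is no longer tracked models RabbitMQ's
// PRECONDITION_FAILED - unknown delivery tag.
func (s *channelState) ack(tag uint64, multiple bool) error {
	if !s.unacked[tag] {
		return fmt.Errorf("PRECONDITION_FAILED - unknown delivery tag %d", tag)
	}
	delete(s.unacked, tag)
	if multiple {
		for t := range s.unacked { // deleting during range is safe in Go
			if t < tag {
				delete(s.unacked, t)
			}
		}
	}
	return nil
}

func main() {
	for _, multiple := range []bool{true, false} {
		ch := newChannelState(1, 2) // A holds tag 1, B holds tag 2
		errB := ch.ack(2, multiple) // B finishes first
		errA := ch.ack(1, multiple) // A acks afterwards
		fmt.Printf("multiple=%v: B=%v A=%v\n", multiple, errB, errA)
	}
}
```

With multiple=true, worker A's ack fails with the same "unknown delivery tag 1" error as in the log above; with multiple=false, both acks succeed.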

corvus-ch, Mar 02 '19 09:03

@corvus-ch Yay! Nice job tracking this down!

Documentation: https://github.com/streadway/amqp/blob/master/delivery.go#L113-L115

I'll add the adjustments.

estahn, Mar 03 '19 01:03

Hi all - I also needed an option for message processing concurrency, and wanted to implement it using multiple RabbitMQ channels to be as close as possible to how our project handles RabbitMQ in other languages (we have consumers in node and python, as well as PHP). This seems to be the canonical way to handle multiple concurrent consumers in a single client in RabbitMQ.

bclougherty, Aug 12 '19 16:08

@corvus-ch What is the intent of the mutex in processor.Process? After further testing, my multi-channel solution fetches messages in parallel but processes only one at a time, because each call has to wait for the mutex to be released. The simple solution is to remove the mutex, but that could cause issues I'm not seeing. The more complicated solution would be to have a separate processor per channel, but I don't want to pursue that if there's a simpler one.
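Why a processor-wide mutex cancels out the parallel channels can be shown with a small stdlib-only sketch. It launches several concurrent calls of a hypothetical process function (a 50 ms sleep stands in for running the script) and records the highest number of calls that were ever in flight at once, with and without one shared mutex. maxInFlight and the timings are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight runs n concurrent calls and returns the peak number that were
// executing simultaneously. If serialize is true, every call takes one shared
// mutex, as a lock inside a single shared processor would.
func maxInFlight(n int, serialize bool) int64 {
	var (
		mu       sync.Mutex
		inFlight int64
		peak     int64
		wg       sync.WaitGroup
	)
	process := func() {
		if serialize {
			mu.Lock()
			defer mu.Unlock()
		}
		cur := atomic.AddInt64(&inFlight, 1)
		// Record the peak with a compare-and-swap loop.
		for {
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		time.Sleep(50 * time.Millisecond) // stand-in for running the script
		atomic.AddInt64(&inFlight, -1)
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			process()
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("with shared mutex:", maxInFlight(3, true))
	fmt.Println("without mutex:", maxInFlight(3, false))
}
```

With the shared lock the peak is always 1, regardless of how many channels deliver in parallel; without it, the calls overlap.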

bclougherty, Aug 22 '19 15:08

@corvus-ch I think I answered my own question - Processor had a single os.Cmd that it was re-using for each new call. I just pushed an update to my PR that uses a new os.Cmd for each call to Process instead.
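In Go's os/exec package, a Cmd can only be started once: calling Run (or Start) again on an already-started Cmd returns the error "exec: already started". The sketch below shows that, plus a hypothetical process function that builds a fresh Cmd per call, so concurrent calls share no state and need no lock. `true` and `cat` stand in for the real consumer script, and the function names are illustrative only.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

// reuseOnce runs the same *exec.Cmd twice; the second Run is rejected
// because a Cmd cannot be reused once it has been started.
func reuseOnce(name string) (first, second error) {
	cmd := exec.Command(name)
	return cmd.Run(), cmd.Run()
}

// process (hypothetical) creates a new Cmd on every call, so concurrent
// callers never touch the same Cmd and no mutex is required.
func process(name string, body []byte) ([]byte, error) {
	cmd := exec.Command(name)
	cmd.Stdin = bytes.NewReader(body)
	return cmd.Output()
}

func main() {
	first, second := reuseOnce("true")
	fmt.Println("first run:", first)   // first run: <nil>
	fmt.Println("second run:", second) // second run: exec: already started

	out, err := process("cat", []byte("hello"))
	fmt.Println(string(out), err)
}
```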

bclougherty, Aug 22 '19 16:08