
Prefetch not assigned to first channel

Open osok opened this issue 4 years ago • 0 comments

I have an application that consumes messages from a channel. Processing a single message can take a few minutes. When the application picks up a message, it builds one of four configurations. Some builds are short, so I want a worker that finishes early to pick up the next message.

Server --> (writes four messages) ----> I have four processes consuming [Builder]

The builder is the application. I have it set to prefetch just one message from the channel.

	// Connect to the rabbitMQ instance
	connection, err := amqp.Dial(url)
	if err != nil {
		util.FailOnError(err, "could not establish connection with RabbitMQ in main")
		return
	}
	// Defer the close only after the error check; connection is nil when Dial fails.
	defer connection.Close()

	channel, err := connection.Channel()
	if err != nil {
		util.FailOnError(err, "could not open first RabbitMQ channel in main")
		return
	}

	// Only pick up one message at a time.  Since some of the workers will take a long time to compile all the
	// executables, this helps to allow other workers to pick up work.
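	if err = channel.Qos(1, 0, true); err != nil {
		util.FailOnError(err, "could not set prefetch on RabbitMQ channel in main")
		return
	}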
	// We create an exchange that will bind to the queue to send and receive messages
	err = channel.ExchangeDeclare(builder2.BUILD_EXCHANGE, "topic", true, false, false, false, nil)

	if err != nil {
		util.FailOnError(err, "error declaring exchange in main")
		return
	}

	// We bind the queue to the exchange to send and receive data from the queue
	err = channel.QueueBind(builder2.BUILD_QUEUE, "#", builder2.BUILD_EXCHANGE, false, nil)
	if err != nil {
		// we need to reopen the channel
		channel, err = connection.Channel()
		if err != nil {
			util.FailOnError(err, "could not open RabbitMQ channel in main")
			return
		}
		err = declareQueue(channel)
		if err != nil {
			util.FailOnError(err, "error binding to the queue in main")
			return
		}
	}

	// We consume from the build queue on the channel created above, with manual acknowledgements (autoAck is false).
	msgs, err := channel.Consume(builder2.BUILD_QUEUE, "", false, false, false, false, nil)

So when I run the Builder four times, I get four channels. The prefetch is properly set for channels 2, 3 and 4, but not for channel 1.

So when the server fires off the first batch, and then a second, I see five messages in the first channel and one in each of the other three.
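One possible explanation, which I have not confirmed: the first Builder starts before the queue exists, so `QueueBind` fails and the code opens a replacement channel. `Qos` was only called on the original channel, so the replacement would consume without a prefetch limit. The later Builders find the queue already declared, never take that branch, and keep their prefetch of 1. Below is a minimal sketch of that flow. It assumes `declareQueue` does not call `Qos` itself. `fakeChannel`, `fakeConnection`, `setup` and its `queueExists` and `reapplyQos` flags are hypothetical stand-ins so it runs without a broker; they are not part of the amqp package.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeChannel is a hypothetical stand-in for *amqp.Channel so the sketch runs
// without a broker. It only records the prefetch count it was given.
type fakeChannel struct {
	id       int
	prefetch int // 0 means no prefetch limit was set on this channel
}

// Qos takes the same parameters as the Qos call used in main.
func (c *fakeChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
	c.prefetch = prefetchCount
	return nil
}

// fakeConnection hands out numbered channels, standing in for connection.Channel().
type fakeConnection struct{ opened int }

func (c *fakeConnection) Channel() *fakeChannel {
	c.opened++
	return &fakeChannel{id: c.opened}
}

var errNoQueue = errors.New("NOT_FOUND - no queue")

// setup mirrors the flow in main: set prefetch, try to bind, and open a
// replacement channel if the bind fails. queueExists is an assumption about
// broker state; reapplyQos toggles the suggested fix.
func setup(conn *fakeConnection, queueExists, reapplyQos bool) *fakeChannel {
	ch := conn.Channel()
	_ = ch.Qos(1, 0, true)

	var bindErr error
	if !queueExists {
		bindErr = errNoQueue // stands in for QueueBind failing on a missing queue
	}
	if bindErr != nil {
		// Fresh channel: the earlier Qos call does not carry over to it.
		ch = conn.Channel()
		if reapplyQos {
			_ = ch.Qos(1, 0, true)
		}
	}
	return ch
}

func main() {
	first := setup(&fakeConnection{}, false, false) // first Builder, queue missing
	later := setup(&fakeConnection{}, true, false)  // later Builders, queue exists
	fixed := setup(&fakeConnection{}, false, true)  // first Builder with Qos reapplied
	fmt.Println(first.prefetch, later.prefetch, fixed.prefetch) // prints "0 1 1"
}
```

If this is what is happening, calling `channel.Qos(1, 0, true)` again right after the second `connection.Channel()` in the snippet above should give the first Builder the same prefetch as the others.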

Environment: go version go1.14.2 linux/amd64. I pulled amqp as late as today, 4/10/2021.

2021-04-10_16-47-15

osok, Apr 10 '21 20:04