Start a stopped queue

Open Jesse0Michael opened this issue 4 years ago • 5 comments

Add the ability to start a queue that has been stopped by resetting the queue settings and channels when finishing stopping the consumers.

The old consumers are removed when you restart consuming, so they will need to be re-added after you've started the queue again.

You will be unable to add consumers back until the queue has finished stopping and you have started consumption again.


for https://github.com/adjust/rmq/issues/56 cc @wellle

To reiterate from that issue, my use case is to pause consuming the queue if the downstream service is down to avoid cascading failures using a circuit breaker. (trying to publish to the down service can put it in a locked state and make it unable to recover, and if we just use the circuit breaker to auto .Push() we end up hitting redis so hard it gets connection timeouts 😅)

Let me know if there's anything I'm missing, or if you would like a different approach, or if this needs more testing.

Jesse0Michael, Oct 06 '20 15:10

Hi, sorry to dig this up, but can we update this PR 😃 Would be nice to get this fixed! 💯

cyrinux, Jun 27 '21 22:06

rebased/updated

Jesse0Michael, Jun 28 '21 16:06

@Jesse0Michael thanks for the rebase 🛩️

cyrinux, Jun 29 '21 07:06

I am reluctant to use an approach which restarts a closed queue. Below I will list some of the reasons why I am not in favour of this approach and then try to describe an alternative approach which I hope will satisfy your use-case.


Currently the lifecycle of a queue in RMQ looks like

Open -> Consuming -> Stopped

This is a very simple set of states and, right now, a queue never returns to a previous state. This makes each of the transitions simpler to reason about. This is a very valuable property in a highly concurrent system like RMQ.
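To make the "never returns to a previous state" property concrete, here is a minimal sketch of a forward-only lifecycle. The `State` and `Lifecycle` types are hypothetical names invented for illustration; this is not how RMQ tracks queue state internally, and it omits the locking a concurrent implementation would need.

```go
package main

import "fmt"

// State is a hypothetical model of the lifecycle described above.
// It is not a type from the rmq package.
type State int

const (
	Open State = iota
	Consuming
	Stopped
)

func (s State) String() string {
	switch s {
	case Open:
		return "Open"
	case Consuming:
		return "Consuming"
	case Stopped:
		return "Stopped"
	default:
		return fmt.Sprintf("State(%d)", int(s))
	}
}

// Lifecycle holds the current state. Its zero value starts in Open.
type Lifecycle struct {
	state State
}

// Advance moves exactly one step forward along
// Open -> Consuming -> Stopped and rejects everything else,
// including any attempt to leave Stopped.
func (l *Lifecycle) Advance(next State) error {
	if next != l.state+1 || next > Stopped {
		return fmt.Errorf("invalid transition %s -> %s", l.state, next)
	}
	l.state = next
	return nil
}

func main() {
	l := &Lifecycle{}
	fmt.Println(l.Advance(Consuming)) // allowed: Open -> Consuming
	fmt.Println(l.Advance(Stopped))   // allowed: Consuming -> Stopped
	fmt.Println(l.Advance(Consuming)) // rejected: a stopped queue stays stopped
}
```

Because every transition only moves forward, each method has to reason about a single predecessor state. Allowing Stopped -> Consuming would mean every transition also has to consider what a previous run left behind.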

In your use-case there are two reasons why StopConsuming may be called.

  1. Your circuit breaker has triggered and you want to shut down your consumers to allow the system to recover
  2. The connection heartbeat has failed and the connection has shut itself down

The circuit breaker case is in principle recoverable, as demonstrated in your PR. However, the heartbeat failure case is not recoverable. I don't like the idea of having this mixture of recoverable and unrecoverable queue stoppages if we can avoid it.

Neither of the arguments I have given above indicates that having queues be restartable is not possible. However, I really believe that it would represent a significant increase in complexity for the RMQ implementation and a significant jump in the size of the mental model required to effectively work on RMQ internals.

That's enough negativity.


One thing that jumps out at me from this PR is that all of the consumers are removed from the queue when it is stopped and restarted. The way that consumer queues are set up, at least in all the cases that I have seen, looks like (ignoring errors):

connection, _ = rmq.OpenConnectionWithRedisClient(queueTag, redisClient, rmqErrChan)
queue, _ = connection.OpenQueue(queueName)
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
    queue.AddConsumer("consumerName", newConsumer(i))
}

Under this PR the code that is required to get a stopped queue running again looks like

_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
    queue.AddConsumer("consumerName", newConsumer(i))
}

and currently the code that is required to get a stopped queue running again looks like

queue, _ = connection.OpenQueue(queueName)
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
    queue.AddConsumer("consumerName", newConsumer(i))
}

The only clear difference that I can see is that currently you need to hold onto a reference to connection and queueName in order to be able to get your system consuming again after the circuit breaker has fired. This seems like a very low amount of complexity. If I have misunderstood the PR or your use-case please correct me.
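As a sketch of what "holding onto connection and queueName" could look like with errors handled, the helper below wraps the three steps into one function. The `Consumer`, `Queue`, and `Connection` interfaces are trimmed-down, hypothetical stand-ins modeled on the calls used above (the real rmq interfaces have more methods and a different `Consumer` signature), and the `fake*` types exist only so the example runs without Redis. The name `restartConsuming` is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the parts of the rmq API used in this thread.
type Consumer interface{ Consume(payload string) }

type Queue interface {
	StartConsuming(prefetchLimit int64, pollDuration time.Duration) error
	AddConsumer(tag string, consumer Consumer) (string, error)
}

type Connection interface {
	OpenQueue(name string) (Queue, error)
}

// restartConsuming opens a fresh queue on the same connection, starts
// consuming, and re-adds the consumers. The stopped queue is never reused.
func restartConsuming(conn Connection, queueName string, consumerCount int,
	pollDuration time.Duration, newConsumer func(i int) Consumer) (Queue, error) {
	queue, err := conn.OpenQueue(queueName)
	if err != nil {
		return nil, fmt.Errorf("open queue %q: %w", queueName, err)
	}
	if err := queue.StartConsuming(int64(consumerCount), pollDuration); err != nil {
		return nil, fmt.Errorf("start consuming: %w", err)
	}
	for i := 0; i < consumerCount; i++ {
		if _, err := queue.AddConsumer("consumerName", newConsumer(i)); err != nil {
			return nil, fmt.Errorf("add consumer %d: %w", i, err)
		}
	}
	return queue, nil
}

// In-memory fakes, purely illustrative.
type fakeQueue struct {
	consuming bool
	consumers int
}

func (q *fakeQueue) StartConsuming(int64, time.Duration) error {
	if q.consuming {
		return errors.New("already consuming")
	}
	q.consuming = true
	return nil
}

func (q *fakeQueue) AddConsumer(tag string, _ Consumer) (string, error) {
	if !q.consuming {
		return "", errors.New("not consuming")
	}
	q.consumers++
	return fmt.Sprintf("%s-%d", tag, q.consumers), nil
}

type fakeConnection struct{ opened int }

func (c *fakeConnection) OpenQueue(string) (Queue, error) {
	c.opened++
	return &fakeQueue{}, nil
}

type noopConsumer struct{}

func (noopConsumer) Consume(string) {}

func main() {
	conn := &fakeConnection{}
	queue, err := restartConsuming(conn, "jobs", 3, time.Second,
		func(int) Consumer { return noopConsumer{} })
	if err != nil {
		panic(err)
	}
	fmt.Println("consumers re-added:", queue.(*fakeQueue).consumers)
}
```

A circuit breaker's "closed again" callback could call such a helper, which keeps the recovery path identical to the initial setup path.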


If we really need to have queues restart themselves, without having a reference to the queue-name or the connection, I would feel comfortable with adding a method

Recreate() (Queue, error)

This would not restart a stopped queue, but would create a new queue with the same name/connection. The recovery code in this case would look like

queue, _ = queue.Recreate()
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
    queue.AddConsumer("consumerName", newConsumer(i))
}
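For illustration only, the sketch below shows one way the proposed `Recreate` could behave: the queue remembers its name and connection, and `Recreate` hands back a brand-new queue while the stopped one stays stopped. Every type here is a hypothetical in-memory model with simplified signatures (for example, `StartConsuming` takes no arguments), not rmq code, and `Recreate` is the method proposed in this comment rather than an existing API.

```go
package main

import (
	"errors"
	"fmt"
)

// Queue is a hypothetical, simplified model of a queue with the
// proposed Recreate method. It is not the rmq Queue interface.
type Queue interface {
	StartConsuming() error
	StopConsuming()
	Recreate() (Queue, error)
}

// connection counts how many queues were opened per name.
type connection struct {
	opened map[string]int
}

func (c *connection) OpenQueue(name string) (Queue, error) {
	if name == "" {
		return nil, errors.New("empty queue name")
	}
	c.opened[name]++
	return &queue{name: name, conn: c}, nil
}

// queue keeps the name and connection it was opened with so that
// Recreate needs no outside references.
type queue struct {
	name      string
	conn      *connection
	consuming bool
	stopped   bool
}

// StartConsuming only succeeds on a queue that has never been started.
func (q *queue) StartConsuming() error {
	if q.consuming || q.stopped {
		return errors.New("queue already started or stopped")
	}
	q.consuming = true
	return nil
}

// StopConsuming is terminal: the queue never consumes again.
func (q *queue) StopConsuming() {
	q.consuming = false
	q.stopped = true
}

// Recreate returns a new queue with the same name and connection.
// It does not change the state of the receiver.
func (q *queue) Recreate() (Queue, error) {
	return q.conn.OpenQueue(q.name)
}

func main() {
	conn := &connection{opened: map[string]int{}}
	old, _ := conn.OpenQueue("jobs")
	_ = old.StartConsuming()
	old.StopConsuming()

	fmt.Println("restart old queue:", old.StartConsuming())

	fresh, _ := old.Recreate()
	fmt.Println("start recreated queue:", fresh.StartConsuming())
}
```

The one-way lifecycle is preserved because the old queue value is left untouched. The caller simply replaces its reference with the new queue.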

fmstephe, Jun 29 '21 09:06

Thanks for your detailed reply, @fmstephe. I will try to apply this on my project.

cyrinux, Jun 30 '21 18:06

Seems like the approach described above is fine. Closing :v:

wellle, Jan 17 '24 16:01