rmq
Start a stopped queue
Add the ability to start a queue that has been stopped by resetting the queue settings and channels when finishing stopping the consumers.
The old consumers are removed when you restart consuming, so they will need to be re-added after you've started the queue again.
You will be unable to add consumers back until the queue has finished stopping and you have started consumption again.
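A minimal in-memory sketch of the behaviour this PR describes may help when reviewing it. `restartableQueue` and its methods are hypothetical, not rmq's code, and stopping is modelled as completing immediately so the sketch stays small.

```go
package main

import "errors"

// restartableQueue is a hypothetical model of the behaviour proposed in
// this PR. It is not rmq's implementation; stopping completes instantly here.
type restartableQueue struct {
	consuming bool
	consumers []string
}

var (
	errAlreadyConsuming = errors.New("queue is already consuming")
	errNotConsuming     = errors.New("queue is not consuming")
)

// StartConsuming (re)starts the queue and clears consumers left over from
// a previous run, so callers must add them again afterwards.
func (q *restartableQueue) StartConsuming() error {
	if q.consuming {
		return errAlreadyConsuming
	}
	q.consumers = nil
	q.consuming = true
	return nil
}

// AddConsumer succeeds only while the queue is consuming.
func (q *restartableQueue) AddConsumer(tag string) error {
	if !q.consuming {
		return errNotConsuming
	}
	q.consumers = append(q.consumers, tag)
	return nil
}

// StopConsuming stops the queue. Old consumers stay registered until the
// next StartConsuming call clears them.
func (q *restartableQueue) StopConsuming() {
	q.consuming = false
}
```

Adding a consumer between StopConsuming and the next StartConsuming fails, which matches the restriction described above.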
for https://github.com/adjust/rmq/issues/56 cc @wellle
To reiterate from that issue: my use case is to pause consuming the queue when the downstream service is down, using a circuit breaker to avoid cascading failures. (Trying to publish to the down service can put it in a locked state that it cannot recover from, and if we just use the circuit breaker to automatically call .Push(), we end up hitting Redis so hard that it gets connection timeouts.)
Let me know if there's anything I'm missing, if you would like a different approach, or if this needs more testing.
Hi, sorry to dig this up, but can we update this PR? It would be nice to get this fixed!
rebased/updated
@Jesse0Michael thanks for the rebase!
I am reluctant to use an approach which restarts a closed queue. Below I will list some of the reasons why I am not in favour of this approach and then try to describe an alternative approach which I hope will satisfy your use-case.
Currently, the lifecycle of a queue in RMQ looks like this:
Open -> Consuming -> Stopped
This is a very simple set of states and, right now, a queue never returns to a previous state. This makes each of the transitions simpler to reason about. This is a very valuable property in a highly concurrent system like RMQ.
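To make the one-way lifecycle concrete, here is a small Go sketch. The state names mirror the comment, but the `lifecycle` type and its `advance` method are hypothetical and not part of rmq's API; the point is only that every legal move is forward, so nothing ever leaves `stateStopped`.

```go
package main

import "errors"

// queueState models the lifecycle Open -> Consuming -> Stopped.
// Hypothetical type for illustration; not part of rmq.
type queueState int

const (
	stateOpen queueState = iota
	stateConsuming
	stateStopped
)

// errInvalidTransition is returned for any move that is not the
// immediate next state.
var errInvalidTransition = errors.New("invalid queue state transition")

// lifecycle tracks the current state of one queue.
type lifecycle struct {
	state queueState
}

// advance moves to next only if next directly follows the current state.
// There is no transition out of stateStopped.
func (l *lifecycle) advance(next queueState) error {
	if next != l.state+1 {
		return errInvalidTransition
	}
	l.state = next
	return nil
}
```

Because each state has exactly one successor, concurrent code only has to reason about a single forward edge at any point.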
In your use-case there are two reasons why StopConsuming may be called.
- Your circuit breaker has triggered and you want to shut down your consumers to allow the system to recover
- The connection heartbeat has failed and the connection has shut itself down
The circuit breaker case is in principle recoverable, as demonstrated in your PR. However, the heartbeat failure case is not recoverable. I don't like the idea of having this mixture of recoverable and unrecoverable queue stoppages if we can avoid it.
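The distinction between the two stop reasons can be sketched as a small decision function. Everything here is hypothetical: rmq does not expose a `stopReason` type, and `handleStop` and its `reopen` callback only illustrate that an application can resume after a circuit-breaker stop but must treat a heartbeat failure as terminal for that connection.

```go
package main

import "errors"

// stopReason records why consumption stopped. Hypothetical; not an rmq type.
type stopReason int

const (
	stopByCircuitBreaker   stopReason = iota // caller chose to pause
	stopByHeartbeatFailure                   // connection shut itself down
)

var errUnrecoverable = errors.New("connection heartbeat failed; queue cannot be resumed")

// handleStop calls reopen only for the recoverable circuit-breaker case.
// A heartbeat failure is returned to the caller unchanged.
func handleStop(reason stopReason, reopen func() error) error {
	switch reason {
	case stopByCircuitBreaker:
		return reopen()
	case stopByHeartbeatFailure:
		return errUnrecoverable
	default:
		return errors.New("unknown stop reason")
	}
}
```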
Neither of the arguments above shows that restartable queues are impossible. However, I really believe that they would significantly increase the complexity of the RMQ implementation, and significantly enlarge the mental model required to work effectively on RMQ internals.
That's enough negativity.
One thing that jumps out at me from this PR is that all of the consumers are removed from the queue when it is stopped and restarted. The way that consumer queues are set up, at least in all the cases that I have seen, looks like this (ignoring errors):
connection, _ = mq.OpenConnectionWithRedisClient(queueTag, redisClient, rmqErrChan)
queue, _ = connection.OpenQueue(queueName)
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
	queue.AddConsumer("consumerName", newConsumer(i))
}
Under this PR, the code required to get a stopped queue running again looks like:
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
	queue.AddConsumer("consumerName", newConsumer(i))
}
and currently the code required to get a stopped queue running again looks like:
queue, _ = connection.OpenQueue(queueName)
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
	queue.AddConsumer("consumerName", newConsumer(i))
}
The only clear difference I can see is that currently you need to hold onto references to connection and queueName in order to get your system consuming again after the circuit breaker has fired. This seems like very little added complexity. If I have misunderstood the PR or your use case, please correct me.
If we really need to have queues restart themselves without holding a reference to the queue name or the connection, I would feel comfortable adding a method:
Recreate() (Queue, error)
This would not restart a stopped queue, but would create a new queue with the same name and connection. The recovery code in this case would look like:
queue, _ = queue.Recreate()
_ = queue.StartConsuming(consumerCount, pollDuration)
for i := 0; i < consumerCount; i++ {
	queue.AddConsumer("consumerName", newConsumer(i))
}
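One way the proposed Recreate() could work is sketched below. This is a hypothetical illustration, not rmq's code: `Connection` and `Queue` are in-memory stand-ins, and the sketch returns `*Queue` rather than rmq's `Queue` interface. The queue remembers its name and the connection that opened it, so Recreate simply asks that connection for a fresh queue and leaves the stopped one untouched.

```go
package main

import "errors"

// Connection is a hypothetical stand-in that counts opened queues.
type Connection struct {
	opened int
}

// Queue is a hypothetical stand-in that remembers its name and connection.
type Queue struct {
	name       string
	connection *Connection
	stopped    bool
}

// OpenQueue returns a new, never-started queue with the given name.
func (c *Connection) OpenQueue(name string) (*Queue, error) {
	c.opened++
	return &Queue{name: name, connection: c}, nil
}

// StopConsuming marks the queue as stopped; it is never restarted.
func (q *Queue) StopConsuming() { q.stopped = true }

// Recreate does not restart q. It opens a brand-new queue with the same
// name on the same connection, so each queue value keeps its one-way
// Open -> Consuming -> Stopped lifecycle.
func (q *Queue) Recreate() (*Queue, error) {
	if q.connection == nil {
		return nil, errors.New("queue has no connection")
	}
	return q.connection.OpenQueue(q.name)
}
```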
Thanks for your detailed reply @fmstephe, I will try to apply this in my project.
Seems like the approach described above is fine. Closing :v: