akka-rabbitmq
akka-rabbitmq copied to clipboard
Pausing / Resuming consumer
Hi,
Is it possible to pause|resume consuming ?
According to documentation of rabbitmq it can be achieved with channel.basicCancel | channel.basicConsume
but I can't see it available in this lib.
Cheers.
This library only provides connection and channel management. Consuming and publishing you should still do yourself using RabbitMQ's API, and I don't see a reason why you could also cancel or resume consuming using that.
For a higher-level Akka-based interface to AMQP, see op-rabbit. Disclaimer: I'm no expert and I don't know if they support pause/resume.
@reinierl Thanks for the reply.
I am actually using op-rabbit but they support pausing/resuming of all consumers subscribed using provided ConnectionActor
so it's not exactly what I'm after.
Also, how can I get a hold of underlying channel
to pause|resume
consumption?
By sending a ChannelMessage
to the channel actor, e.g.:
def setupChannel(channel: Channel, self: ActorRef) {
channel.queueDeclare("queue_name", false, false, false, null)
}
val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))
def publish(channel: Channel) {
channel.basicPublish("", "queue_name", null, "Hello world".getBytes)
}
channelActor ! ChannelMessage(publish)
@reinierl Thanks for all your involvement in the thread. I had tried that approach but couldn't get a hold of consumerTag
for the current consumer. What I managed to do was call getDefaultConsumer.asInstanceOf[DefaultConsumer].getConsumerTag()
but this threw NullPointerException
.
Hmm, yes, I see. I suppose you call basicConsume
in the channel setup closure, and then you get a consumer tag. But then there is no nice way to pass it back from the closure to the code that sent the ChannelMessage
in the first place... You could make the closure use !
again, something like:
def setupChannel(channel: Channel, self: ActorRef) {
val tag = channel.basicConsume(...)
self ! ConsumptionStarted(tag)
}
val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))
but then the consumer tag in your code may get out of sync with what's happening in the ChannelActor.
To keep around client-side state like this consumer tag, I suppose we could make the API more complicated so that ChannelMessage takes a ChannelState => Channel => (ChannelState, Any)
instead of a Channel => Any
like now. And ChannelState
is really just whatever you need: case class ChannelState(data: Any)
. Then you could make the ChannelActor keep track of the consumer tag for you. Let me think about that a bit...