akka-rabbitmq icon indicating copy to clipboard operation
akka-rabbitmq copied to clipboard

Pausing / Resuming consumer

Open goral09 opened this issue 7 years ago • 6 comments

Hi,

Is it possible to pause|resume consuming ? According to documentation of rabbitmq it can be achieved with channel.basicCancel | channel.basicConsume but I can't see it available in this lib.

Cheers.

goral09 avatar May 18 '17 13:05 goral09

This library only provides connection and channel management. Consuming and publishing you should still do yourself using RabbitMQ's API, and I don't see a reason why you could also cancel or resume consuming using that.

For a higher-level Akka-based interface to AMQP, see op-rabbit. Disclaimer: I'm no expert and I don't know if they support pause/resume.

reinierl avatar May 18 '17 20:05 reinierl

@reinierl Thanks for the reply.

I am actually using op-rabbit but they support pausing/resuming of all consumers subscribed using provided ConnectionActor so it's not exactly what I'm after.

goral09 avatar May 19 '17 07:05 goral09

Also, how can I get a hold of underlying channel to pause|resume consumption?

goral09 avatar May 19 '17 08:05 goral09

By sending a ChannelMessage to the channel actor, e.g.:

    def setupChannel(channel: Channel, self: ActorRef) {
      channel.queueDeclare("queue_name", false, false, false, null)
    }
    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

    def publish(channel: Channel) {
      channel.basicPublish("", "queue_name", null, "Hello world".getBytes)
    }
    channelActor ! ChannelMessage(publish)

reinierl avatar May 20 '17 20:05 reinierl

@reinierl Thanks for all your involvement in the thread. I had tried that approach but couldn't get a hold of consumerTag for the current consumer. What I managed to do was call getDefaultConsumer.asInstanceOf[DefaultConsumer].getConsumerTag() but this threw NullPointerException.

goral09 avatar May 22 '17 14:05 goral09

Hmm, yes, I see. I suppose you call basicConsume in the channel setup closure, and then you get a consumer tag. But then there is no nice way to pass it back from the closure to the code that sent the ChannelMessage in the first place... You could make the closure use ! again, something like:

    def setupChannel(channel: Channel, self: ActorRef) {
      val tag = channel.basicConsume(...)
      self ! ConsumptionStarted(tag)
    }
    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

but then the consumer tag in your code may get out of sync with what's happening in the ChannelActor.

To keep around client-side state like this consumer tag, I suppose we could make the API more complicated so that ChannelMessage takes a ChannelState => Channel => (ChannelState, Any) instead of a Channel => Any like now. And ChannelState is really just whatever you need: case class ChannelState(data: Any). Then you could make the ChannelActor keep track of the consumer tag for you. Let me think about that a bit...

reinierl avatar May 22 '17 15:05 reinierl