akka-rabbitmq icon indicating copy to clipboard operation
akka-rabbitmq copied to clipboard

dropIfNoChannel problem

Open RashadAnsari opened this issue 5 years ago • 3 comments

Hi When I send many ChannelMessage with a high rate to ChannelActor with dropIfNoChannel = false if I stop rabbitmq and after a duration of time like 1 minute start rabbitmq again, the ChannelActor blocked for process many messages and can't process other message and throw heap space error. I write this code:

import akka.actor.{ActorRef, ActorSystem}
import akka.pattern._
import akka.util.Timeout
import com.newmotion.akka.rabbitmq.{ChannelActor, ChannelCreated, ChannelMessage, ConnectionActor, ConnectionFactory, CreateChannel}
import com.rabbitmq.client.MessageProperties

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Exam1 extends App {

  val system = ActorSystem()

  implicit val timeout: Timeout = Timeout(2 second)
  implicit val executionContext: ExecutionContext = system.dispatcher

  val unconfirmed = mutable.Set.empty[Long]

  val connFactory = new ConnectionFactory()
  connFactory.setHost("127.0.0.1")
  connFactory.setPort(5672)
  connFactory.setUsername("amqp")
  connFactory.setPassword("pass")

  val pubConnActor = system.actorOf(ConnectionActor.props(connFactory))

  Thread.sleep(5000)

  @tailrec
  def produce(chActor: ActorRef): Unit = {
    chActor ! ChannelMessage({
      ch =>
        println("------------------------")
        ch.basicPublish("amq.direct", "me-topic", MessageProperties.PERSISTENT_BASIC, "message".getBytes("UTF-8"))
    }, dropIfNoChannel = false)
    produce(chActor)
  }

  (pubConnActor ? CreateChannel(ChannelActor.props().withMailbox("bounded-mailbox"))).mapTo[ChannelCreated] map {
    case ChannelCreated(chActor) =>
      produce(chActor)
  }

}

When I run my code and after a duration of time stop rabbitmq and after a duration of time start again I get heap space error and stop my program. I think this is for loop function in ChannelActor. Please fix it and choose another way.

RashadAnsari avatar Sep 17 '18 12:09 RashadAnsari

If rabbitmq goes offline for a short moment of time (small network glitch for instance), the fallback mechanism is to queue messages in memory for as long as it is offline. But there are limits to the number of messages that can be queued. Is this why you are using a bounded-mailbox?

Note: I am not the maintainer of this library, but more curious about the behaviour you are describing

sbmpost avatar Oct 02 '18 13:10 sbmpost

Thank you. I just use bounded-mailbox for control message rate. When I send many messages to channel actor if I don't use a bounded-mailbox, channel actor crash.

RashadAnsari avatar Oct 10 '18 19:10 RashadAnsari

A version 5.0.4-beta has been released. While the changes aren't directly related to your problem, it might change the behaviour you are seeing. But as mentioned before, akka-rabbitmq can only queue messages for a limited period of time (that is until the heap space runs out). If the connection to the server is restored within that time, then you should be fine.

sbmpost avatar Dec 01 '18 11:12 sbmpost