akka-rabbitmq

Channel Actor not Stopping

Open • pietervanlillsi opened this issue 4 years ago • 1 comment

[ERROR] [12/11/2019 12:08:55.034] [fixture-cluster-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/user/mq-connection/{queueName}] close com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue '{queueName}' in vhost '{vhost}', class-id=50, method-id=20)

I have an interesting flow that I am trying to implement.

Our business requires using one connection per VM to connect to RabbitMQ. I have implemented the following flow.

Call our API -> GET queueLink/name -> have a single connection actor shared on the ActorSystem -> spawn an actor in the cluster to handle the data events -> spawn an MQSubscriber for the data actor -> connect the subscriber, which heartbeats between data event and subscriber -> on connection failure, suspend data events and wait for reconnect -> on reconnect, obtain a new queueLink/name and use the MQSubscriber to connect.

The issue I am having is that the old channel actor does not die when the connection drops, and I cannot find a way to kill it. I do not need to keep it around, since our queue names are generated dynamically. I need a way to kill the channel actor when the connection drops.

class MQSubscriber(receiver: ActorRef)(implicit val actorSystem: ActorSystem) extends Actor with ActorSystemLogging {
  private val exchange                                    = "amq.fanout"
  implicit val executionContext: ExecutionContextExecutor = context.dispatcher
  import context.become

  private def setupSubscriber(channel: Channel, self: ActorRef, queueName: String): Unit = {
    channel.queueBind(queueName, exchange, "")

    // Decode with an explicit charset instead of the platform default
    def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
        receiver ! MQMessage(envelope.getDeliveryTag.toString, fromBytes(body))
        channel.basicAck(envelope.getDeliveryTag, false)
        super.handleDelivery(consumerTag, envelope, properties, body)
      }

      override def handleCancel(consumerTag: String): Unit = {
        receiver ! MQEvents.UnexpectedDisconnect
        super.handleCancel(consumerTag)
      }

      override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
        receiver ! MQEvents.UnexpectedDisconnect
        super.handleShutdownSignal(consumerTag, sig)
      }
    }
    channel.basicConsume(queueName, false, consumer)
    receiver ! MQEvents.Connected
  }

  def checkHeartBeat(connection: Option[ActorRef], heartbeatTime: Int)(implicit ex: ExecutionContext): Future[MQResponse] = {
    implicit val timeout: Timeout = Timeout(100.milliseconds)
    scheduleHeartBeat(heartbeatTime)
    connection match {
      case None =>
        log.error(s"No Connection found yet")
        Future.successful(MQEvents.NoConnection)
      case Some(connection) =>
        (connection ? ConnectionActor.GetState)
          .map {
            case ConnectionActor.Disconnected =>
              log.error(s"Connection $connection is Disconnected")
              MQEvents.Disconnected
            case ConnectionActor.Connected => MQEvents.HeartBeat
          }
          .recover {
            case _: AskTimeoutException | _: ActorNotFound =>
              log.error(s"Connection ${connection.path.name} Actor Timeout or not found!!!")
              MQEvents.Lost
          }
    }
  }

  def scheduleHeartBeat(heartbeatTime: Int): Cancellable =
    context.system.scheduler.scheduleOnce(FiniteDuration(heartbeatTime, SECONDS), self, MQEvents.CheckHeartBeat)

  override def receive: Receive = process(None, 1, "")

  def process(connection: Option[ActorRef], heartbeatTime: Int, queueName: String): Receive = {
    case MQSubscriber.Start(queueName, connection) =>
      connection ! CreateChannel(ChannelActor.props((channel, actor) => {
        setupSubscriber(channel, actor, queueName)
      }), Some(queueName))
      become(process(Option(connection), 1, queueName))
      scheduleHeartBeat(heartbeatTime)
    case MQSubscriber.Stop       => self ! PoisonPill
    case MQEvents.CheckHeartBeat => checkHeartBeat(connection, heartbeatTime).map(receiver ! _)
    case MQEvents.StartCircuitBreaker =>
      become(process(connection, 2, queueName))
    case MQEvents.CloseCircuitBreaker =>
      become(process(connection, 1, queueName))
    case _ =>
  }
}

object MQSubscriber {
  case class Start(queueName: String, connection: ActorRef)
  case object Stop
}

pietervanlillsi, Dec 11 '19 12:12

This is by design. If the connection breaks, the channel actors are supposed to stay alive so that, after the library reconnects internally, they can be supplied with newly acquired channels. That lets those actors push out any messages that were queued while disconnected, so connection drops are handled transparently by the library.
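
To make that behaviour concrete, here is a toy model of the idea in plain Scala. It is only a sketch and not the library's implementation: `FakeChannel` and `ChannelOwner` are hypothetical names invented for this illustration, and no Akka or RabbitMQ types are involved. It shows one long-lived owner that outlives any single channel, buffers messages while disconnected, and flushes them once a fresh channel is supplied.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a broker channel; NOT the RabbitMQ client API.
final class FakeChannel(val id: Int) {
  val published = mutable.ArrayBuffer.empty[String]
  def publish(msg: String): Unit = { published += msg }
}

// Hypothetical model of a channel owner that survives connection drops.
final class ChannelOwner {
  private var channel: Option[FakeChannel] = None
  private val pending = mutable.Queue.empty[String]

  // Publish immediately when connected, otherwise keep the message for later.
  def send(msg: String): Unit = channel match {
    case Some(ch) => ch.publish(msg)
    case None     => pending.enqueue(msg)
  }

  // The old channel is discarded, but the owner itself stays alive.
  def connectionLost(): Unit = { channel = None }

  // A new channel is supplied after reconnect; flush everything queued so far.
  def reconnected(fresh: FakeChannel): Unit = {
    channel = Some(fresh)
    while (pending.nonEmpty) fresh.publish(pending.dequeue())
  }

  def queued: Int = pending.size
}

object Demo extends App {
  val owner = new ChannelOwner
  val first = new FakeChannel(1)
  owner.reconnected(first)
  owner.send("a")          // goes out on the first channel
  owner.connectionLost()
  owner.send("b")          // buffered while disconnected
  owner.send("c")
  val second = new FakeChannel(2)
  owner.reconnected(second)
  println(first.published.mkString(","))  // prints "a"
  println(second.published.mkString(",")) // prints "b,c"
}
```

In this model nothing ever removes the owner when the connection drops, which mirrors why the old channel actor in the report above is still around after a disconnect.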

sbmpost, Dec 19 '19 16:12