alpakka icon indicating copy to clipboard operation
alpakka copied to clipboard

Akka MQTT Streaming: PingFailed not generated when client is busy.

Open sbmpost opened this issue 2 years ago • 5 comments

@huntc I slightly modified the test in MqttSessionSpec to start publishing when a ping response is expected. This results in the ping timer being reset eventhough a ping response is still pending. We get the PingFailed exception only after we stop publishing (which may never happen). There may be messages other than 'publish' resulting in the same behaviour. I suggest to check all places in the file called 'ClientState', where the method 'serverConnected' calls itself with:

resetPingReqTimer = true

Here is the slightly adapted test using 'awaitCond' to demonstrate the issue:

"MQTT client connector" should {

  "disconnect a connected session if a ping request is not replied to" in assertAllStagesStopped {
    val session = ActorMqttClientSession(settings)

    val server = TestProbe()
    val pipeToServer = Flow[ByteString].mapAsync(1)(msg => server.ref.ask(msg).mapTo[ByteString])

    val (client, result) =
      Source
        .queue(1, OverflowStrategy.fail)
        .via(
          Mqtt
            .clientSessionFlow(session, ByteString("1"))
            .join(pipeToServer)
        )
        .toMat(Sink.ignore)(Keep.both)
        .run()

    val connect = Connect("some-client-id", ConnectFlags.None).copy(keepAlive = 100.millis.dilated)
    val connectBytes = connect.encode(ByteString.newBuilder).result()
    val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
    val connAckBytes = connAck.encode(ByteString.newBuilder).result()
    val pingReq = PingReq
    val pingReqBytes = pingReq.encode(ByteString.newBuilder).result()

    client.offer(Command(connect))

    server.expectMsg(connectBytes)
    server.reply(connAckBytes)

    server.expectMsg(pingReqBytes)

    awaitCond {
      if (result.isCompleted) {
        val done: scala.util.Try[Done] = Await.ready(result, Duration.Zero).value.get
        done match {
          case Success(_) =>
            false
          case Failure(throwable) =>
            if (throwable.isInstanceOf[ActorMqttClientSession.PingFailed.type]) {
              true
            } else {
              false
            }
        }
      } else {
        // Comment out the line below to make the test succeed
        session ! Command(Publish("Keep Client Busy", ByteString()))
        false
      }
    }

    client.complete()
    client.watchCompletion().foreach(_ => session.shutdown())
  }

}

sbmpost avatar Sep 14 '22 14:09 sbmpost

@sbmpost FYI I'm no longer involved with this project. Thanks for the notification.

huntc avatar Sep 15 '22 07:09 huntc

@ennru Also notifying you then ;-)

sbmpost avatar Sep 15 '22 14:09 sbmpost

Thank you for your investigation of the ping. Most of my MQTT protocol knowledge has been evicted. Would you be in a position to suggest an improvement to this in a PR?

ennru avatar Sep 20 '22 07:09 ennru

@ennru I am afraid not at the moment. I am very time limited mostly due to personal circumstances. If this situation changes, I might be able to give it a shot.

sbmpost avatar Sep 20 '22 17:09 sbmpost

Colleague from @sbmpost speaking here. We did discuss this in our team but with the recent license changes, we can't commit company resources to this. While we gladly contribute to projects that our business is based on, we can't spend the resources on bugfixes in software, that we must then license to use.

KevinAtSesam avatar Oct 28 '22 08:10 KevinAtSesam