alpakka
alpakka copied to clipboard
Akka MQTT Streaming: PingFailed not generated when client is busy.
@huntc I slightly modified the test in MqttSessionSpec to start publishing when a ping response is expected. This results in the ping timer being reset eventhough a ping response is still pending. We get the PingFailed exception only after we stop publishing (which may never happen). There may be messages other than 'publish' resulting in the same behaviour. I suggest to check all places in the file called 'ClientState', where the method 'serverConnected' calls itself with:
resetPingReqTimer = true
Here is the slightly adapted test using 'awaitCond' to demonstrate the issue:
"MQTT client connector" should {
"disconnect a connected session if a ping request is not replied to" in assertAllStagesStopped {
val session = ActorMqttClientSession(settings)
val server = TestProbe()
val pipeToServer = Flow[ByteString].mapAsync(1)(msg => server.ref.ask(msg).mapTo[ByteString])
val (client, result) =
Source
.queue(1, OverflowStrategy.fail)
.via(
Mqtt
.clientSessionFlow(session, ByteString("1"))
.join(pipeToServer)
)
.toMat(Sink.ignore)(Keep.both)
.run()
val connect = Connect("some-client-id", ConnectFlags.None).copy(keepAlive = 100.millis.dilated)
val connectBytes = connect.encode(ByteString.newBuilder).result()
val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val pingReq = PingReq
val pingReqBytes = pingReq.encode(ByteString.newBuilder).result()
client.offer(Command(connect))
server.expectMsg(connectBytes)
server.reply(connAckBytes)
server.expectMsg(pingReqBytes)
awaitCond {
if (result.isCompleted) {
val done: scala.util.Try[Done] = Await.ready(result, Duration.Zero).value.get
done match {
case Success(_) =>
false
case Failure(throwable) =>
if (throwable.isInstanceOf[ActorMqttClientSession.PingFailed.type]) {
true
} else {
false
}
}
} else {
// Comment out the line below to make the test succeed
session ! Command(Publish("Keep Client Busy", ByteString()))
false
}
}
client.complete()
client.watchCompletion().foreach(_ => session.shutdown())
}
}
@sbmpost FYI I'm no longer involved with this project. Thanks for the notification.
@ennru Also notifying you then ;-)
Thank you for your investigation of the ping. Most of my MQTT protocol knowledge has been evicted. Would you be in a position to suggest an improvement to this in a PR?
@ennru I am afraid not at the moment. I am very time limited mostly due to personal circumstances. If this situation changes, I might be able to give it a shot.
Colleague from @sbmpost speaking here. We did discuss this in our team but with the recent license changes, we can't commit company resources to this. While we gladly contribute to projects that our business is based on, we can't spend the resources on bugfixes in software, that we must then license to use.