
Stale inflight messages

Open bkupidura opened this issue 2 years ago • 7 comments

I have started observing an increasing number of inflight messages, and the count almost never goes down. At the same time I didn't notice any client disconnects, so it looks like the clients simply didn't receive the message and will never send an ACK (network issue on the client side?).

I see that there is ResendClientInflight (https://github.com/mochi-co/mqtt/blob/master/server/server.go#L878), but it looks like it is executed only on a new client connection (to send messages from the persistence store to reconnecting clients?).

Is there any internal mechanism which will try to deliver inflight messages, or should the mochi-co/mqtt user implement this on their own?

bkupidura avatar Jul 03 '22 20:07 bkupidura

Are you using persistence? This sounds similar to https://github.com/mochi-co/mqtt/discussions/76 - typically inflights shouldn't really occur in a normal use case. They certainly shouldn't be increasing indefinitely in this way.

Currently inflight messages are only resent when the client reconnects. It would be nice to have a periodic retry, however it needs a bit of thought to avoid creating unnecessary overhead.

mochi-co avatar Jul 03 '22 20:07 mochi-co

I'm not using any persistence storage. And my clients weren't disconnected, so those inflight messages will stay there forever ;)

For now I will implement a periodic ResendClientInflight on my end.

bkupidura avatar Jul 03 '22 20:07 bkupidura

That's very unusual... Can you provide some more information about your client library, use (qos, etc)? It would be good to determine the issue in case it's something which can be fixed.

mochi-co avatar Jul 03 '22 20:07 mochi-co

I haven't done much debugging yet, and I'm not sure which messages are inflight. I will investigate further.

And to be precise, I observe a few (4-10) inflights every week or two - so I'm almost sure that this is an issue with consumers/networking and not mochi-co/mqtt.

bkupidura avatar Jul 03 '22 21:07 bkupidura

So I have some funny findings regarding this one. As a PoC I added:

func handleInflight(ctx context.Context, mqttServer *mqtt.Server) {
	for {
		// Log and resend the pending inflight messages of every client on listener "t1".
		for _, cl := range mqttServer.Clients.GetByListener("t1") {
			log.Printf("inflight messages for %s (%s): %d", cl.ID, string(cl.Username), cl.Inflight.Len())
			for _, tk := range cl.Inflight.GetAll() {
				log.Printf("topic %s (%v)", tk.Packet.TopicName, tk.Packet.Topics)
			}
			mqttServer.ResendClientInflight(cl, false)
		}
		// Wait 300 seconds before the next pass, or stop when ctx is cancelled.
		select {
		case <-ctx.Done():
			return
		case <-time.After(300 * time.Second):
		}
	}
}

This was just to check whether running ResendClientInflight every N seconds would solve my problem. And it did.

But today, Prometheus started complaining that I have some inflight messages.

I build the Prometheus metric by reading mqttServer.System.Inflight. I have only one listener, called "t1".

From the handleInflight output, it looks like I don't have any inflights on any client, but mqttServer.System.Inflight is still showing some inflights.

I see that it could potentially happen somewhere around https://github.com/mochi-co/mqtt/blob/27f3c484ad65bb34cd4c1de12cbb91b8be16dabd/server/server.go#L885 when the packet is not a Publish packet.

In my case publishDropped is not increasing (still showing 0).

Any idea what I'm missing?

My code: https://github.com/bkupidura/broker-ha/blob/debug-inflight/server/server.go

bkupidura avatar Jul 06 '22 14:07 bkupidura

@mochi-co I have possibly found the root cause of those inflight messages.

When a new client subscribes with QoS 2, it looks like the subscription is remembered by mochi-co/mqtt forever, and every message published to the subscribed topics is stored for future delivery. s.Topics.Unsubscribe is called only on an unsubscribe packet and from inheritClientSession. It is not called when the client is disconnected.

This will not work, as clients are identified by ClientID, which can be random.

Moreover, in my case a client can reconnect to a different mochi-co/mqtt instance, so the first instance will just store messages which will never be delivered.

For debugging I enabled the bolt store to check what is in there.

mosquitto_sub -h '<broker_ip>' -u <username> -P '<password>' -t 'inflight_test' -i inflight_test -q 2
<close mosquitto_sub process>
mosquitto_pub -h '<broker_ip>' -u <username> -P '<password>' -m '123' -t 'inflight_test' -q 2

Message in store

2022/08/07 19:34:52 server.go:120: inflight from store: {Payload:[49 50 51] FixedHeader:{Remaining:0 Type:3 Qos:2 Dup:false Retain:false} T:ifm ID:if_inflight_test_1 Client: TopicName:inflight_test Sent:1659893659 Resends:0 PacketID:0}

I believe that when a client is disconnected, we should have the possibility to remove its subscriptions - even if QoS 2 was set, as this client may never reconnect.
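
To illustrate the idea, here is a minimal sketch of dropping all of a client's subscriptions on disconnect. It is not the mochi-co/mqtt topic index: subIndex, Subscribe and DropClient are hypothetical names, and the sketch matches topic filters literally, without wildcard handling.

```go
package main

import "fmt"

// subIndex is a hypothetical topic index: topic filter -> client ID -> QoS.
type subIndex map[string]map[string]byte

// Subscribe records that clientID subscribed to filter at the given QoS.
func (s subIndex) Subscribe(filter, clientID string, qos byte) {
	if s[filter] == nil {
		s[filter] = make(map[string]byte)
	}
	s[filter][clientID] = qos
}

// DropClient removes every subscription held by clientID and returns
// how many subscriptions were removed.
func (s subIndex) DropClient(clientID string) int {
	removed := 0
	for filter, clients := range s {
		if _, ok := clients[clientID]; ok {
			delete(clients, clientID)
			removed++
		}
		// Drop filters that no longer have any subscribers.
		if len(clients) == 0 {
			delete(s, filter)
		}
	}
	return removed
}

func main() {
	idx := subIndex{}
	idx.Subscribe("inflight_test", "inflight_test", 2)
	idx.Subscribe("inflight_test", "other_client", 1)
	idx.Subscribe("status", "inflight_test", 2)

	// Called from a (hypothetical) disconnect handler.
	fmt.Println("removed:", idx.DropClient("inflight_test"))
	fmt.Println("filters left:", len(idx))
}
```

Once the subscriptions are gone, later publishes to those topics no longer queue inflight messages for a client that may never come back.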

bkupidura avatar Aug 07 '22 17:08 bkupidura

@bkupidura This looks very interesting and I suspect you are correct. Apologies, I have been very busy lately. I will do my best to look into this in depth as a priority soon 👍🏻

mochi-co avatar Aug 11 '22 19:08 mochi-co

Fixed in v1.3.0 with new TTL/period resend support for inflight messages.

@bkupidura Please give it a test and close the issue if it solves the problem :)

mochi-co avatar Aug 16 '22 21:08 mochi-co

@mochi-co thanks! I just deployed the new version and it looks good. Inflights are indeed cleared.

There is just one small thing: after clearExpiredInflights and clearAbandonedInflights we should update s.System.Inflight. Otherwise the metric is not in sync with the real state.

bkupidura avatar Aug 17 '22 07:08 bkupidura

@bkupidura Ahhh yes I knew I'd forgotten something. I'll add it to the list!

mochi-co avatar Aug 17 '22 07:08 mochi-co

@mochi-co if you don't mind, I can prepare a PR for that.

My idea is that func (i *Inflight) ClearExpired should return the number of cleared inflights, and we just handle that in mqtt/server.go.

bkupidura avatar Aug 17 '22 07:08 bkupidura

@bkupidura That would be wonderful! :) I think that's the most effective approach too.

mochi-co avatar Aug 17 '22 07:08 mochi-co

https://github.com/mochi-co/mqtt/pull/92

bkupidura avatar Aug 17 '22 08:08 bkupidura