server icon indicating copy to clipboard operation
server copied to clipboard

Can not get onPublish called after set Inline Client

Open lucasjinreal opened this issue 7 months ago • 9 comments

Hi, sorry for bother again, I have some miss usage of this lib, but this problem I can not figure out why:

type CustomHook struct {
	mqtt.HookBase
	config *CustomHookOptions
}

type CustomHookOptions struct {
	Server *mqtt.Server
}

func (h *CustomHook) ID() string {
	return "my-hook"
}

func (h *CustomHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnectAuthenticate,
		mqtt.OnDisconnect,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
		mqtt.OnPublished,
		mqtt.OnPublish,
	}, []byte{b})
}

func (h *CustomHook) Init(config any) error {
	if _, ok := config.(*CustomHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	if config == nil {
		config = new(CustomHookOptions)
	}

	h.config = config.(*CustomHookOptions)
	h.Log.Info("initialised")
	return nil
}

func (h *CustomHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	h.Log.Info("--> client connected [auth]", "client", cl.ID)
	res := MyAuthorizeHandler(cl, pk, h.config.Server)
	h.Log.Info("--> client auth result: ", res)
	return res
}

func (h *CustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	h.Log.Info("--> client connected", "client", cl.ID)
	return nil
}

func (h *CustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	if err != nil {
		h.Log.Info("--> client disconnected", "client", cl.ID, "expire", expire, "error", err)
	} else {
		h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
	}

}

func (h *CustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}

func (h *CustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}

func (h *CustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	if cl.ID != "inline" {
		h.Log.Info("--> received from client", "client", cl.ID, "payload", string(pk.Payload))
		GlobalInterceptors(cl, pk, h.config.Server)
	} else {
		h.Log.Info("--> received from inline client")
	}
	return pk, nil
}

func (h *CustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	if cl.ID != "inline" {
		h.Log.Info("--> published to client", "client", cl.ID, "payload", string(pk.Payload))
	} else {
		h.Log.Info("--> received from inline client")
	}
}

This is my Hook, basically, I have a connect auth, and then send the messages needed to client. After that, just do normal message delivery.

But now, the onPublish and onPublished seems can not being called except the inline client.

Why

lucasjinreal avatar Nov 25 '23 03:11 lucasjinreal

@lucasjinreal I tested the issue you mentioned, and it seems there is no problem. I used a client to publish a message, and then it triggers an inline publish message. Below is my code and the printed results.

Is there any special handling in your 'GlobalInterceptors(cl, pk, h.config.Server)' here? If possible, could you provide the code snippet for 'GlobalInterceptors'?

package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/hooks/auth"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New(&mqtt.Options{
		InlineClient: true, // you must enable inline client to use direct publishing and subscribing.
	})

	_ = server.AddHook(new(auth.AllowHook), nil)
	tcp := listeners.NewTCP("t1", ":1883", nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	// Add custom hook (ExampleHook) to the server
	err = server.AddHook(new(ExampleHook), &ExampleHookOptions{
		Server: server,
	})

	if err != nil {
		log.Fatal(err)
	}

	// Start the server
	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-done
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")
}

// Options contains configuration settings for the hook.
type ExampleHookOptions struct {
	Server *mqtt.Server
}

type ExampleHook struct {
	mqtt.HookBase
	config *ExampleHookOptions
}

func (h *ExampleHook) ID() string {
	return "events-example"
}

func (h *ExampleHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
		mqtt.OnPublished,
		mqtt.OnPublish,
	}, []byte{b})
}

func (h *ExampleHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*ExampleHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*ExampleHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	h.Log.Info("client connected", "client", cl.ID)
	return nil
}

func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	if err != nil {
		h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
	} else {
		h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
	}

}

func (h *ExampleHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	if cl.ID != "inline" {
		h.Log.Info("--> OnPublish from normal client", "client", cl.ID, "payload", string(pk.Payload))
		h.config.Server.Publish("/example/inline", []byte("xxxxx"), false, 0)
	} else {
		h.Log.Info("--> OnPublish from inline client", "client", cl.ID, "payload", string(pk.Payload))
	}
	return pk, nil
}

func (h *ExampleHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	if cl.ID != "inline" {
		h.Log.Info("--> OnPublished from normal client", "client", cl.ID, "payload", string(pk.Payload))
	} else {
		h.Log.Info("--> OnPublished from inline client", "client", cl.ID, "payload", string(pk.Payload))
	}
}
time=2023-11-25T14:14:06.062+08:00 level=INFO msg="added hook" hook=allow-all-auth
time=2023-11-25T14:14:06.103+08:00 level=INFO msg="attached listener" id=t1 protocol=tcp address=:1883
time=2023-11-25T14:14:06.103+08:00 level=INFO msg="added hook" hook=events-example
time=2023-11-25T14:14:06.104+08:00 level=INFO msg=initialised hook=events-example
time=2023-11-25T14:14:06.104+08:00 level=INFO msg="mochi mqtt starting" version=2.4.1
time=2023-11-25T14:14:06.106+08:00 level=INFO msg="mochi mqtt server started"
time=2023-11-25T14:14:07.405+08:00 level=INFO msg="client connected" hook=events-example client=mqttx_2d6e6a18
time=2023-11-25T14:14:23.109+08:00 level=INFO msg="--> OnPublish from normal client" hook=events-example client=mqttx_2d6e6a18 payload=aaa
time=2023-11-25T14:14:23.110+08:00 level=INFO msg="--> OnPublish from inline client" hook=events-example client=inline payload=xxxxx
time=2023-11-25T14:14:23.110+08:00 level=INFO msg="--> OnPublished from inline client" hook=events-example client=inline payload=xxxxx
time=2023-11-25T14:14:23.111+08:00 level=INFO msg="--> OnPublished from normal client" hook=events-example client=mqttx_2d6e6a18 payload=aaa

werbenhu avatar Nov 25 '23 14:11 werbenhu

thank u, I found it was because I make authHook and normalHook to a single HOOK, but now fixed.

Still, I got one more question.

I found when I using qos2 to pushing message to a topic. the client will receiving it many times.

I am using auto reconnect && keepAlive=120 for client, and resume the old sessions for same clientID.

Is that normal?

lucasjinreal avatar Nov 26 '23 07:11 lucasjinreal

thank u, I found it was because I make authHook and normalHook to a single HOOK, but now fixed.

Still, I got one more question.

I found when I using qos2 to pushing message to a topic. the client will receiving it many times.

I am using auto reconnect && keepAlive=120 for client, and resume the old sessions for same clientID.

Is that normal?

This issue has never occurred in my environment. Did you publish a retained message? If the published message is retained, the subscription will receive the retained message again after the client reconnects.

werbenhu avatar Nov 26 '23 14:11 werbenhu

@werbenhu yes, all message send from inline client, are retained.

So, if I don't want the message resent when re-connect, retain should be false?

when will retain be used in real production scenario?

lucasjinreal avatar Nov 27 '23 02:11 lucasjinreal

@lucasjinreal About retained message, you can refer to MQTT spec 3.3.1.3 RETAIN.

Retained message can be delivered to future subscribers whose subscriptions match its Topic Name.

werbenhu avatar Nov 27 '23 02:11 werbenhu

Does retain message will clear when session outdated?

I am using mqtt as a chat service, if I want different client can have also receive the message, does make all chat message retain is suitable or not?

Looks like retain might consuming many memory on server side since it needs to store messages/

lucasjinreal avatar Nov 27 '23 02:11 lucasjinreal

Does retain message will clear when session outdated?

It doesn't. The retained message is associated with the topic; each topic can have only one retained message. Any client can set a retained message for the same topic.

I am using mqtt as a chat service, if I want different client can have also receive the message, does make all chat message retain is suitable or not?

I'm not very familiar with the specifics of your chat service use case, so I can't provide a definitive answer. From my current understanding, it may not be suitable.

werbenhu avatar Nov 27 '23 06:11 werbenhu

@lucasjinreal Has there been any progress on this issue? Can it be closed now?

werbenhu avatar Dec 11 '23 06:12 werbenhu

Yes, but I still struggle on a new issue:

if set client to re-connect auto true, when the network is poor, it will reconnect endlessly and make the UI stuck, do u got any possible solution to this issue?

lucasjinreal avatar Dec 11 '23 08:12 lucasjinreal

Closing due to inactivity - @lucasjinreal if you are still have questions, please open a new issue - thank you!

mochi-co avatar Mar 18 '24 04:03 mochi-co