server
server copied to clipboard
Can not get onPublish called after set Inline Client
Hi, sorry for bother again, I have some miss usage of this lib, but this problem I can not figure out why:
type CustomHook struct {
mqtt.HookBase
config *CustomHookOptions
}
type CustomHookOptions struct {
Server *mqtt.Server
}
func (h *CustomHook) ID() string {
return "my-hook"
}
func (h *CustomHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnDisconnect,
mqtt.OnSubscribed,
mqtt.OnUnsubscribed,
mqtt.OnPublished,
mqtt.OnPublish,
}, []byte{b})
}
func (h *CustomHook) Init(config any) error {
if _, ok := config.(*CustomHookOptions); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}
if config == nil {
config = new(CustomHookOptions)
}
h.config = config.(*CustomHookOptions)
h.Log.Info("initialised")
return nil
}
func (h *CustomHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
h.Log.Info("--> client connected [auth]", "client", cl.ID)
res := MyAuthorizeHandler(cl, pk, h.config.Server)
h.Log.Info("--> client auth result: ", res)
return res
}
func (h *CustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
h.Log.Info("--> client connected", "client", cl.ID)
return nil
}
func (h *CustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
if err != nil {
h.Log.Info("--> client disconnected", "client", cl.ID, "expire", expire, "error", err)
} else {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
}
}
func (h *CustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}
func (h *CustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}
func (h *CustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
if cl.ID != "inline" {
h.Log.Info("--> received from client", "client", cl.ID, "payload", string(pk.Payload))
GlobalInterceptors(cl, pk, h.config.Server)
} else {
h.Log.Info("--> received from inline client")
}
return pk, nil
}
func (h *CustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
if cl.ID != "inline" {
h.Log.Info("--> published to client", "client", cl.ID, "payload", string(pk.Payload))
} else {
h.Log.Info("--> received from inline client")
}
}
This is my Hook, basically, I have a connect auth, and then send the messages needed to client. After that, just do normal message delivery.
But now, the onPublish
and onPublished
seems can not being called except the inline client.
Why
@lucasjinreal I tested the issue you mentioned, and it seems there is no problem. I used a client to publish a message, and then it triggers an inline publish message. Below is my code and the printed results.
Is there any special handling in your 'GlobalInterceptors(cl, pk, h.config.Server)' here? If possible, could you provide the code snippet for 'GlobalInterceptors'?
package main
import (
"bytes"
"fmt"
"log"
"os"
"os/signal"
"syscall"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
func main() {
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
server := mqtt.New(&mqtt.Options{
InlineClient: true, // you must enable inline client to use direct publishing and subscribing.
})
_ = server.AddHook(new(auth.AllowHook), nil)
tcp := listeners.NewTCP("t1", ":1883", nil)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
// Add custom hook (ExampleHook) to the server
err = server.AddHook(new(ExampleHook), &ExampleHookOptions{
Server: server,
})
if err != nil {
log.Fatal(err)
}
// Start the server
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
<-done
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
// Options contains configuration settings for the hook.
type ExampleHookOptions struct {
Server *mqtt.Server
}
type ExampleHook struct {
mqtt.HookBase
config *ExampleHookOptions
}
func (h *ExampleHook) ID() string {
return "events-example"
}
func (h *ExampleHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnect,
mqtt.OnDisconnect,
mqtt.OnSubscribed,
mqtt.OnUnsubscribed,
mqtt.OnPublished,
mqtt.OnPublish,
}, []byte{b})
}
func (h *ExampleHook) Init(config any) error {
h.Log.Info("initialised")
if _, ok := config.(*ExampleHookOptions); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}
h.config = config.(*ExampleHookOptions)
if h.config.Server == nil {
return mqtt.ErrInvalidConfigType
}
return nil
}
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
h.Log.Info("client connected", "client", cl.ID)
return nil
}
func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
if err != nil {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
} else {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
}
}
func (h *ExampleHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}
func (h *ExampleHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}
func (h *ExampleHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
if cl.ID != "inline" {
h.Log.Info("--> OnPublish from normal client", "client", cl.ID, "payload", string(pk.Payload))
h.config.Server.Publish("/example/inline", []byte("xxxxx"), false, 0)
} else {
h.Log.Info("--> OnPublish from inline client", "client", cl.ID, "payload", string(pk.Payload))
}
return pk, nil
}
func (h *ExampleHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
if cl.ID != "inline" {
h.Log.Info("--> OnPublished from normal client", "client", cl.ID, "payload", string(pk.Payload))
} else {
h.Log.Info("--> OnPublished from inline client", "client", cl.ID, "payload", string(pk.Payload))
}
}
time=2023-11-25T14:14:06.062+08:00 level=INFO msg="added hook" hook=allow-all-auth
time=2023-11-25T14:14:06.103+08:00 level=INFO msg="attached listener" id=t1 protocol=tcp address=:1883
time=2023-11-25T14:14:06.103+08:00 level=INFO msg="added hook" hook=events-example
time=2023-11-25T14:14:06.104+08:00 level=INFO msg=initialised hook=events-example
time=2023-11-25T14:14:06.104+08:00 level=INFO msg="mochi mqtt starting" version=2.4.1
time=2023-11-25T14:14:06.106+08:00 level=INFO msg="mochi mqtt server started"
time=2023-11-25T14:14:07.405+08:00 level=INFO msg="client connected" hook=events-example client=mqttx_2d6e6a18
time=2023-11-25T14:14:23.109+08:00 level=INFO msg="--> OnPublish from normal client" hook=events-example client=mqttx_2d6e6a18 payload=aaa
time=2023-11-25T14:14:23.110+08:00 level=INFO msg="--> OnPublish from inline client" hook=events-example client=inline payload=xxxxx
time=2023-11-25T14:14:23.110+08:00 level=INFO msg="--> OnPublished from inline client" hook=events-example client=inline payload=xxxxx
time=2023-11-25T14:14:23.111+08:00 level=INFO msg="--> OnPublished from normal client" hook=events-example client=mqttx_2d6e6a18 payload=aaa
thank u, I found it was because I make authHook and normalHook to a single HOOK, but now fixed.
Still, I got one more question.
I found when I using qos2 to pushing message to a topic. the client will receiving it many times.
I am using auto reconnect && keepAlive=120 for client, and resume the old sessions for same clientID.
Is that normal?
thank u, I found it was because I make authHook and normalHook to a single HOOK, but now fixed.
Still, I got one more question.
I found when I using qos2 to pushing message to a topic. the client will receiving it many times.
I am using auto reconnect && keepAlive=120 for client, and resume the old sessions for same clientID.
Is that normal?
This issue has never occurred in my environment. Did you publish a retained message? If the published message is retained, the subscription will receive the retained message again after the client reconnects.
@werbenhu yes, all message send from inline client, are retained.
So, if I don't want the message resent when re-connect, retain should be false?
when will retain be used in real production scenario?
@lucasjinreal About retained message, you can refer to MQTT spec 3.3.1.3 RETAIN.
Retained message can be delivered to future subscribers whose subscriptions match its Topic Name.
Does retain message will clear when session outdated?
I am using mqtt as a chat service, if I want different client can have also receive the message, does make all chat message retain is suitable or not?
Looks like retain might consuming many memory on server side since it needs to store messages/
Does retain message will clear when session outdated?
It doesn't. The retained message is associated with the topic; each topic can have only one retained message. Any client can set a retained message for the same topic.
I am using mqtt as a chat service, if I want different client can have also receive the message, does make all chat message retain is suitable or not?
I'm not very familiar with the specifics of your chat service use case, so I can't provide a definitive answer. From my current understanding, it may not be suitable.
@lucasjinreal Has there been any progress on this issue? Can it be closed now?
Yes, but I still struggle on a new issue:
if set client to re-connect auto true, when the network is poor, it will reconnect endlessly and make the UI stuck, do u got any possible solution to this issue?
Closing due to inactivity - @lucasjinreal if you are still have questions, please open a new issue - thank you!