mosca
mosca copied to clipboard
Clients get repeated messages on endless loop
Hi,
I'm using lua-mosquitto and Node mqtt as clients to my Mosca broker with a Kafka backend, but I get repeated messages like the ones below:
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k sub|blahblah9
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k sub|blahblah9
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k sub|blahblah9
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k sub|blahblah9
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k sub|blahblah9
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello2
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k hellohello3
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
0 /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k
Client.lua:
-- Minimal lua-mosquitto subscriber: connects to the broker on localhost:1883,
-- subscribes to a single topic and prints every message it receives.
mqtt = require("mosquitto")
client = mqtt.new()

-- Announce the connection, then subscribe to the topic under test.
local function on_connect()
    print("5b8bc0aa6be58c05415cbc44 connected")
    client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k")
end

-- Print the message id, topic and payload of each incoming message.
local function on_message(mid, topic, payload)
    print(mid, topic, payload)
    -- act, ext = payload:match("([^,]+)|([^,]+)")
    -- print(act, ext)
    -- if act == "sub" then client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b8b26dd2ac7176a44765c0c/k/"..ext) end
end

-- The publish handler is intentionally empty (the disconnect is disabled).
local function on_publish()
    -- client:disconnect()
end

client.ON_CONNECT = on_connect
client.ON_MESSAGE = on_message
client.ON_PUBLISH = on_publish

client:connect("localhost", 1883)
--client:connect("test.mosquitto.org", 1883) -- It works well with mosquitto.org.
client:loop_forever()
Node MQTT Client:
var mqtt = require('mqtt');
// Connection options handed to mqtt.connect() together with the broker URL.
const options = {
  port: 1883,
  // Bare hostname only: the scheme ("mqtt://") belongs in the URL passed to
  // mqtt.connect(), not in `host`. The previous value 'mqtt://localhost' is
  // not a resolvable hostname and only worked when the URL's hostname won.
  host: 'localhost',
  keepalive: 60, // seconds
  reconnectPeriod: 1000, // milliseconds between reconnect attempts
  // 'MQIsdp' with protocol version 3 selects MQTT 3.1.
  protocolId: 'MQIsdp',
  protocolVersion: 3,
  clean: true,
  // NOTE(review): could not confirm `encoding` is an MQTT.js client option;
  // kept as-is — verify against the MQTT.js version in use.
  encoding: 'utf8',
};
// Open the connection to the broker at mqtt://localhost using `options`.
const Client = mqtt.connect('mqtt://localhost', options);

// When the 'connect' event fires, log it and subscribe to the topic under test.
Client.on('connect', function onConnect() {
  console.log("Client Connected");
  //Client.subscribe("_c_5b8b215c855b7569eb94f0ed_d_5b92665fce3f081648ee86c9_k");
  Client.subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k");
});

// Log the two arguments of every 'message' event (topic first, then payload).
Client.on('message', function onMessage(topic, payload) {
  console.log(topic, payload);
});
Mosca Broker.js
var mosca = require('mosca');
var crypto = require('crypto');
var fs = require('fs');
// Pub/sub backend configuration handed to Mosca via `moscaSettings.backend`.
// NOTE(review): "localhost:2181" is presumably the ZooKeeper address used by
// the Kafka client — confirm against the backend's documentation.
const backend = {
  type: 'kafka',
  json: false,
  connectionString: 'localhost:2181',
  clientId: 'mosca',
  groupId: 'mosca',
  defaultEncoding: 'utf8',
};

// Broker settings: a single plain MQTT listener on port 1883, with stats and
// the publishNewClient / publishClientDisconnect / publishSubscriptions
// options all switched off.
const moscaSettings = {
  interfaces: [{ type: 'mqtt', port: 1883 }],
  id: 'mosca',
  stats: false,
  publishNewClient: false,
  publishClientDisconnect: false,
  publishSubscriptions: false,
  backend,
};
// Start the broker with `moscaSettings` and log its lifecycle events.
const server = new mosca.Server(moscaSettings);

/** Logs that the broker has finished starting; registered for 'ready'. */
function setup() {
  console.log('Mosca server is up and running.');
}

server.on('ready', setup);

server.on('error', function (err) {
  console.log(err);
});

server.on('clientConnected', function (client) {
  console.log('Client Connected \t:= ', client.id);
});

server.on('published', function (packet, client) {
  // Log a shallow copy with the payload decoded as UTF-8 rather than
  // overwriting packet.payload: the packet object is handed to us by the
  // broker, and mutating it would change what any other holder of the same
  // object sees.
  const printable = Object.assign({}, packet, {
    payload: packet.payload.toString('utf8'),
  });
  console.log("Published :=", printable);
});

server.on('subscribed', function (topic, client) {
  console.log("Subscribed :=", topic, client.packet);
});

server.on('unsubscribed', function (topic, client) {
  console.log('unsubscribed := ', topic);
});

server.on('clientDisconnecting', function (client) {
  console.log('clientDisconnecting := ', client.id);
});

server.on('clientDisconnected', function (client) {
  console.log('Client Disconnected := ', client.id);
});
Any ideas on how to solve this?
Hi @mcollina, did you have any chance to look at this?
@0xmtn did you have any luck with this? Having the same experience. Not sure if it's a Kafka setting or Mosca though. @mcollina any thoughts?
@0xmtn @User1m I'm running into a similar problem. Could you give me some suggestions on how to reproduce it? I want to dig into why this happens. Thanks!
@grath10 I ended up giving up on this lib, but before that I found a fork that was more actively maintained. If you want to continue with this lib, use the following instead: https://github.com/ConduitVC/ascoltatori https://github.com/ConduitVC/mosca Hope that helps.
If it's of any help I talk about what I ended up going with here: https://github.com/User1m/kafka-pub-sub-investigation
@User1m Thanks again for sharing your experience with me. By the way, could you point me to the substitute you ended up using so I can refer to it? Any suggestion is appreciated!
@grath10 I went with just kafka-node