mosca icon indicating copy to clipboard operation
mosca copied to clipboard

Clients get repeated messages on endless loop

Open 0xmtn opened this issue 6 years ago • 6 comments

Hi,

I'm using lua-mosquitto and node mqtt as a client to my Mosca broker with kafka backend. But I get repeated message like below:

0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	sub|blahblah9
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	sub|blahblah9
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	sub|blahblah9
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	sub|blahblah9
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	sub|blahblah9
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello2
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	hellohello3
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k	
0	/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k

Client.lua:

mqtt = require("mosquitto")
client = mqtt.new()

client.ON_CONNECT = function()
    print("5b8bc0aa6be58c05415cbc44 connected")
    client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k")
end

client.ON_MESSAGE = function(mid, topic, payload)
    print(mid, topic, payload)
    -- act, ext = payload:match("([^,]+)|([^,]+)")
    -- print(act, ext)
    -- if act == "sub" then client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b8b26dd2ac7176a44765c0c/k/"..ext) end
end

client.ON_PUBLISH = function()
    -- client:disconnect()
end

client:connect("localhost", 1883)
--client:connect("test.mosquitto.org", 1883) -- It works well with mosquitto.org.
client:loop_forever()

Node MQtt Client:

var mqtt = require('mqtt');
var options = {
  port: 1883,
  host: 'mqtt://localhost',
  keepalive: 60,
  reconnectPeriod: 1000,
  protocolId: 'MQIsdp',
  protocolVersion: 3,
  clean: true,
  encoding: 'utf8'
};
var Client = mqtt.connect('mqtt://localhost', options);

Client.on('connect',  () => {
  console.log("Client Connected");
  //Client.subscribe("_c_5b8b215c855b7569eb94f0ed_d_5b92665fce3f081648ee86c9_k");
  Client.subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k");
});

Client.on('message', (p, a) => {
  console.log(p, a);
});

Mosca Broker.js

var mosca = require('mosca');
var crypto = require('crypto');
var fs = require('fs');

var backend = {
    type: "kafka",
    json: false,
    connectionString: "localhost:2181",
    clientId: "mosca",
    groupId: "mosca",
    defaultEncoding: "utf8",
  };

var moscaSettings = {
    interfaces: [
        { type: "mqtt", port: 1883 },
    ],
    id: "mosca",
    stats: false,
    publishNewClient: false,
    publishClientDisconnect: false,
    publishSubscriptions: false,
    backend: backend,
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
function setup() {
    console.log('Mosca server is up and running.');
}

server.on("error", function (err) {
    console.log(err);
});
server.on('clientConnected', function (client) {
    console.log('Client Connected \t:= ', client.id);
});
server.on('published', function (packet, client) {
  packet.payload = packet.payload.toString('utf8');
  console.log("Published :=", packet);
});
server.on('subscribed', function (topic, client) {
    console.log("Subscribed :=", topic, client.packet);
});
server.on('unsubscribed', function (topic, client) {
    console.log('unsubscribed := ', topic);
});
server.on('clientDisconnecting', function (client) {
    console.log('clientDisconnecting := ', client.id);
});
server.on('clientDisconnected', function (client) {
    console.log('Client Disconnected     := ', client.id);
});

Any ideas on how to solve this?

0xmtn avatar Sep 08 '18 15:09 0xmtn

Hi @mcollina, did you have any chance to look at this?

0xmtn avatar Sep 10 '18 06:09 0xmtn

@0xmtn did you have any luck with this? Having the same experience. Not sure if it's a Kafka setting or Mosca though. @mcollina any thoughts?

user1m avatar Feb 28 '19 22:02 user1m

@0xmtn @User1m meet the similar problem, could you give me some suggestion as to repeat the problem? I want to go deep into why this would happen. Thx!

grath10 avatar Mar 22 '19 01:03 grath10

@grath10 I ended up giving up on this lib but before that, I found a fork that was more actively maintained. if you want to continue w/ this lib then use the following instead: https://github.com/ConduitVC/ascoltatori https://github.com/ConduitVC/mosca hope that helps.

If it's of any help I talk about what I ended up going with here: https://github.com/User1m/kafka-pub-sub-investigation

user1m avatar Mar 22 '19 03:03 user1m

@User1m Thx again for sharing your practice with me. By the way, could you provide the substitute one for me to refer to? Any suggestion is appreciated!

grath10 avatar Mar 25 '19 00:03 grath10

@grath10 I went with just kafka-node

user1m avatar Mar 25 '19 03:03 user1m