mqtt.ioloop gets stuck when called; tracing ends at sync_recv_func(...) inside client_mt:_apply_network_timeout()

Open pxyove opened this issue 3 years ago • 20 comments

gcc: toolchain-mipsel_24kec+dsp_gcc-4.8-linaro_uClibc-0.9.33.2, lua: 5.1.5, skynet: 1.5.0, mqtt: 3.4.2

My code is as follows:

local skynet = require("skynet")
local mqtt = require("mqtt")
local io = require("ioloop")
local ioloop = require("mqtt.ioloop")

local mq_loop = ioloop.create {
        sleep = 1,
        timeout = 1,
        sleep_function = skynet.sleep
    }

local client = mqtt.client(conn_params)
    client:on{
        connect = function(connack)
            assert(not mqtt_client)

            if not (connack and connack.rc == 0) then
                logger:error("connection to mqtt fail: %s %s", connack and connack:reason_string() or "", connack)
                return
            end

            logger:info("MQTT client connected server success!!!") -- successful connection
            state_table.connect_ok = true
            local response_count = 0
            for _, item in ipairs(topics) do
                local module, topic = item.module, item.topic
                client:subscribe{
                    topic = topic,
                    qos = 0,
                    callback = function(suback, args)
                        response_count = response_count + 1
                        if suback.rc[1] ~= 0 then
                            client:disconnect(0)
                            logger:error("subscribe [%s %s] fail %s", module, topic, suback.rc[1])
                            return
                        end

                        logger:info("subscribe from cloud [%s, %s] ok", module, topic)
                        if response_count >= #topics then
                            mqtt_client = client
                            logger:info("All topic subscribes OK, MQTT client is ready!!!")
                            --Not support, upload software version with 5mins timer.
                            --[[
                            local mqtt_dispatcher = skynet.queryservice("mqtt_dispatcher")
                            skynet.send(mqtt_dispatcher, "lua", "post")
                            ]]
                        end
                    end
                }
            end
        end,
        message = function(msg)
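            -- acknowledge() returns a truthy value on success, or false plus an error message
            local ok, err = client:acknowledge(msg)
            if not ok then
                logger:error("failed to acknowledge message: %s", err)
                return
            end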
            table.insert(work_queue, {
                msg.topic,
                msg.payload
            })

            logger:info("Recv from cloud, topic: %s, payload: %s", msg.topic, msg.payload)
            skynet.wakeup(work_loop_co)
            skynet.yield()
        end,
        error = function(err, args, connack)
            logger:error("mqtt client err: %s, args: %s, connack: %s", err, args, connack)
            if not connack then
                logger:error("MQTT client error maybe unable to connect to network")
                return
            end

            if connack.rc == 4 then
                state_table.connect_ok = false
                client.args.reconnect = client.args.reconnect + 5
            end
        end,
        close = function()
            state_table.connect_ok = false
            mqtt_client = nil -- set to nil
            client = nil
            logger:error("mqtt client is close, set to not ready!!!")
        end
    }

    mq_loop:add(client)
    client:start_connecting()
    while true do
        if not client then
            break
        end
        mq_loop:iteration()   -- execution gets stuck here
        skynet.yield()
    end

The mqtt_client becomes ready once the client has connected to the broker and subscribed to three topics. After that, it gets stuck at mq_loop:iteration().

What causes it to hang? Any help would be appreciated.

pxyove avatar Jan 17 '22 07:01 pxyove

local function ioloop_recv(self)
	return coroutine_resume(self.connection.coro)
end

-- Perform one input/output iteration, called by ioloop
function client_mt:_ioloop_iteration()
	-- working according state
	local loop = self.ioloop
	local args = self.args

	local conn = self.connection
	if conn then
		-- network connection opened
		-- perform packet receiving using ioloop receive function
		local ok, err
		if loop then
			ok, err = self:_io_iteration(ioloop_recv) -----------enter this branch
		else
			ok, err = self:_sync_iteration()
		end
........
end

function client_mt:_io_iteration(recv)
	local conn = self.connection

	-- first - try to receive packet
	local ok, packet, err = recv(self) -------finally stuck in here
	-- print("received packet", ok, packet, err)
...
end

pxyove avatar Jan 17 '22 07:01 pxyove

Please write in English in this repo

xHasKx avatar Jan 17 '22 08:01 xHasKx

What connector are you using? Without it, the blocking luasocket functions will be used

xHasKx avatar Jan 17 '22 10:01 xHasKx

> What connector are you using? Without it, the blocking luasocket functions will be used

I use the default connector, but I have two devices running the same code: one communicates normally and the other has this problem.

pxyove avatar Jan 17 '22 11:01 pxyove

Actually, if you're using the default connector, blocking the whole OS thread on recv() calls is the expected behavior. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server.

As I can see from your code (by the way, please fix the formatting by wrapping it in triple backticks), you're trying to call skynet.yield() after the MQTT ioloop iteration. I suspect this will not work at all with the default connector.

I'm not familiar with the skynet module. Does it have async networking (TCP connection) support? It might be wise to write a connector based on it, just like in ngxsocket.lua

xHasKx avatar Jan 17 '22 14:01 xHasKx
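
To make the suggestion above concrete, here is a minimal sketch of what a custom connector could look like. It assumes a connector is a table with connect, shutdown, send and receive functions that take the connection table as their first argument, in the style of the bundled luasocket connector. The `make_connector` helper, the `sock_api` table and its open/read/write/close functions are hypothetical names, loosely modeled on a coroutine-friendly socket library such as skynet's; check the real names and return values before relying on any of this.

```lua
-- Sketch of a luamqtt-style connector built on an injected socket API.
-- ASSUMPTION: sock_api offers open/read/write/close and suspends only the
-- calling coroutine while waiting. These names are hypothetical.
local function make_connector(sock_api)
	local connector = {}

	-- Open the TCP connection and remember the socket handle on conn
	function connector.connect(conn)
		local id, err = sock_api.open(conn.host, conn.port)
		if not id then
			return false, "connect failed: " .. tostring(err)
		end
		conn.sock_id = id
		return true
	end

	-- Close the connection
	function connector.shutdown(conn)
		sock_api.close(conn.sock_id)
	end

	-- Send bytes i..j of data (the whole string by default)
	function connector.send(conn, data, i, j)
		local chunk = data:sub(i or 1, j or -1)
		local ok, err = sock_api.write(conn.sock_id, chunk)
		if not ok then
			return false, err or "closed"
		end
		return true
	end

	-- Receive exactly size bytes, or false plus an error message
	function connector.receive(conn, size)
		local data, err = sock_api.read(conn.sock_id, size)
		if not data then
			return false, err or "closed"
		end
		return data
	end

	return connector
end

-- Usage with an in-memory stub standing in for a real socket library
local inbox = "\32\2\0\0" -- bytes the stub hands back (shaped like a CONNACK)
local stub = {
	open = function(host, port) return 1 end,
	close = function(id) end,
	write = function(id, s) return true end,
	read = function(id, n)
		if #inbox < n then return nil, "closed" end
		local out = inbox:sub(1, n)
		inbox = inbox:sub(n + 1)
		return out
	end,
}

local connector = make_connector(stub)
local conn = { host = "localhost", port = 1883 }
assert(connector.connect(conn))
local first = connector.receive(conn, 2) -- the first two bytes of the stub data
```

The socket API is injected so that the sketch can run without any networking library. In a real service the stub would be replaced by the framework's own socket module, and the resulting table would be handed to the MQTT client as its connector.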

> Actually, if you're using the default connector, blocking the whole OS thread on recv() calls is the expected behavior. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server.
>
> As I can see from your code (by the way, please fix the formatting by wrapping it in triple backticks), you're trying to call skynet.yield() after the MQTT ioloop iteration. I suspect this will not work at all with the default connector.
>
> I'm not familiar with the skynet module. Does it have async networking (TCP connection) support? It might be wise to write a connector based on it, just like in ngxsocket.lua

Thank you very much. I will try other connectors first to see if I can solve this problem

pxyove avatar Jan 18 '22 03:01 pxyove

> > Actually, if you're using the default connector, blocking the whole OS thread on recv() calls is the expected behavior. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server.
> >
> > As I can see from your code (by the way, please fix the formatting by wrapping it in triple backticks), you're trying to call skynet.yield() after the MQTT ioloop iteration. I suspect this will not work at all with the default connector.
> >
> > I'm not familiar with the skynet module. Does it have async networking (TCP connection) support? It might be wise to write a connector based on it, just like in ngxsocket.lua
>
> Thank you very much. I will try other connectors first to see if I can solve this problem

May I ask a follow-up question? I traced the code further and found that it ends up here:

function client_mt:_apply_network_timeout()
....

    local sync_recv_func = conn.recv_func
    conn.recv_func = function(...)
        while true do
            local data, err = sync_recv_func(...)
            if not data and (err == "timeout" or err == "wantread") then
                loop.timeouted = true
                coroutine_yield(err)
                skynetco.yield(err)
            else
                return data, err
            end
        end
    end

....
self.connection.recv_func = connector.receive() -- finally, it blocks here; is this correct?
-- I used the default connector, so it blocks in luasocket.receive()
....

Sorry, I didn't get the formatting right.

pxyove avatar Jan 19 '22 03:01 pxyove

Yes, the luasocket.receive() call blocks the OS thread.

> Sorry, I didn't get the formatting right.

I mean, please enclose your code samples in triple backticks so that they render as code blocks.

xHasKx avatar Jan 19 '22 05:01 xHasKx
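
The interplay discussed above can be illustrated with a small standalone sketch. It is not luamqtt's code; `wrap_recv` and `fake_recv` are hypothetical names. The wrapper yields to the caller whenever the underlying receive reports "timeout" or "wantread", so the loop stays responsive only as long as the underlying receive actually returns. If that call never comes back, the yield is never reached.

```lua
-- Wrap a receive function so that "timeout" results yield instead of returning.
-- Hypothetical names throughout; this mirrors the idea, not luamqtt's code.
local function wrap_recv(sync_recv)
	return function(...)
		while true do
			local data, err = sync_recv(...)
			if not data and (err == "timeout" or err == "wantread") then
				coroutine.yield(err) -- hand control back to the loop
			else
				return data, err
			end
		end
	end
end

-- Fake socket read: reports "timeout" twice, then delivers data.
local calls = 0
local function fake_recv()
	calls = calls + 1
	if calls < 3 then
		return nil, "timeout"
	end
	return "packet"
end

local recv = wrap_recv(fake_recv)
local co = coroutine.create(function() return recv() end)

-- Drive the coroutine the way an I/O loop would, one resume per iteration
local results = {}
while coroutine.status(co) ~= "dead" do
	local ok, value = coroutine.resume(co)
	results[#results + 1] = value
end
-- results now holds "timeout", "timeout", "packet"
```

Each resume corresponds to one loop iteration: two of them end early with a timeout, and the third completes with data.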

But I set the ioloop timeout to one second and it didn't seem to take effect, so the call still blocks. Why? The two devices run the same code; one works and the other doesn't. I'll keep tracing. Thank you very much.

pxyove avatar Jan 19 '22 06:01 pxyove

  1. Try to uncomment print() debug code in luasocket.send() and luasocket.receive() functions to track MQTT traffic, maybe you can find a difference for the two devices
  2. Try to set timeout to 0.1

xHasKx avatar Jan 19 '22 07:01 xHasKx

I tried that, but on both devices luasocket.send is called once and luasocket.receive returns data three times, and the traffic is the same. Thank you very much. I'll keep tracing.

pxyove avatar Jan 19 '22 07:01 pxyove

> 1. Try to uncomment print() debug code in luasocket.send() and luasocket.receive() functions to track MQTT traffic, maybe you can find a difference for the two devices
> 2. Try to set timeout to 0.1

Do you know where the timeout set by luasocket.settimeout takes effect and is checked? I suspect the timeout is not working.

function luasocket.settimeout(conn, timeout)
	conn.timeout = timeout
	conn.sock:settimeout(timeout, "t")
end

pxyove avatar Jan 19 '22 07:01 pxyove

I mean

local mq_loop = ioloop.create {
        timeout = 0.1,
...
}

xHasKx avatar Jan 19 '22 07:01 xHasKx

> I mean
>
> local mq_loop = ioloop.create {
>         timeout = 0.1,
> ...
> }

I tried that, but on both devices luasocket.send is called once and luasocket.receive returns data three times, and the traffic is the same.

pxyove avatar Jan 19 '22 08:01 pxyove

Sorry, I accidentally hit Close. The ioloop itself is supposed to be non-blocking, but now it is blocking. I want to know where the timeout (default 0.005) is checked and takes effect. Do you know where?

pxyove avatar Jan 19 '22 09:01 pxyove

> Sorry, I accidentally hit Close. The ioloop itself is supposed to be non-blocking, but now it is blocking. I want to know where the timeout (default 0.005) is checked and takes effect. Do you know where?

Sorry, I cannot understand you

Did you try to uncomment network packet print()-traces as I advised?

xHasKx avatar Jan 19 '22 12:01 xHasKx

> > Sorry, I accidentally hit Close. The ioloop itself is supposed to be non-blocking, but now it is blocking. I want to know where the timeout (default 0.005) is checked and takes effect. Do you know where?
>
> Sorry, I cannot understand you
>
> Did you try to uncomment network packet print()-traces as I advised?

Yes, I've tried that. On both devices the data passing through luasocket.send and luasocket.receive is the same.

I mean that the ioloop itself is supposed to be non-blocking, but now it is blocking, so maybe the timeout is not taking effect. Do you know where it is applied?

local mq_loop = ioloop.create {
        timeout = 0.1,
}

pxyove avatar Jan 19 '22 12:01 pxyove

The timeout is applied to luasocket's socket at https://github.com/xHasKx/luamqtt/blob/master/mqtt/luasocket.lua#L46

Can you please describe in more detail what you mean by "but I have two devices, same code, one communicating normally and the other having this problem"? Is your code blocked (i.e. code other than luamqtt does not run at all), or do you get disconnected from the broker on the second device?

xHasKx avatar Jan 20 '22 08:01 xHasKx
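
One way to check whether a socket timeout is honored is to time a single receive call and compare the elapsed time with the configured value. The sketch below is only an illustration: `check_timeout` is a hypothetical helper, and it assumes a socket object with settimeout and receive methods in the style of LuaSocket TCP objects, plus a clock function that returns seconds. A stub socket and clock are used so that it runs without any network.

```lua
-- Measure how long one receive call takes and compare it to the timeout.
-- ASSUMPTION: sock has settimeout(t) and receive(n) in the style of LuaSocket
-- TCP objects; clock is any function returning the current time in seconds.
local function check_timeout(sock, timeout, clock, slack)
	slack = slack or 0.5
	sock:settimeout(timeout)
	local started = clock()
	local data, err = sock:receive(1)
	local elapsed = clock() - started
	return {
		data = data,
		err = err,
		elapsed = elapsed,
		-- honored: the call came back within the timeout plus some slack
		honored = elapsed <= timeout + slack,
	}
end

-- Stub clock and socket so the sketch runs without a network
local now = 0
local function fake_clock() return now end
local fake_sock = {
	settimeout = function(self, t) self.timeout = t end,
	receive = function(self, n)
		now = now + self.timeout -- pretend the wait lasted exactly the timeout
		return nil, "timeout"
	end,
}

local report = check_timeout(fake_sock, 0.1, fake_clock)
```

With a real socket, a report whose elapsed time far exceeds the timeout, or a call that never returns at all, would point to a problem below the Lua layer rather than in the ioloop settings.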

I'm sorry for the delay; I've been on vacation. After tracing deep into luasocket, I found that its timeout did not take effect. I will reply to you when I solve this problem. Thank you very much.

pxyove avatar Jan 24 '22 09:01 pxyove

Thank you, everyone. I have found the cause of the problem (the connect/recv system calls returned -1 with errno=0). It was caused by a wrong build option in the uClibc-0.9.33.2 .config: LINUXTHREADS_OLD=y was selected, whereas UCLIBC_HAS_THREADS_NATIVE=y should be selected.

Related issues: https://github.com/cloudwu/skynet/issues/1522, https://github.com/lunarmodules/luasocket/issues/349, https://github.com/xHasKx/luamqtt/issues/38

pxyove avatar Apr 14 '22 07:04 pxyove
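
For readers who hit the same symptom, the fix reported above amounts to changing the thread-library choice in the uClibc .config. A sketch of the relevant lines follows, assuming the usual Kconfig-style file format. Only the two option names come from the report; every other option in the file is left out here.

```
# LINUXTHREADS_OLD is not set
UCLIBC_HAS_THREADS_NATIVE=y
```

The C library, and most likely everything built against it, would need to be rebuilt for the change to take effect.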