ruby-mqtt
Timeouts in client.get
MQTT::Client#get currently has no timeout applied to it. This is very inconvenient in certain situations. From what I see in the code, even network exceptions won't cause it to unblock.
It's true that Ruby's Queue implementation is very limited and arguably not a great example of engineering, but libraries such as Bunny still provide timeout functionality for similar operations (using Timeout, which surely has its own issues).
I'd recommend that this client support optional timeouts, too.
See a little bit of discussion about this in #17
I was actually heading in the direction of never timing out and performing a re-connect to the server if the connection drops.
It is perfectly valid to sit waiting for a message for a very long time; in fact, it is very common. So there should be no timeout on #get by default (although there should be a timeout if no pings have been sent/received).
In my MQTT HTTP Bridge, I do timeout after waiting for a #get: https://github.com/njh/mqtt-http-bridge/blob/master/mqtt-http-bridge.rb
I'm not suggesting to have a timeout by default but there should be a way to provide one.
Ah, ok.
So this:
require 'timeout'

begin
  Timeout.timeout(10) do
    topic, message = client.get('topic')
    # do stuff
  end
rescue Timeout::Error
  # deal with timeout
end
Would become something like this:
topic, message = client.get('topic', 10)
unless topic.nil?
  # do stuff
end
Yes. Depending on the Client#get implementation there may be other options, but your understanding is correct.
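For illustration, here is a minimal sketch of the "return nil on timeout" behaviour discussed above. It runs against a plain Ruby Queue rather than the client itself, and `pop_with_timeout` is a hypothetical helper name, not part of ruby-mqtt. It also relies on Timeout, so it inherits whatever issues Timeout has.

```ruby
require 'timeout'

# Hypothetical helper (not part of ruby-mqtt): pops one item from a Queue,
# giving up after `seconds` and returning nil instead of raising.
def pop_with_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil
end

queue = Queue.new
queue << ['topic', 'hello']

# An item is already queued, so this returns immediately.
topic, message = pop_with_timeout(queue, 1)

# Nothing is queued now, so this gives up after 0.1 seconds and returns nil.
empty_result = pop_with_timeout(queue, 0.1)
```

The caller then only needs a nil check, as in the `unless topic.nil?` example, instead of a begin/rescue block around every call.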
I'm developing a gem that adds some functionality to this (https://github.com/antico5/mosca/). It supports timeouts, among a few other things. Check it out!
Out of curiosity @antico5, why are you wrapping this library as opposed to improving it? Are you just shooting for a different API?
@njh would you still be interested in having this timeout option added? I have a use case like this:
class MqttStateInjest
  include SidekiqDaemon::Worker

  def perform(farm_id)
    client = Farm.find(farm_id).mqtt_client
    client.subscribe('#')
    client.get do |topic, payload|
      # useful work
      daemon_liveness_lock.renew
    end
  end
end
where that object takes out a lock to indicate that it is the process in the system responsible for doing that useful work with MQTT messages. If that process dies, it needs to be restarted, so the lock has an expiry: once the lock expires, the supervisor can restart the process. The issue is choosing a duration for the lock expiry. If it's too short, #get could block for longer than the expiry, letting the lock expire even though the process is alive (and just blocked), which makes the lock not much of a lock. If it's too long, a dead process takes too long to be restarted, which affects the user experience. So it'd be great to be able to time out that #get call so that there's a fixed upper bound on how long it will block, and I can make sure I always renew the daemon lock more frequently than it expires.
Happy to try to port something from mosca or write my own implementation if you would accept it!
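To make the use case concrete, here is a rough sketch of the loop shape a bounded wait would allow. Everything in it is a stand-in: `FakeLock` replaces the liveness lock, a plain Queue replaces the MQTT client, and `consume` with its `max_wait:` and `iterations:` parameters is hypothetical (a real worker would loop indefinitely).

```ruby
require 'timeout'

# Stand-in for the liveness lock; it only counts how often it was renewed.
class FakeLock
  attr_reader :renewals

  def initialize
    @renewals = 0
  end

  def renew
    @renewals += 1
  end
end

# Hypothetical loop: waits at most `max_wait` seconds per iteration, so the
# lock is renewed on a predictable schedule even when no messages arrive.
def consume(queue, lock, max_wait:, iterations:)
  handled = []
  iterations.times do
    item = begin
      Timeout.timeout(max_wait) { queue.pop }
    rescue Timeout::Error
      nil # no message within max_wait; fall through and renew anyway
    end
    handled << item if item
    lock.renew
  end
  handled
end

queue = Queue.new
queue << ['farm/1/state', 'on']
lock = FakeLock.new

handled = consume(queue, lock, max_wait: 0.05, iterations: 3)
```

With this shape, the lock expiry only has to exceed `max_wait` plus the time spent on the useful work, rather than the longest possible gap between messages.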
Is there any news on this idea, @njh? I would love to be able to detect connection drops, and to have a Keep Alive mechanism or QoS 1 in ruby-mqtt!
Still interested, but I still haven't made time to work on it...
Thanks for your work, it's already useful for those who don't need to operate on unstable networks!
Just adding another use case for a timeout (or a polling or non-blocking get)... I'm using Ruby to simulate embedded devices for load testing, and it matches our sensor better to have one connection used for both publishing and subscribing.
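As a sketch of what a non-blocking get could look like, the snippet below polls a plain Ruby Queue using the non-blocking form of Queue#pop. The `poll` helper is hypothetical and the Queue stands in for the client's incoming messages; this is not an existing ruby-mqtt method.

```ruby
# Hypothetical helper: returns the next queued item, or nil if none is waiting.
def poll(queue)
  queue.pop(true) # non_block = true raises ThreadError when the queue is empty
rescue ThreadError
  nil
end

incoming = Queue.new
incoming << ['sensor/cmd', 'reset']

first  = poll(incoming) # the queued command
second = poll(incoming) # nil, and the call returns straight away
```

A simulated device could call something like this between publishes, so a single loop on a single connection handles both directions without ever blocking on a read.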