ControlInterface wrapper to help with timeouts
I use set_rt_frame_socket to send individual frames to my light strings. These are shown for 60 seconds before the string reverts to its last stored movie. Because xled.control.ControlInterface does not set a short timeout when communicating with the lights, I sometimes see many-minute hangs during which communication is lost. These failed connections are intermittent, so in the absence of an xled timeout argument I’ve found that this wrapper class works well:
import threading

import xled.control


class TimeoutControlInterface:
    interface = None  # class-level default so __getattr__ never recurses looking for it

    def __init__(self, *args, **kwargs):
        """Initialize xled.control.ControlInterface with same arguments"""
        self.interface = xled.control.ControlInterface(*args, **kwargs)

    def __getattr__(self, name):
        """Retrieve versions of self.interface methods that use a threaded timeout"""
        original_method = getattr(self.interface, name)
        if not callable(original_method):
            return original_method  # plain attributes need no timeout

        def threaded_method(*args, **kwargs):
            """Call the original method inside a thread and return its response"""
            responses, errors = [], []  # fresh for every call

            def wrapped_method():
                """Call the original method and stash its response or exception"""
                try:
                    responses.append(original_method(*args, **kwargs))
                except Exception as error:
                    errors.append(error)

            thread = threading.Thread(target=wrapped_method, daemon=True)
            thread.start()
            thread.join(timeout=5.0)
            if thread.is_alive():
                raise TimeoutError(f"{name} did not finish within 5 seconds")
            if errors:
                raise errors[0]  # re-raise in the caller instead of losing it in the thread
            return responses[0]

        return threaded_method
It’s used just like xled.control.ControlInterface but raises an exception when a method call requires more than five seconds to complete. Hope it’s useful to someone!
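If you want to try the pattern without a light string plugged in, here is a minimal sketch of the same thread-and-join idea applied to a stand-in object, plus one way a caller might react to the timeout. FakeLights, call_with_timeout, send_with_retry, and the 0.2-second limit are all hypothetical names and values made up for this illustration; none of them are part of xled.

```python
import threading
import time


def call_with_timeout(func, timeout, *args, **kwargs):
    """Run func in a daemon thread; raise TimeoutError if it outlives timeout."""
    responses, errors = [], []

    def target():
        try:
            responses.append(func(*args, **kwargs))
        except Exception as error:
            errors.append(error)

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        raise TimeoutError(f"call still running after {timeout} seconds")
    if errors:
        raise errors[0]
    return responses[0]


class FakeLights:
    """Hypothetical stand-in for a light string; not an xled class."""

    def send_frame(self, delay):
        time.sleep(delay)  # simulate a slow or hung network call
        return "ok"


def send_with_retry(lights, delays, timeout=0.2):
    """Try one send per delay value; return (attempt number, response) on success."""
    for attempt, delay in enumerate(delays, start=1):
        try:
            return attempt, call_with_timeout(lights.send_frame, timeout, delay)
        except TimeoutError:
            continue  # give up on the hung call and try again
    raise TimeoutError("all attempts timed out")
```

One caveat with this approach: Python cannot kill a thread, so a call that times out keeps running in the background until the underlying operation returns. Marking the thread as a daemon only ensures it will not keep the interpreter alive at exit.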